module Control.Distributed.Process.UnsafePrimitives
(
send
, sendChan
, nsend
, nsendRemote
, usend
, wrapMessage
) where
import Control.Distributed.Process.Internal.Messaging
( sendMessage
, sendBinary
, sendCtrlMsg
)
import Control.Distributed.Process.Management.Internal.Types
( MxEvent(..)
)
import Control.Distributed.Process.Management.Internal.Trace.Types
( traceEvent
)
import Control.Distributed.Process.Internal.Types
( ProcessId(..)
, NodeId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, SendPort(..)
, ProcessSignal(..)
, Identifier(..)
, ImplicitReconnect(..)
, SendPortId(..)
, Message
, createMessage
, sendPortProcessId
, unsafeCreateUnencodedMessage
)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
-- | Named send: emit a 'NamedSend' control signal carrying @msg@ for the
-- name @label@, with no explicit target node ('Nothing').
--
-- The payload is wrapped once with 'wrapMessage'; that same wrapped value is
-- used both for the 'MxSentToName' trace event and for the signal itself.
--
-- NOTE(review): passing 'Nothing' to 'sendCtrlMsg' presumably addresses the
-- local node controller -- confirm against 'sendCtrlMsg'.
nsend :: Serializable a => String -> a -> Process ()
nsend label msg = do
  proc <- ask
  let us   = processId proc
      msg' = wrapMessage msg
  -- Publish the trace event on this node's event bus before dispatching.
  liftIO $ traceEvent (localEventBus (processNode proc))
                      (MxSentToName label us msg')
  sendCtrlMsg Nothing (NamedSend label msg')
-- | Named send addressed to the node @nid@.
--
-- When @nid@ equals the calling process's own node id, this simply delegates
-- to 'nsend'. Otherwise it emits an 'MxSentToName' trace event and then sends
-- a 'NamedSend' control signal to @Just nid@.
--
-- In the non-local branch the trace event and the signal intentionally differ:
--
-- * the trace event is labelled @label ++ \"\@\" ++ show nid@ and carries the
--   payload wrapped with 'wrapMessage';
-- * the signal uses the plain @label@ and a payload built with
--   'createMessage'.
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote nid label msg = do
  proc <- ask
  let us   = processId proc
      node = processNode proc
  if localNodeId node == nid
    then nsend label msg
    else do
      -- Qualify the name with the destination node for tracing only.
      let lbl = label ++ "@" ++ show nid
      liftIO $ traceEvent (localEventBus node)
                          (MxSentToName lbl us (wrapMessage msg))
      sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
-- | Send @msg@ to the process @them@.
--
-- An 'MxSent' trace event (carrying the 'wrapMessage'-wrapped payload) is
-- always published on the local event bus first. Then:
--
-- * if @them@ lives on the caller's node, a 'LocalSend' control signal with
--   the wrapped payload is issued via 'sendCtrlMsg' 'Nothing';
-- * otherwise the original, unwrapped @msg@ is passed to 'sendMessage' from
--   this process's identifier to @them@'s, using 'NoImplicitReconnect'.
send :: Serializable a => ProcessId -> a -> Process ()
send them msg = do
  proc <- ask
  let node = processNode proc
      us   = processId proc
      msg' = wrapMessage msg
  liftIO $ traceEvent (localEventBus node) (MxSent them us msg')
  if processNodeId them == localNodeId node
    then sendCtrlMsg Nothing $ LocalSend them msg'
    else liftIO $ sendMessage node
                              (ProcessIdentifier us)
                              (ProcessIdentifier them)
                              NoImplicitReconnect
                              msg
-- | Send @msg@ to the process @them@ using control signals only.
--
-- An 'MxSent' trace event (carrying the 'wrapMessage'-wrapped payload) is
-- always published on the local event bus first. Then:
--
-- * if @them@ lives on the caller's node, a 'LocalSend' signal with the
--   wrapped payload is issued via 'sendCtrlMsg' 'Nothing';
-- * otherwise an 'UnreliableSend' signal is sent to @them@'s node, addressed
--   by @them@'s local process id and carrying a payload built with
--   'createMessage'.
usend :: Serializable a => ProcessId -> a -> Process ()
usend them msg = do
  proc <- ask
  let there = processNodeId them
      us    = processId proc
      node  = processNode proc
      msg'  = wrapMessage msg
  liftIO $ traceEvent (localEventBus node) (MxSent them us msg')
  if localNodeId node == there
    then sendCtrlMsg Nothing $ LocalSend them msg'
    else sendCtrlMsg (Just there) $
           UnreliableSend (processLocalId them) (createMessage msg)
-- | Send @msg@ on the typed channel identified by the given 'SendPort'.
--
-- An 'MxSentToPort' trace event (carrying the 'wrapMessage'-wrapped payload)
-- is always published on the local event bus first. Then:
--
-- * if the process owning the port (per 'sendPortProcessId') is on the
--   caller's node, a 'LocalPortSend' control signal with the wrapped payload
--   is issued;
-- * otherwise the original, unwrapped @msg@ is passed to 'sendBinary' from
--   this process's identifier to the port's identifier, using
--   'NoImplicitReconnect'.
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort cid) msg = do
  proc <- ask
  let node = processNode proc
      pid  = processId proc
      us   = localNodeId node
      them = processNodeId (sendPortProcessId cid)
      msg' = wrapMessage msg
  liftIO $ traceEvent (localEventBus node) (MxSentToPort pid cid msg')
  if them == us
    then unsafeSendChanLocal cid msg'
    else liftIO $ sendBinary node
                             (ProcessIdentifier pid)
                             (SendPortIdentifier cid)
                             NoImplicitReconnect
                             msg
  where
    -- Same-node delivery: hand the already-wrapped message over as a
    -- 'LocalPortSend' control signal with no explicit target node.
    unsafeSendChanLocal :: SendPortId -> Message -> Process ()
    unsafeSendChanLocal p m = sendCtrlMsg Nothing $ LocalPortSend p m
-- | Wrap any 'Serializable' value in a 'Message'.
--
-- This is a direct alias for 'unsafeCreateUnencodedMessage'.
--
-- NOTE(review): judging by that name the payload is presumably kept
-- unencoded rather than serialised -- confirm against its definition.
wrapMessage :: Serializable a => a -> Message
wrapMessage = unsafeCreateUnencodedMessage