module Control.Distributed.Process.Internal.Spawn
( spawn
, spawnLink
, spawnMonitor
, call
, spawnSupervised
, spawnChannel
) where
import Control.Distributed.Static
( Static
, Closure
, closureCompose
, staticClosure
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, Process(..)
, MonitorRef(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, DidSpawn(..)
, SendPort(..)
, ReceivePort(..)
, nullProcessId
)
import Control.Distributed.Process.Serializable (Serializable, SerializableDict)
import Control.Distributed.Process.Internal.Closure.BuiltIn
( sdictSendPort
, sndStatic
, idCP
, seqCP
, bindCP
, splitCP
, cpLink
, cpSend
, cpNewChan
, cpDelayed
, returnCP
, sdictUnit
)
import Control.Distributed.Process.Internal.Primitives
(
send
, expect
, receiveWait
, match
, matchIf
, link
, getSelfPid
, monitor
, monitorNode
, unmonitor
, spawnAsync
, reconnect
)
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
spawn nid proc = do
us <- getSelfPid
mRef <- monitorNode nid
sRef <- spawnAsync nid (cpDelayed us proc)
receiveWait [
matchIf (\(DidSpawn ref _) -> ref == sRef) $ \(DidSpawn _ pid) -> do
unmonitor mRef
send pid ()
return pid
, matchIf (\(NodeMonitorNotification ref _ _) -> ref == mRef) $ \_ ->
return (nullProcessId nid)
]
spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
spawnLink nid proc = do
pid <- spawn nid proc
link pid
return pid
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
spawnMonitor nid proc = do
pid <- spawn nid proc
ref <- monitor pid
return (pid, ref)
call :: Serializable a
=> Static (SerializableDict a)
-> NodeId
-> Closure (Process a)
-> Process a
call dict nid proc = do
us <- getSelfPid
(pid, mRef) <- spawnMonitor nid (proc `bindCP`
cpSend dict us `seqCP`
cpDelayed us (returnCP sdictUnit ())
)
mResult <- receiveWait
[ match $ \a -> send pid () >> return (Right a)
, matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef)
(\(ProcessMonitorNotification _ _ reason) -> return (Left reason))
]
case mResult of
Right a -> do
receiveWait
[ matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef)
(\(ProcessMonitorNotification {}) -> return ())
]
reconnect pid
return a
Left err ->
fail $ "call: remote process died: " ++ show err
spawnSupervised :: NodeId
-> Closure (Process ())
-> Process (ProcessId, MonitorRef)
spawnSupervised nid proc = do
us <- getSelfPid
them <- spawn nid (cpLink us `seqCP` proc)
ref <- monitor them
return (them, ref)
spawnChannel :: forall a. Serializable a => Static (SerializableDict a)
-> NodeId
-> Closure (ReceivePort a -> Process ())
-> Process (SendPort a)
spawnChannel dict nid proc = do
us <- getSelfPid
_ <- spawn nid (go us)
expect
where
go :: ProcessId -> Closure (Process ())
go pid = cpNewChan dict
`bindCP`
(cpSend (sdictSendPort dict) pid `splitCP` proc)
`bindCP`
(idCP `closureCompose` staticClosure sndStatic)