{-# LANGUAGE RankNTypes #-}
module Control.Distributed.Process.Internal.Spawn
( spawn
, spawnLink
, spawnMonitor
, call
, spawnSupervised
, spawnChannel
) where
import Control.Distributed.Static
( Static
, Closure
, closureCompose
, staticClosure
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, Process(..)
, MonitorRef(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, DidSpawn(..)
, SendPort(..)
, ReceivePort(..)
, nullProcessId
)
import Control.Distributed.Process.Serializable (Serializable, SerializableDict)
import Control.Distributed.Process.Internal.Closure.BuiltIn
( sdictSendPort
, sndStatic
, idCP
, seqCP
, bindCP
, splitCP
, cpLink
, cpSend
, cpNewChan
, cpDelayed
, returnCP
, sdictUnit
)
import Control.Distributed.Process.Internal.Primitives
(
usend
, expect
, receiveWait
, match
, matchIf
, link
, getSelfPid
, monitor
, monitorNode
, unmonitor
, spawnAsync
, reconnect
)
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
spawn NodeId
nid Closure (Process ())
proc = do
ProcessId
us <- Process ProcessId
getSelfPid
MonitorRef
mRef <- NodeId -> Process MonitorRef
monitorNode NodeId
nid
SpawnRef
sRef <- NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync NodeId
nid (ProcessId -> Closure (Process ()) -> Closure (Process ())
cpDelayed ProcessId
us Closure (Process ())
proc)
[Match ProcessId] -> Process ProcessId
forall b. [Match b] -> Process b
receiveWait [
(DidSpawn -> Bool)
-> (DidSpawn -> Process ProcessId) -> Match ProcessId
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidSpawn SpawnRef
ref ProcessId
_) -> SpawnRef
ref SpawnRef -> SpawnRef -> Bool
forall a. Eq a => a -> a -> Bool
== SpawnRef
sRef) ((DidSpawn -> Process ProcessId) -> Match ProcessId)
-> (DidSpawn -> Process ProcessId) -> Match ProcessId
forall a b. (a -> b) -> a -> b
$ \(DidSpawn SpawnRef
_ ProcessId
pid) -> do
MonitorRef -> Process ()
unmonitor MonitorRef
mRef
ProcessId -> () -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
usend ProcessId
pid ()
ProcessId -> Process ProcessId
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessId
pid
, (NodeMonitorNotification -> Bool)
-> (NodeMonitorNotification -> Process ProcessId)
-> Match ProcessId
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(NodeMonitorNotification MonitorRef
ref NodeId
_ DiedReason
_) -> MonitorRef
ref MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
mRef) ((NodeMonitorNotification -> Process ProcessId) -> Match ProcessId)
-> (NodeMonitorNotification -> Process ProcessId)
-> Match ProcessId
forall a b. (a -> b) -> a -> b
$ \NodeMonitorNotification
_ ->
ProcessId -> Process ProcessId
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (NodeId -> ProcessId
nullProcessId NodeId
nid)
]
spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
spawnLink NodeId
nid Closure (Process ())
proc = do
ProcessId
pid <- NodeId -> Closure (Process ()) -> Process ProcessId
spawn NodeId
nid Closure (Process ())
proc
ProcessId -> Process ()
link ProcessId
pid
ProcessId -> Process ProcessId
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ProcessId
pid
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
spawnMonitor NodeId
nid Closure (Process ())
proc = do
ProcessId
pid <- NodeId -> Closure (Process ()) -> Process ProcessId
spawn NodeId
nid Closure (Process ())
proc
MonitorRef
ref <- ProcessId -> Process MonitorRef
monitor ProcessId
pid
(ProcessId, MonitorRef) -> Process (ProcessId, MonitorRef)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessId
pid, MonitorRef
ref)
call :: Serializable a
=> Static (SerializableDict a)
-> NodeId
-> Closure (Process a)
-> Process a
call :: forall a.
Serializable a =>
Static (SerializableDict a)
-> NodeId -> Closure (Process a) -> Process a
call Static (SerializableDict a)
dict NodeId
nid Closure (Process a)
proc = do
ProcessId
us <- Process ProcessId
getSelfPid
(ProcessId
pid, MonitorRef
mRef) <- NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
spawnMonitor NodeId
nid (Closure (Process a)
proc Closure (Process a) -> CP a () -> Closure (Process ())
forall a b.
(Typeable a, Typeable b) =>
Closure (Process a) -> CP a b -> Closure (Process b)
`bindCP`
Static (SerializableDict a) -> ProcessId -> CP a ()
forall a.
Typeable a =>
Static (SerializableDict a) -> ProcessId -> CP a ()
cpSend Static (SerializableDict a)
dict ProcessId
us Closure (Process ())
-> Closure (Process ()) -> Closure (Process ())
forall a b.
(Typeable a, Typeable b) =>
Closure (Process a) -> Closure (Process b) -> Closure (Process b)
`seqCP`
ProcessId -> Closure (Process ()) -> Closure (Process ())
cpDelayed ProcessId
us (Static (SerializableDict ()) -> () -> Closure (Process ())
forall a.
Serializable a =>
Static (SerializableDict a) -> a -> Closure (Process a)
returnCP Static (SerializableDict ())
sdictUnit ())
)
Either DiedReason a
mResult <- [Match (Either DiedReason a)] -> Process (Either DiedReason a)
forall b. [Match b] -> Process b
receiveWait
[ (a -> Process (Either DiedReason a)) -> Match (Either DiedReason a)
forall a b. Serializable a => (a -> Process b) -> Match b
match ((a -> Process (Either DiedReason a))
-> Match (Either DiedReason a))
-> (a -> Process (Either DiedReason a))
-> Match (Either DiedReason a)
forall a b. (a -> b) -> a -> b
$ \a
a -> ProcessId -> () -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
usend ProcessId
pid () Process ()
-> Process (Either DiedReason a) -> Process (Either DiedReason a)
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Either DiedReason a -> Process (Either DiedReason a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Either DiedReason a
forall a b. b -> Either a b
Right a
a)
, (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process (Either DiedReason a))
-> Match (Either DiedReason a)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification MonitorRef
ref ProcessId
_ DiedReason
_) -> MonitorRef
ref MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
mRef)
(\(ProcessMonitorNotification MonitorRef
_ ProcessId
_ DiedReason
reason) -> Either DiedReason a -> Process (Either DiedReason a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (DiedReason -> Either DiedReason a
forall a b. a -> Either a b
Left DiedReason
reason))
]
case Either DiedReason a
mResult of
Right a
a -> do
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
[ (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification MonitorRef
ref ProcessId
_ DiedReason
_) -> MonitorRef
ref MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
mRef)
(\(ProcessMonitorNotification {}) -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
ProcessId -> Process ()
reconnect ProcessId
pid
a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Left DiedReason
err ->
String -> Process a
forall a. String -> Process a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Process a) -> String -> Process a
forall a b. (a -> b) -> a -> b
$ String
"call: remote process died: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ DiedReason -> String
forall a. Show a => a -> String
show DiedReason
err
spawnSupervised :: NodeId
-> Closure (Process ())
-> Process (ProcessId, MonitorRef)
spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
spawnSupervised NodeId
nid Closure (Process ())
proc = do
ProcessId
us <- Process ProcessId
getSelfPid
ProcessId
them <- NodeId -> Closure (Process ()) -> Process ProcessId
spawn NodeId
nid (ProcessId -> Closure (Process ())
cpLink ProcessId
us Closure (Process ())
-> Closure (Process ()) -> Closure (Process ())
forall a b.
(Typeable a, Typeable b) =>
Closure (Process a) -> Closure (Process b) -> Closure (Process b)
`seqCP` Closure (Process ())
proc)
MonitorRef
ref <- ProcessId -> Process MonitorRef
monitor ProcessId
them
(ProcessId, MonitorRef) -> Process (ProcessId, MonitorRef)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessId
them, MonitorRef
ref)
spawnChannel :: forall a. Serializable a => Static (SerializableDict a)
-> NodeId
-> Closure (ReceivePort a -> Process ())
-> Process (SendPort a)
spawnChannel :: forall a.
Serializable a =>
Static (SerializableDict a)
-> NodeId
-> Closure (ReceivePort a -> Process ())
-> Process (SendPort a)
spawnChannel Static (SerializableDict a)
dict NodeId
nid Closure (ReceivePort a -> Process ())
proc = do
ProcessId
us <- Process ProcessId
getSelfPid
ProcessId
_ <- NodeId -> Closure (Process ()) -> Process ProcessId
spawn NodeId
nid (ProcessId -> Closure (Process ())
go ProcessId
us)
Process (SendPort a)
forall a. Serializable a => Process a
expect
where
go :: ProcessId -> Closure (Process ())
go :: ProcessId -> Closure (Process ())
go ProcessId
pid = Static (SerializableDict a)
-> Closure (Process (SendPort a, ReceivePort a))
forall a.
Typeable a =>
Static (SerializableDict a)
-> Closure (Process (SendPort a, ReceivePort a))
cpNewChan Static (SerializableDict a)
dict
Closure (Process (SendPort a, ReceivePort a))
-> CP (SendPort a, ReceivePort a) ((), ())
-> Closure (Process ((), ()))
forall a b.
(Typeable a, Typeable b) =>
Closure (Process a) -> CP a b -> Closure (Process b)
`bindCP`
(Static (SerializableDict (SendPort a))
-> ProcessId -> CP (SendPort a) ()
forall a.
Typeable a =>
Static (SerializableDict a) -> ProcessId -> CP a ()
cpSend (Static (SerializableDict a)
-> Static (SerializableDict (SendPort a))
forall a.
Typeable a =>
Static (SerializableDict a)
-> Static (SerializableDict (SendPort a))
sdictSendPort Static (SerializableDict a)
dict) ProcessId
pid CP (SendPort a) ()
-> Closure (ReceivePort a -> Process ())
-> CP (SendPort a, ReceivePort a) ((), ())
forall a b c d.
(Typeable a, Typeable b, Typeable c, Typeable d) =>
CP a c -> CP b d -> CP (a, b) (c, d)
`splitCP` Closure (ReceivePort a -> Process ())
proc)
Closure (Process ((), ()))
-> CP ((), ()) () -> Closure (Process ())
forall a b.
(Typeable a, Typeable b) =>
Closure (Process a) -> CP a b -> Closure (Process b)
`bindCP`
(CP () ()
forall a. Typeable a => CP a a
idCP CP () () -> Closure (((), ()) -> ()) -> CP ((), ()) ()
forall b c a.
Closure (b -> c) -> Closure (a -> b) -> Closure (a -> c)
`closureCompose` Static (((), ()) -> ()) -> Closure (((), ()) -> ())
forall a. Static a -> Closure a
staticClosure Static (((), ()) -> ())
forall a b. Static ((a, b) -> b)
sndStatic)