module Control.Distributed.Process
(
ProcessId
, NodeId
, Process
, SendPortId
, processNodeId
, sendPortProcessId
, liftIO
, send
, expect
, expectTimeout
, ReceivePort
, SendPort
, sendPortId
, newChan
, sendChan
, receiveChan
, receiveChanTimeout
, mergePortsBiased
, mergePortsRR
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, AbstractMessage(..)
, matchAny
, matchAnyIf
, matchChan
, spawn
, call
, terminate
, die
, kill
, exit
, catchExit
, catchesExit
, ProcessTerminationException(..)
, ProcessRegistrationException(..)
, SpawnRef
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, link
, linkNode
, linkPort
, unlink
, unlinkNode
, unlinkPort
, monitor
, monitorNode
, monitorPort
, unmonitor
, withMonitor
, MonitorRef
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, DiedReason(..)
, Closure
, closure
, Static
, unStatic
, unClosure
, RemoteTable
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, WhereIsReply(..)
, RegisterReply(..)
, catch
, Handler(..)
, catches
, try
, mask
, onException
, bracket
, bracket_
, finally
, spawnAsync
, spawnSupervised
, spawnLink
, spawnMonitor
, spawnChannel
, DidSpawn(..)
, spawnLocal
, spawnChannelLocal
, reconnect
, reconnectPort
) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Data.Typeable (Typeable)
import Control.Monad.IO.Class (liftIO)
import Control.Applicative ((<$>))
import Control.Monad.Reader (ask)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar)
import Control.Distributed.Static
( Closure
, closure
, Static
, RemoteTable
, closureCompose
, staticClosure
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, Process(..)
, MonitorRef(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessRegistrationException(..)
, DiedReason(..)
, SpawnRef(..)
, DidSpawn(..)
, SendPort(..)
, ReceivePort(..)
, SendPortId(..)
, WhereIsReply(..)
, RegisterReply(..)
, LocalProcess(processNode)
, nullProcessId
)
import Control.Distributed.Process.Serializable (Serializable, SerializableDict)
import Control.Distributed.Process.Internal.Closure.BuiltIn
( sdictSendPort
, sndStatic
, idCP
, seqCP
, bindCP
, splitCP
, cpLink
, cpSend
, cpNewChan
, cpDelay
)
import Control.Distributed.Process.Internal.Primitives
(
send
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, AbstractMessage(..)
, matchAny
, matchAnyIf
, matchChan
, terminate
, ProcessTerminationException(..)
, die
, exit
, catchExit
, catchesExit
, kill
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, link
, linkNode
, linkPort
, unlink
, unlinkNode
, unlinkPort
, monitor
, monitorNode
, monitorPort
, unmonitor
, withMonitor
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, unStatic
, unClosure
, catch
, Handler(..)
, catches
, try
, mask
, onException
, bracket
, bracket_
, finally
, expectTimeout
, receiveChanTimeout
, spawnAsync
, reconnect
, reconnectPort
)
import Control.Distributed.Process.Node (forkProcess)
-- | Spawn a process on node @nid@ and block until its 'ProcessId' is known.
--
-- The target node is monitored while the request is outstanding. Two
-- outcomes are possible:
--
-- * A 'DidSpawn' reply carrying our 'SpawnRef' arrives: the node monitor is
--   removed, the new process is sent @()@, and its pid is returned.
--
-- * The monitor notification for the node arrives first: @nullProcessId nid@
--   is returned instead of a real pid.
--
-- NOTE(review): the closure is wrapped with 'cpDelay' and our own pid;
-- presumably that holds the remote process back until the @()@ sent here
-- arrives -- confirm against 'cpDelay'.
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
spawn nid proc = do
  self     <- getSelfPid
  -- The monitor must be in place before the spawn request goes out.
  nodeRef  <- monitorNode nid
  spawnRef <- spawnAsync nid (cpDelay self proc)
  receiveWait
    [ matchIf (isReplyTo spawnRef) (started nodeRef)
    , matchIf (isDeathOf nodeRef) (const nodeGone)
    ]
  where
    -- Does this spawn reply answer the request identified by @wanted@?
    isReplyTo wanted (DidSpawn ref _) = ref == wanted

    -- Does this node notification belong to the monitor @wanted@?
    isDeathOf wanted (NodeMonitorNotification ref _ _) = ref == wanted

    -- Drop the node monitor, send the new process @()@, and hand back its pid.
    started nodeRef (DidSpawn _ pid) = do
      unmonitor nodeRef
      send pid ()
      return pid

    nodeGone = return (nullProcessId nid)
-- | Spawn a process on @nid@ with 'spawn', then 'link' to it.
--
-- The link is established only after 'spawn' has returned, so the two steps
-- are not atomic. Returns the pid produced by 'spawn'.
spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
spawnLink nid proc =
  spawn nid proc >>= \child ->
    link child >> return child
-- | Spawn a process on @nid@ with 'spawn', then start monitoring it.
--
-- The monitor is set up only after 'spawn' has returned, so the two steps
-- are not atomic. Returns the new pid together with the 'MonitorRef' of the
-- monitor that was installed.
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
spawnMonitor nid proc =
  spawn nid proc >>= \child ->
    (,) child <$> monitor child
-- | Run a closure on node @nid@ and wait for the value it produces.
--
-- The closure is extended (via 'bindCP' and 'cpSend') so that its result is
-- sent back to the calling process, and the spawned process is monitored.
-- The caller then waits for whichever comes first:
--
-- * a message of the result type, which is taken to be the answer; or
--
-- * the monitor notification for the spawned process, in which case the
--   call fails (via 'fail') with a message that includes the reported reason.
--
-- After an answer has been received the matching monitor notification is
-- also consumed, so it is not left behind in the mailbox, and 'reconnect' is
-- called on the worker's pid before the answer is returned.
--
-- NOTE(review): the first receive accepts /any/ message of type @a@, and
-- relies on the reply being delivered before the monitor notification --
-- confirm both assumptions hold for the transport in use.
call :: Serializable a
     => Static (SerializableDict a)
     -> NodeId
     -> Closure (Process a)
     -> Process a
call dict nid proc = do
  self          <- getSelfPid
  (worker, ref) <- spawnMonitor nid (proc `bindCP` cpSend dict self)
  outcome <- receiveWait
    [ match (return . Right)
    , matchIf (isNotificationFor ref) (return . Left . reasonOf)
    ]
  case outcome of
    Left reason ->
      fail $ "call: remote process died: " ++ show reason
    Right result -> do
      -- Drain the notification that belongs to our monitor.
      receiveWait [matchIf (isNotificationFor ref) (const (return ()))]
      reconnect worker
      return result
  where
    -- Is this notification the one produced by the monitor @wanted@?
    isNotificationFor wanted (ProcessMonitorNotification ref _ _) =
      ref == wanted

    -- The reason carried by a process monitor notification.
    reasonOf (ProcessMonitorNotification _ _ reason) = reason
-- | Spawn a process on @nid@ and monitor it from the caller.
--
-- The closure that is actually spawned is @cpLink us \`seqCP\` proc@, where
-- @us@ is the calling process. The caller then monitors the new process and
-- receives its pid together with the 'MonitorRef'.
--
-- NOTE(review): going by the combinator names, the child links to the caller
-- before running @proc@ -- confirm against 'cpLink' / 'seqCP'.
spawnSupervised :: NodeId
                -> Closure (Process ())
                -> Process (ProcessId, MonitorRef)
spawnSupervised nid proc =
  -- 'spawnMonitor' performs the same spawn-then-monitor sequence and returns
  -- the same pair, so only the closure needs preparing here.
  getSelfPid >>= \parent ->
    spawnMonitor nid (cpLink parent `seqCP` proc)
-- | Spawn a process on @nid@ that is handed a 'ReceivePort', and return the
-- matching 'SendPort' to the caller.
--
-- A closure assembled from 'cpNewChan', 'cpSend' and @proc@ is spawned on the
-- target node; the caller then blocks in 'expect' until a message of type
-- @'SendPort' a@ arrives and returns it.
--
-- NOTE(review): reading the combinators by name, the remote side creates a
-- channel, sends its send port to the caller while running @proc@ on the
-- receive port, and finally discards the paired result -- confirm against
-- the built-in closure combinators. Also note that 'expect' will accept any
-- @'SendPort' a@ message in the mailbox, not only the one from this spawn.
spawnChannel :: forall a. Typeable a => Static (SerializableDict a)
             -> NodeId
             -> Closure (ReceivePort a -> Process ())
             -> Process (SendPort a)
spawnChannel dict nid proc =
    getSelfPid >>= \self ->
      spawn nid (remote self) >> expect
  where
    -- The closure run on the remote node; @parent@ is the process that will
    -- be sent the send port.
    remote :: ProcessId -> Closure (Process ())
    remote parent =
        (cpNewChan dict `bindCP` publishAndRun) `bindCP` dropPair
      where
        -- Applied to the result of 'cpNewChan': 'cpSend' to @parent@ on one
        -- component, @proc@ on the other.
        publishAndRun = cpSend (sdictSendPort dict) parent `splitCP` proc

        -- Collapse the paired result down to the @()@ the signature requires.
        dropPair = idCP `closureCompose` staticClosure sndStatic
-- | Start @proc@ as a new process on the caller's own node.
--
-- The node is read from the calling process's 'LocalProcess' environment
-- ('processNode') and the new process is created with 'forkProcess'. Unlike
-- 'spawn', this takes a plain 'Process' action rather than a 'Closure'.
spawnLocal :: Process () -> Process ProcessId
spawnLocal proc =
  ask >>= \self ->
    liftIO (forkProcess (processNode self) proc)
-- | Start a process on the caller's own node that owns a fresh channel, and
-- return the channel's 'SendPort' to the caller.
--
-- The forked process creates the channel itself with 'newChan', publishes
-- the send port through an 'MVar', and then runs @proc@ on the receive port.
-- The caller blocks on that 'MVar' until the send port has been published.
spawnChannelLocal :: Serializable a
                  => (ReceivePort a -> Process ())
                  -> Process (SendPort a)
spawnChannelLocal proc = do
  self <- ask
  liftIO $ do
    handoff <- newEmptyMVar
    _ <- forkProcess (processNode self) (worker handoff)
    takeMVar handoff
  where
    -- Body of the forked process: create the channel, hand the send port
    -- back through @handoff@, then run the user's action on the receive port.
    worker handoff = do
      (sendEnd, recvEnd) <- newChan
      liftIO (putMVar handoff sendEnd)
      proc recvEnd