module Control.Distributed.Process.Internal.Primitives
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeSendChan
, unsafeNSend
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unsafeWrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, delegate
, relay
, proxy
, terminate
, ProcessTerminationException(..)
, die
, kill
, exit
, catchExit
, catchesExit
, ProcessExitException()
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
, link
, unlink
, monitor
, unmonitor
, withMonitor
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, unClosure
, unStatic
, catch
, Handler(..)
, catches
, try
, mask
, onException
, bracket
, bracket_
, finally
, expectTimeout
, receiveChanTimeout
, spawnAsync
, linkNode
, linkPort
, unlinkNode
, unlinkPort
, monitorNode
, monitorPort
, reconnect
, reconnectPort
, sendCtrlMsg
) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
import Data.Binary (decode)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (formatTime)
import System.Locale (defaultTimeLocale)
import System.Timeout (timeout)
import Control.Monad (when)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative ((<$>))
import Control.Exception (Exception(..), throw, throwIO, SomeException)
import qualified Control.Exception as Ex (catch, mask, try)
import Control.Distributed.Process.Internal.StrictMVar
( StrictMVar
, modifyMVar
, modifyMVar_
import Control.Concurrent.STM
, TVar
, atomically
, orElse
, newTVar
, readTVar
, writeTVar
import Control.Distributed.Process.Internal.CQueue
( dequeue
, BlockSpec(..)
, MatchOn(..)
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Static
( Static
, Closure
import Data.Rank1Typeable (Typeable)
import qualified Control.Distributed.Static as Static (unstatic, unclosure)
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, Message(..)
, MonitorRef(..)
, SpawnRef(..)
, ProcessSignal(..)
, NodeMonitorNotification(..)
, monitorCounter
, spawnCounter
, SendPort(..)
, ReceivePort(..)
, channelCounter
, typedChannelWithId
, TypedChannel(..)
, SendPortId(..)
, Identifier(..)
, ProcessExitException(..)
, DiedReason(..)
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, WhereIsReply(..)
, RegisterReply(..)
, ProcessRegistrationException(..)
, ProcessInfo(..)
, ProcessInfoNone(..)
, NodeStats(..)
, isEncoded
, createMessage
, createUnencodedMessage
, runLocalProcess
, ImplicitReconnect( NoImplicitReconnect)
, LocalProcessState
, LocalSendPortId
, messageToPayload
import Control.Distributed.Process.Internal.Messaging
( sendMessage
, sendBinary
, sendPayload
, disconnect
, sendCtrlMsg
import Control.Distributed.Process.Management.Internal.Types
( MxEvent(..)
import Control.Distributed.Process.Management.Internal.Trace.Types
( traceEvent
import Control.Distributed.Process.Internal.WeakTQueue
( newTQueueIO
, readTQueue
, mkWeakTQueue
import Unsafe.Coerce
-- | Send a message to the process identified by @them@.
--
-- If the destination's node id equals the sender's own node id the message is
-- handed to 'sendLocal'; otherwise it is passed to 'sendMessage' together
-- with 'NoImplicitReconnect'. In both cases an 'MxSent' trace event carrying
-- the (unencoded) message is published on the node's event bus afterwards.
send :: Serializable a => ProcessId -> a -> Process ()
send them msg = do
  proc <- ask
  let us     = processId proc
      node   = processNode proc
      nodeId = localNodeId node
  if processNodeId them == nodeId
    then sendLocal them msg
    else liftIO $ sendMessage node
                              (ProcessIdentifier us)
                              (ProcessIdentifier them)
                              NoImplicitReconnect
                              msg
  -- The trace event is emitted only after the send itself has been issued.
  liftIO $ traceEvent (localEventBus node)
                      (MxSent them us (createUnencodedMessage msg))
-- | Like 'send', but delegates straight to 'Unsafe.send' from the
-- @UnsafePrimitives@ module.
unsafeSend :: Serializable a => ProcessId -> a -> Process ()
unsafeSend them msg = Unsafe.send them msg
-- | Block (via 'receiveWait') until a message of type @a@ is available and
-- return it.
expect :: forall a. Serializable a => Process a
expect = receiveWait [match (\x -> return x)]
-- | Create a new typed channel owned by the calling process.
--
-- The channel id is built from the caller's 'ProcessId' and the current
-- value of the per-process channel counter, which is then incremented. The
-- queue is registered in the process state through a weak reference whose
-- finalizer removes the registration again.
--
-- Returns the send end and the receive end of the channel.
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
    proc <- ask
    liftIO . modifyMVar (processState proc) $ \st -> do
      let lcid  = st ^. channelCounter
      let cid   = SendPortId { sendPortProcessId = processId proc
                             , sendPortLocalId   = lcid
                             }
      let sport = SendPort cid
      chan  <- liftIO newTQueueIO
      chan' <- mkWeakTQueue chan $ finalizer (processState proc) lcid
      let rport = ReceivePort $ readTQueue chan
      let tch   = TypedChannel chan'
      return ( (channelCounter ^: (+ 1))
             . (typedChannelWithId lcid ^= Just tch)
             $ st
             , (sport, rport)
             )
  where
    -- Clears the channel's entry in the process state.
    finalizer :: StrictMVar LocalProcessState -> LocalSendPortId -> IO ()
    finalizer st lcid = modifyMVar_ st $
      return . (typedChannelWithId lcid ^= Nothing)
-- | Send a message on a typed channel.
--
-- When the process owning the send port lives on the caller's node the
-- message is handed to 'sendChanLocal'; otherwise it is passed to
-- 'sendBinary' together with 'NoImplicitReconnect'.
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort cid) msg = do
  proc <- ask
  let node     = localNodeId (processNode proc)
      destNode = processNodeId (sendPortProcessId cid)
  if destNode == node
    then sendChanLocal cid msg
    else liftIO $ sendBinary (processNode proc)
                             (ProcessIdentifier (processId proc))
                             (SendPortIdentifier cid)
                             NoImplicitReconnect
                             msg
-- | Like 'sendChan', but delegates straight to 'Unsafe.sendChan'.
unsafeSendChan :: Serializable a => SendPort a -> a -> Process ()
unsafeSendChan port msg = Unsafe.sendChan port msg
-- | Run the port's 'receiveSTM' transaction atomically and return its result.
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan port = liftIO (atomically (receiveSTM port))
-- | Read from a channel with a time limit.
--
-- A limit of @0@ performs a single non-blocking attempt: the result is
-- 'Nothing' if the port's transaction would have to retry. Any other limit is
-- passed unchanged to 'timeout' around the atomic read.
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout n ch
  | n == 0    = liftIO (atomically pollOnce)
  | otherwise = liftIO (timeout n (atomically (receiveSTM ch)))
  where
    pollOnce = fmap Just (receiveSTM ch) `orElse` return Nothing
-- | Merge several receive ports into one, left-biased: the ports' STM
-- actions are chained with 'orElse' in list order, so an earlier port is
-- always tried before a later one.
--
-- The list must be non-empty, since the chain is built with 'foldr1'.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased ports =
  return (ReceivePort (foldr1 orElse (map receiveSTM ports)))
-- | Merge several receive ports into one, rotating the port order.
--
-- The ports' STM actions are kept in a 'TVar'. Each successful read tries
-- them in their current order (combined with 'orElse') and then moves the
-- head of the list to the end, so the next read starts from the following
-- port.
--
-- The list must be non-empty, since the actions are combined with 'foldr1'.
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR ps = do
    psVar <- liftIO . atomically $ newTVar (map receiveSTM ps)
    return $ ReceivePort (rr psVar)
  where
    -- Move the first element to the back.
    rotate :: [a] -> [a]
    rotate []     = []
    rotate (x:xs) = xs ++ [x]

    -- Read from the first port that has data, then rotate the order.
    rr :: TVar [STM a] -> STM a
    rr psVar = do
      ps' <- readTVar psVar
      a   <- foldr1 orElse ps'
      writeTVar psVar (rotate ps')
      return a
-- | A single message-matching clause, as consumed by 'receiveWait' and
-- 'receiveTimeout'. Wraps a 'MatchOn' over 'Message' whose result is the
-- 'Process' action to run on a successful match.
newtype Match b = Match { unMatch :: MatchOn Message (Process b) }
-- | Dequeue from the process queue in 'Blocking' mode using the given
-- matches, then run the action selected by the successful match.
receiveWait :: [Match b] -> Process b
receiveWait ms = do
  queue <- processQueue <$> ask
  -- NOTE(review): assumes a 'Blocking' dequeue always yields 'Just';
  -- a 'Nothing' here would be a pattern-match failure. TODO confirm.
  Just proc <- liftIO $ dequeue queue Blocking (map unMatch ms)
  proc
-- | Like 'receiveWait', but gives up after a while.
--
-- A timeout of @0@ dequeues in 'NonBlocking' mode; any other value is
-- wrapped in 'Timeout'. Returns 'Nothing' if no match was dequeued,
-- otherwise 'Just' the result of running the matched action.
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
receiveTimeout t ms = do
  queue   <- fmap processQueue ask
  outcome <- liftIO (dequeue queue spec (map unMatch ms))
  maybe (return Nothing) (fmap Just) outcome
  where
    spec
      | t == 0    = NonBlocking
      | otherwise = Timeout t
-- | Match on a value read from a typed channel; the handler is mapped over
-- the port's 'receiveSTM' action.
matchChan :: ReceivePort a -> (a -> Process b) -> Match b
matchChan p fn = Match (MatchChan (fn <$> receiveSTM p))
-- | Match on the result of an arbitrary 'STM' action; the handler is mapped
-- over the action.
matchSTM :: STM a -> (a -> Process b) -> Match b
matchSTM stm fn = Match (MatchChan (fn <$> stm))
-- | Match any message of type @a@; equivalent to 'matchIf' with a predicate
-- that always succeeds.
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match handler = matchIf (\_ -> True) handler
-- | Match a message of type @a@ that also satisfies the predicate.
--
-- The message is rejected unless its fingerprint equals the fingerprint of
-- @a@. An unencoded payload is coerced to @a@; an encoded payload is decoded
-- with 'decode', and the decoded value is bound strictly.
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf c p = Match $ MatchMsg $ \msg ->
  if messageFingerprint msg /= fingerprint (undefined :: a)
    then Nothing
    else case msg of
      UnencodedMessage _ m ->
        -- Fingerprints matched, so the payload is treated as an @a@.
        let m' = unsafeCoerce m :: a
        in if c m' then Just (p m') else Nothing
      EncodedMessage _ _ ->
        if c decoded then Just (p decoded) else Nothing
        where
          decoded :: a
          !decoded = decode (messageEncoding msg)
-- | Match every raw 'Message' and hand it to the given action.
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage p = Match (MatchMsg (Just . p))
-- | Match a raw 'Message' only when the predicate accepts it.
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf c p = Match (MatchMsg select)
  where
    select msg
      | c msg     = Just (p msg)
      | otherwise = Nothing
-- | Forward a raw 'Message' to the process @them@.
--
-- For a destination on the caller's own node a 'LocalSend' control message
-- is issued; otherwise the message's payload is passed to 'sendPayload'
-- together with 'NoImplicitReconnect'. An 'MxSent' trace event is published
-- on the node's event bus afterwards.
forward :: Message -> ProcessId -> Process ()
forward msg them = do
  proc <- ask
  let node = processNode proc
      us   = processId proc
      nid  = localNodeId node
  if processNodeId them == nid
    then sendCtrlMsg Nothing (LocalSend them msg)
    else liftIO $ sendPayload node
                              (ProcessIdentifier us)
                              (ProcessIdentifier them)
                              NoImplicitReconnect
                              (messageToPayload msg)
  -- The trace event is emitted only after the send itself has been issued.
  liftIO $ traceEvent (localEventBus node)
                      (MxSent them us msg)
-- | Wrap a serializable value in a 'Message' using 'createUnencodedMessage'.
wrapMessage :: Serializable a => a -> Message
wrapMessage payload = createUnencodedMessage payload
-- | Like 'wrapMessage', but delegates straight to 'Unsafe.wrapMessage'.
unsafeWrapMessage :: Serializable a => a -> Message
unsafeWrapMessage payload = Unsafe.wrapMessage payload
-- | Try to extract a value of type @a@ from a raw 'Message'.
--
-- Yields 'Nothing' when the message fingerprint differs from the fingerprint
-- of @a@. Otherwise an unencoded payload is coerced to @a@ and an encoded
-- payload is decoded with 'decode' (bound strictly).
unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a)
unwrapMessage msg =
  case messageFingerprint msg == fingerprint (undefined :: a) of
    False -> return Nothing :: m (Maybe a)
    True  -> case msg of
      (UnencodedMessage _ ms) ->
        -- Fingerprints matched, so the payload is treated as an @a@.
        let ms' = unsafeCoerce ms :: a
        in return (Just ms')
      (EncodedMessage _ _) ->
        return (Just (decoded))
        where
          decoded :: a
          !decoded = decode (messageEncoding msg)
-- | Apply the handler to the message if it carries a value of type @a@;
-- shorthand for 'handleMessageIf' with a predicate that always succeeds.
handleMessage :: forall m a b. (Monad m, Serializable a)
              => Message -> (a -> m b) -> m (Maybe b)
handleMessage msg proc = handleMessageIf msg (\_ -> True) proc
-- | Conditionally apply a handler to the contents of a raw 'Message'.
--
-- Yields 'Nothing' when the message fingerprint differs from the fingerprint
-- of @a@, or when the predicate rejects the extracted value. Otherwise the
-- handler is run and its result is wrapped in 'Just'. Unencoded payloads are
-- coerced to @a@; encoded payloads are decoded with 'decode' (bound
-- strictly).
handleMessageIf :: forall m a b . (Monad m, Serializable a)
                => Message
                -> (a -> Bool)
                -> (a -> m b)
                -> m (Maybe b)
handleMessageIf msg c proc
  | messageFingerprint msg /= fingerprint (undefined :: a) = return Nothing
  | otherwise =
      case msg of
        UnencodedMessage _ ms -> runIfAccepted (unsafeCoerce ms :: a)
        EncodedMessage _ _    -> runIfAccepted decoded
          where
            decoded :: a
            !decoded = decode (messageEncoding msg)
  where
    -- Run the handler only when the predicate accepts the value.
    runIfAccepted :: a -> m (Maybe b)
    runIfAccepted x
      | c x       = proc x >>= \r -> return (Just r)
      | otherwise = return Nothing
-- | Variant of 'handleMessage' that discards the result; shorthand for
-- 'handleMessageIf_' with a predicate that always succeeds.
handleMessage_ :: forall m a . (Monad m, Serializable a)
               => Message -> (a -> m ()) -> m ()
handleMessage_ msg proc = handleMessageIf_ msg (\_ -> True) proc
-- | Variant of 'handleMessageIf' that discards the 'Maybe' result.
handleMessageIf_ :: forall m a . (Monad m, Serializable a)
                 => Message
                 -> (a -> Bool)
                 -> (a -> m ())
                 -> m ()
handleMessageIf_ msg c proc = do
  _ <- handleMessageIf msg c proc
  return ()
-- | Match every message, passing the raw 'Message' to the handler.
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny p = Match (MatchMsg (Just . p))
-- | Match a message whose fingerprint equals that of @a@ and whose contents
-- satisfy the predicate, passing the raw 'Message' (not the extracted value)
-- to the handler.
--
-- The payload is decoded only for 'EncodedMessage'; for 'UnencodedMessage'
-- the payload is coerced to @a@, so the encoding field is never touched on a
-- message that does not have one.
matchAnyIf :: forall a b. (Serializable a)
                       => (a -> Bool)
                       -> (Message -> Process b)
                       -> Match b
matchAnyIf c p = Match $ MatchMsg $ \msg ->
  if messageFingerprint msg == fingerprint (undefined :: a) && accepts msg
    then Just (p msg)
    else Nothing
  where
    -- Apply the predicate to the message contents. Only called after the
    -- fingerprint check succeeded.
    accepts :: Message -> Bool
    accepts (EncodedMessage _ enc)  = c $! (decode enc :: a)
    accepts (UnencodedMessage _ m') = c (unsafeCoerce m' :: a)
-- | Match every message, ignoring it and running the given action instead.
matchUnknown :: Process b -> Match b
matchUnknown p = Match (MatchMsg (\_ -> Just p))
-- | Loop forever, receiving any message and forwarding it to @pid@ when the
-- predicate accepts it. Messages the predicate rejects are consumed and
-- dropped.
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate pid p = do
  receiveWait
    [ matchAny (\m -> when (p m) (forward m pid)) ]
  delegate pid p
-- | Loop forever, forwarding every received message to @pid@.
relay :: ProcessId -> Process ()
relay !pid = do
  receiveWait [ matchAny (flip forward pid) ]
  relay pid
-- | Loop forever, receiving any message and running @proc@ on it via
-- 'handleMessage'. The message is forwarded to @pid@ only when @proc@
-- returns 'True'; it is dropped when @proc@ returns 'False' or when the
-- message does not carry a value of type @a@.
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
proxy pid proc = do
  receiveWait
    [ matchAny $ \m -> do
        next <- handleMessage m proc
        case next of
          Just True  -> forward m pid
          Just False -> return ()  -- rejected by the handler
          Nothing    -> return ()  -- not a message of type @a@
    ]
  proxy pid proc
-- | Exception thrown by 'terminate'.
data ProcessTerminationException = ProcessTerminationException
  deriving (Show, Typeable)
-- Uses the default 'Exception' methods.
instance Exception ProcessTerminationException
-- | Throw 'ProcessTerminationException' in the calling process.
terminate :: Process a
terminate = liftIO (throwIO ProcessTerminationException)
-- | Throw a 'ProcessExitException' in the calling process, carrying the
-- caller's own pid and the given reason wrapped with 'createMessage'.
die :: Serializable a => a -> Process b
die reason =
  getSelfPid >>= \pid ->
    liftIO (throwIO (ProcessExitException pid (createMessage reason)))
-- | Issue a 'Kill' control message for @them@ with the given reason.
kill :: ProcessId -> String -> Process ()
kill them reason = sendCtrlMsg Nothing $ Kill them reason
-- | Issue an 'Exit' control message for @them@, with the reason wrapped via
-- 'createMessage'.
exit :: Serializable a => ProcessId -> a -> Process ()
exit them reason = sendCtrlMsg Nothing $ Exit them (createMessage reason)
-- | Run @act@, handling a 'ProcessExitException' whose reason has type @a@.
--
-- When the fingerprint of the carried message equals the fingerprint of @a@,
-- the reason is decoded (strictly) and passed to the handler together with
-- the originating pid. Otherwise the exception is rethrown unchanged; the
-- payload is not decoded in that case.
catchExit :: forall a b . (Show a, Serializable a)
                       => Process b
                       -> (ProcessId -> a -> Process b)
                       -> Process b
catchExit act exitHandler = catch act handleExit
  where
    handleExit :: ProcessExitException -> Process b
    handleExit ex@(ProcessExitException from msg)
      | messageFingerprint msg == fingerprint (undefined :: a) =
          -- Decode only once the type is known to match.
          exitHandler from $! (decode (messageEncoding msg) :: a)
      | otherwise = liftIO $ throwIO ex
-- | Run @act@, offering a caught 'ProcessExitException' to each handler in
-- turn. The first handler that returns 'Just' supplies the result; if every
-- handler returns 'Nothing' (or the list is empty) the exception is
-- rethrown.
catchesExit :: Process b
            -> [(ProcessId -> Message -> (Process (Maybe b)))]
            -> Process b
catchesExit act handlers = catch act ((flip handleExit) handlers)
  where
    handleExit :: ProcessExitException
               -> [(ProcessId -> Message -> Process (Maybe b))]
               -> Process b
    handleExit ex [] = liftIO $ throwIO ex
    handleExit ex@(ProcessExitException from msg) (h:hs) = do
      r <- h from msg
      case r of
        Nothing -> handleExit ex hs
        Just p  -> return p
-- | The 'ProcessId' of the calling process, read from its environment.
getSelfPid :: Process ProcessId
getSelfPid = fmap processId ask
-- | The 'NodeId' of the node the calling process runs on.
getSelfNode :: Process NodeId
getSelfNode = fmap (localNodeId . processNode) ask
-- | Fetch statistics for a node. The caller's own node is served by
-- 'getLocalNodeStats' (always 'Right'); any other node goes through
-- 'getNodeStatsRemote'.
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats nid = do
  selfNode <- getSelfNode
  case nid == selfNode of
    True  -> fmap Right getLocalNodeStats
    False -> getNodeStatsRemote nid
-- | Request statistics from another node.
--
-- Sends 'GetNodeStats' to @nid@, then monitors the node (the monitor is
-- released by 'bracket') while waiting for either a 'NodeStats' reply or a
-- 'NodeMonitorNotification' for that monitor, whose reason is returned as
-- 'Left'.
getNodeStatsRemote :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote nid = do
  sendCtrlMsg (Just nid) $ GetNodeStats nid
  bracket (monitorNode nid) unmonitor $ \mRef ->
    receiveWait
      [ match (\(stats :: NodeStats) -> return $ Right stats)
      , matchIf (\(NodeMonitorNotification ref _ _) -> ref == mRef)
                (\(NodeMonitorNotification _ _ dr) -> return $ Left dr)
      ]
-- | Request statistics for the caller's own node: issues 'GetNodeStats' and
-- waits for the 'NodeStats' reply.
getLocalNodeStats :: Process NodeStats
getLocalNodeStats = do
  self <- getSelfNode
  sendCtrlMsg Nothing (GetNodeStats self)
  receiveWait [ match (\stats -> return (stats :: NodeStats)) ]
-- | Request information about a process.
--
-- Issues 'GetInfo', addressed with 'Nothing' when the process is on the
-- caller's own node and with @'Just' node@ otherwise. Then waits for either
-- a 'ProcessInfo' reply ('Just') or a 'ProcessInfoNone' reply ('Nothing').
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo pid = do
  us <- getSelfNode
  let them = processNodeId pid
      dest = if them == us then Nothing else Just them
  sendCtrlMsg dest $ GetInfo pid
  receiveWait
    [ match (\(p :: ProcessInfo)     -> return $ Just p)
    , match (\(_ :: ProcessInfoNone) -> return Nothing)
    ]
-- | Issue a 'Link' control message for the given process.
link :: ProcessId -> Process ()
link pid = sendCtrlMsg Nothing (Link (ProcessIdentifier pid))
-- | Start monitoring a process; returns the reference identifying the
-- monitor.
monitor :: ProcessId -> Process MonitorRef
monitor pid = monitor' (ProcessIdentifier pid)
-- | Run @code@ while monitoring @pid@; 'bracket' guarantees the matching
-- 'unmonitor' afterwards.
withMonitor :: ProcessId -> Process a -> Process a
withMonitor pid code = bracket (monitor pid) unmonitor (const code)
-- | Remove a link to a process: issues the request with 'unlinkAsync', then
-- blocks until a 'DidUnlinkProcess' for the same pid arrives.
unlink :: ProcessId -> Process ()
unlink pid = do
  unlinkAsync pid
  receiveWait
    [ matchIf (\(DidUnlinkProcess pid') -> pid' == pid)
              (\_ -> return ())
    ]
-- | Remove a link to a node: issues the request with 'unlinkNodeAsync', then
-- blocks until a 'DidUnlinkNode' for the same node id arrives.
unlinkNode :: NodeId -> Process ()
unlinkNode nid = do
  unlinkNodeAsync nid
  receiveWait
    [ matchIf (\(DidUnlinkNode nid') -> nid' == nid)
              (\_ -> return ())
    ]
-- | Remove a link to a channel: issues the request with 'unlinkPortAsync',
-- then blocks until a 'DidUnlinkPort' for the same send-port id arrives.
unlinkPort :: SendPort a -> Process ()
unlinkPort sport = do
  unlinkPortAsync sport
  receiveWait
    [ matchIf (\(DidUnlinkPort cid) -> cid == sendPortId sport)
              (\_ -> return ())
    ]
-- | Remove a monitor: issues the request with 'unmonitorAsync', then blocks
-- until a 'DidUnmonitor' for the same reference arrives.
unmonitor :: MonitorRef -> Process ()
unmonitor ref = do
  unmonitorAsync ref
  receiveWait
    [ matchIf (\(DidUnmonitor ref') -> ref' == ref)
              (\_ -> return ())
    ]
-- | 'Control.Exception.catch' lifted to 'Process': both the action and the
-- handler are run in the caller's process environment.
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch p h = do
  lproc <- ask
  let run = runLocalProcess lproc
  liftIO (run p `Ex.catch` (run . h))
-- | 'Control.Exception.try' lifted to 'Process'.
try :: Exception e => Process a -> Process (Either e a)
try p = ask >>= \lproc -> liftIO (Ex.try (runLocalProcess lproc p))
-- | 'Control.Exception.mask' lifted to 'Process'. The @restore@ function
-- handed to the body is the 'IO' restore function lifted back into
-- 'Process'.
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask p = do
    lproc <- ask
    liftIO $ Ex.mask $ \restore ->
      runLocalProcess lproc (p (liftRestore restore))
  where
    -- Lift an 'IO' restore function to 'Process' by running the wrapped
    -- action in the environment of whoever calls it.
    liftRestore :: (forall a. IO a -> IO a)
                -> (forall a. Process a -> Process a)
    liftRestore restoreIO = \p2 -> do
      ourLocalProc <- ask
      liftIO $ restoreIO $ runLocalProcess ourLocalProc p2
-- | Run @p@; if it throws any exception, run @what@ (discarding its result)
-- and rethrow the original exception.
onException :: Process a -> Process b -> Process a
onException p what = p `catch` cleanupThenRethrow
  where
    cleanupThenRethrow e = what >> liftIO (throwIO (e :: SomeException))
-- | Acquire a resource, use it, and release it.
--
-- Runs under 'mask'; only @thing@ is run with exceptions restored. @after@
-- is run whether @thing@ returns normally or throws, and its result is
-- discarded.
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket before after thing =
  mask $ \restore ->
    before >>= \a ->
      (restore (thing a) `onException` after a) >>= \r ->
        after a >> return r
-- | Variant of 'bracket' whose release and body ignore the acquired value.
bracket_ :: Process a -> Process b -> Process c -> Process c
bracket_ before after thing = bracket before (\_ -> after) (\_ -> thing)
-- | Run @a@, then run @sequel@ regardless of whether @a@ threw.
finally :: Process a -> Process b -> Process a
finally a sequel = bracket (return ()) (const sequel) (const a)
-- | An exception handler for use with 'catches'; the exception type it
-- handles is hidden existentially.
data Handler a = forall e . Exception e => Handler (e -> Process a)
-- | Maps over the result of the wrapped handler.
instance Functor Handler where
  fmap f (Handler h) = Handler (fmap f . h)
-- | Run an action, dispatching any exception it throws through the given
-- list of handlers (see 'catchesHandler').
catches :: Process a -> [Handler a] -> Process a
catches proc handlers = catch proc (catchesHandler handlers)
-- | Pick the first handler whose exception type the given exception converts
-- to (via 'fromException') and run it; if no handler applies, rethrow with
-- 'throw'.
catchesHandler :: [Handler a] -> SomeException -> Process a
catchesHandler handlers e = go handlers
  where
    go []                       = throw e
    go (Handler handler : rest) =
      case fromException e of
        Just e' -> handler e'
        Nothing -> go rest
-- | Like 'expect', but using 'receiveTimeout' with the given limit; yields
-- 'Nothing' when no message of type @a@ was received.
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout n = receiveTimeout n [match (\x -> return x)]
-- | Ask node @nid@ to spawn the given closure. Returns the fresh 'SpawnRef'
-- that was sent along with the request, without waiting for a reply.
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync nid proc =
  getSpawnRef >>= \spawnRef ->
    sendCtrlMsg (Just nid) (Spawn proc spawnRef) >> return spawnRef
-- | Start monitoring a node.
monitorNode :: NodeId -> Process MonitorRef
monitorNode nid = monitor' (NodeIdentifier nid)
-- | Start monitoring a typed channel, identified by its send port.
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort (SendPort cid) = monitor' $ SendPortIdentifier cid
-- | Issue an 'Unmonitor' control message for the given reference, without
-- waiting for a confirmation.
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync ref = sendCtrlMsg Nothing (Unmonitor ref)
-- | Link to a node.
linkNode :: NodeId -> Process ()
linkNode nid = link' (NodeIdentifier nid)
-- | Link to a typed channel, identified by its send port.
linkPort :: SendPort a -> Process ()
linkPort (SendPort cid) = link' $ SendPortIdentifier cid
-- | Issue an 'Unlink' control message for a process, without waiting for a
-- confirmation.
unlinkAsync :: ProcessId -> Process ()
unlinkAsync pid = sendCtrlMsg Nothing (Unlink (ProcessIdentifier pid))
-- | Issue an 'Unlink' control message for a node, without waiting for a
-- confirmation.
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync nid = sendCtrlMsg Nothing (Unlink (NodeIdentifier nid))
-- | Issue an 'Unlink' control message for a typed channel, without waiting
-- for a confirmation.
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync (SendPort cid) =
  sendCtrlMsg Nothing (Unlink (SendPortIdentifier cid))
-- | Send a log line to the process registered as @"logger"@. The message is
-- a triple of the current time (formatted with @"%c"@), the caller's pid and
-- the given string.
say :: String -> Process ()
say string = do
  now <- liftIO getCurrentTime
  us  <- getSelfPid
  let stamp = formatTime defaultTimeLocale "%c" now
  nsend "logger" (stamp, us, string)
-- | Register a process under a name on the caller's node, with the force
-- flag of 'registerImpl' set to 'False'.
register :: String -> ProcessId -> Process ()
register label pid = registerImpl False label pid
-- | Like 'register', but with the force flag of 'registerImpl' set to
-- 'True'.
reregister :: String -> ProcessId -> Process ()
reregister label pid = registerImpl True label pid
-- | Shared implementation of 'register' and 'reregister'.
--
-- Issues a 'Register' control message for the caller's node, then blocks
-- until the 'RegisterReply' for the same label arrives; a negative reply is
-- turned into an exception by 'handleRegistrationReply'.
registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl force label pid = do
  mynid <- getSelfNode
  sendCtrlMsg Nothing (Register label mynid (Just pid) force)
  receiveWait
    [ matchIf (\(RegisterReply label' _) -> label == label')
              (\(RegisterReply _ ok) -> handleRegistrationReply label ok)
    ]
-- | Send a 'Register' request (force flag 'False') to node @nid@ without
-- waiting for the reply.
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync nid label pid =
  sendCtrlMsg (Just nid) $ Register label nid (Just pid) False
-- | Send a 'Register' request (force flag 'True') to node @nid@ without
-- waiting for the reply.
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync nid label pid =
  sendCtrlMsg (Just nid) $ Register label nid (Just pid) True
-- | Remove a name registration on the caller's node.
--
-- Issues a 'Register' control message carrying no pid, then blocks until
-- the 'RegisterReply' for the same label arrives; a negative reply is turned
-- into an exception by 'handleRegistrationReply'.
unregister :: String -> Process ()
unregister label = do
  mynid <- getSelfNode
  sendCtrlMsg Nothing (Register label mynid Nothing False)
  receiveWait
    [ matchIf (\(RegisterReply label' _) -> label == label')
              (\(RegisterReply _ ok) -> handleRegistrationReply label ok)
    ]
-- | Throw 'ProcessRegistrationException' for the label when the reply flag
-- is 'False'; do nothing otherwise.
handleRegistrationReply :: String -> Bool -> Process ()
handleRegistrationReply label ok
  | ok        = return ()
  | otherwise = liftIO (throwIO (ProcessRegistrationException label))
-- | Send a 'Register' request carrying no pid to node @nid@ without waiting
-- for the reply.
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync nid label =
  sendCtrlMsg (Just nid) $ Register label nid Nothing False
-- | Look up a registered name on the caller's node: issues 'WhereIs', then
-- blocks until the 'WhereIsReply' for the same label arrives and returns the
-- pid it carries, if any.
whereis :: String -> Process (Maybe ProcessId)
whereis label = do
  sendCtrlMsg Nothing (WhereIs label)
  receiveWait
    [ matchIf (\(WhereIsReply label' _) -> label == label')
              (\(WhereIsReply _ mPid) -> return mPid)
    ]
-- | Send a 'WhereIs' request to node @nid@ without waiting for the reply.
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync nid label = sendCtrlMsg (Just nid) $ WhereIs label
-- | Send a message to a registered name on the caller's node; the payload
-- is wrapped with 'createUnencodedMessage'.
nsend :: Serializable a => String -> a -> Process ()
nsend label msg =
  sendCtrlMsg Nothing $ NamedSend label (createUnencodedMessage msg)
-- | Like 'nsend', but delegates straight to 'Unsafe.nsend'.
unsafeNSend :: Serializable a => String -> a -> Process ()
unsafeNSend label msg = Unsafe.nsend label msg
-- | Send a message to a registered name on node @nid@; the payload is
-- wrapped with 'createMessage'.
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote nid label msg =
  sendCtrlMsg (Just nid) $ NamedSend label (createMessage msg)
-- | Resolve a static value against the node's remote table; calls 'fail'
-- with a descriptive message if resolution fails.
unStatic :: Typeable a => Static a -> Process a
unStatic static = do
  rtable <- fmap (remoteTable . processNode) ask
  either (\err -> fail ("Could not resolve static value: " ++ err))
         return
         (Static.unstatic rtable static)
-- | Resolve a closure against the node's remote table; calls 'fail' with a
-- descriptive message if resolution fails.
unClosure :: Typeable a => Closure a -> Process a
unClosure closure = do
  rtable <- fmap (remoteTable . processNode) ask
  either (\err -> fail ("Could not resolve closure: " ++ err))
         return
         (Static.unclosure rtable closure)
-- | Call 'disconnect' for the connection from the calling process to the
-- process @them@.
reconnect :: ProcessId -> Process ()
reconnect them = do
  proc <- ask
  liftIO $ disconnect (processNode proc)
                      (ProcessIdentifier (processId proc))
                      (ProcessIdentifier them)
-- | Call 'disconnect' for the connection from the calling process to the
-- given send port.
reconnectPort :: SendPort a -> Process ()
reconnectPort them = do
  proc <- ask
  liftIO $ disconnect (processNode proc)
                      (ProcessIdentifier (processId proc))
                      (SendPortIdentifier (sendPortId them))
-- | Deliver a message to a process through a 'LocalSend' control message;
-- the payload is wrapped with 'createUnencodedMessage'.
sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
sendLocal to msg =
  sendCtrlMsg Nothing (LocalSend to (createUnencodedMessage msg))
-- | Deliver a message to a typed channel through a 'LocalPortSend' control
-- message; the payload is wrapped with 'createUnencodedMessage'.
sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
sendChanLocal spId msg =
  sendCtrlMsg Nothing (LocalPortSend spId (createUnencodedMessage msg))
-- | Create a fresh 'MonitorRef' for the given identifier, using the current
-- value of the per-process monitor counter and incrementing the counter.
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor ident = do
  proc <- ask
  liftIO $ modifyMVar (processState proc) $ \st -> do
    let counter = st ^. monitorCounter
    return ( monitorCounter ^: (+ 1) $ st
           , MonitorRef ident counter
           )
-- | Create a fresh 'SpawnRef', using the current value of the per-process
-- spawn counter and incrementing the counter.
getSpawnRef :: Process SpawnRef
getSpawnRef = do
  proc <- ask
  liftIO $ modifyMVar (processState proc) $ \st -> do
    let counter = st ^. spawnCounter
    return ( spawnCounter ^: (+ 1) $ st
           , SpawnRef counter
           )
-- | Monitor an arbitrary 'Identifier': allocates a fresh 'MonitorRef',
-- issues a 'Monitor' control message carrying it, and returns the reference.
monitor' :: Identifier -> Process MonitorRef
monitor' ident =
  getMonitorRefFor ident >>= \monitorRef ->
    sendCtrlMsg Nothing (Monitor monitorRef) >> return monitorRef
-- | Link to an arbitrary 'Identifier' by issuing a 'Link' control message.
link' :: Identifier -> Process ()
link' ident = sendCtrlMsg Nothing (Link ident)