module Control.Distributed.Process.Internal.Primitives
(
send
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeSendChan
, unsafeNSend
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unsafeWrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, delegate
, relay
, proxy
, terminate
, ProcessTerminationException(..)
, die
, kill
, exit
, catchExit
, catchesExit
, ProcessExitException()
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
, link
, unlink
, monitor
, unmonitor
, withMonitor
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, unClosure
, unStatic
, catch
, Handler(..)
, catches
, try
, mask
, mask_
, onException
, bracket
, bracket_
, finally
, expectTimeout
, receiveChanTimeout
, spawnAsync
, linkNode
, linkPort
, unlinkNode
, unlinkPort
, monitorNode
, monitorPort
, reconnect
, reconnectPort
, sendCtrlMsg
) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Data.Binary (decode)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (formatTime)
#if MIN_VERSION_time(1,5,0)
import Data.Time.Format (defaultTimeLocale)
#else
import System.Locale (defaultTimeLocale)
#endif
import System.Timeout (timeout)
import Control.Monad (when)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (liftIO)
import Control.Applicative ((<$>))
import Control.Exception (Exception(..), throw, throwIO, SomeException)
import qualified Control.Exception as Ex (catch, mask, mask_, try)
import Control.Distributed.Process.Internal.StrictMVar
( StrictMVar
, modifyMVar
, modifyMVar_
)
import Control.Concurrent.STM
( STM
, TVar
, atomically
, orElse
, newTVar
, readTVar
, writeTVar
)
import Control.Distributed.Process.Internal.CQueue
( dequeue
, BlockSpec(..)
, MatchOn(..)
)
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Static
( Static
, Closure
)
import Data.Rank1Typeable (Typeable)
import qualified Control.Distributed.Static as Static (unstatic, unclosure)
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, Message(..)
, MonitorRef(..)
, SpawnRef(..)
, ProcessSignal(..)
, NodeMonitorNotification(..)
, monitorCounter
, spawnCounter
, SendPort(..)
, ReceivePort(..)
, channelCounter
, typedChannelWithId
, TypedChannel(..)
, SendPortId(..)
, Identifier(..)
, ProcessExitException(..)
, DiedReason(..)
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, WhereIsReply(..)
, RegisterReply(..)
, ProcessRegistrationException(..)
, ProcessInfo(..)
, ProcessInfoNone(..)
, NodeStats(..)
, isEncoded
, createMessage
, createUnencodedMessage
, runLocalProcess
, ImplicitReconnect( NoImplicitReconnect)
, LocalProcessState
, LocalSendPortId
, messageToPayload
)
import Control.Distributed.Process.Internal.Messaging
( sendMessage
, sendBinary
, sendPayload
, disconnect
, sendCtrlMsg
)
import Control.Distributed.Process.Management.Internal.Types
( MxEvent(..)
)
import Control.Distributed.Process.Management.Internal.Trace.Types
( traceEvent
)
import Control.Distributed.Process.Internal.WeakTQueue
( newTQueueIO
, readTQueue
, mkWeakTQueue
)
import Unsafe.Coerce
send :: Serializable a => ProcessId -> a -> Process ()
send them msg = do
proc <- ask
let us = processId proc
node = processNode proc
nodeId = localNodeId node
destNode = (processNodeId them) in do
case destNode == nodeId of
True -> sendLocal them msg
False -> liftIO $ sendMessage (processNode proc)
(ProcessIdentifier (processId proc))
(ProcessIdentifier them)
NoImplicitReconnect
msg
liftIO $ traceEvent (localEventBus node)
(MxSent them us (createUnencodedMessage msg))
unsafeSend :: Serializable a => ProcessId -> a -> Process ()
unsafeSend = Unsafe.send
expect :: forall a. Serializable a => Process a
expect = receiveWait [match return]
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
proc <- ask
liftIO . modifyMVar (processState proc) $ \st -> do
let lcid = st ^. channelCounter
let cid = SendPortId { sendPortProcessId = processId proc
, sendPortLocalId = lcid
}
let sport = SendPort cid
chan <- liftIO newTQueueIO
chan' <- mkWeakTQueue chan $ finalizer (processState proc) lcid
let rport = ReceivePort $ readTQueue chan
let tch = TypedChannel chan'
return ( (channelCounter ^: (+ 1))
. (typedChannelWithId lcid ^= Just tch)
$ st
, (sport, rport)
)
where
finalizer :: StrictMVar LocalProcessState -> LocalSendPortId -> IO ()
finalizer st lcid = modifyMVar_ st $
return . (typedChannelWithId lcid ^= Nothing)
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort cid) msg = do
proc <- ask
let node = localNodeId (processNode proc)
destNode = processNodeId (sendPortProcessId cid) in do
case destNode == node of
True -> sendChanLocal cid msg
False -> do
liftIO $ sendBinary (processNode proc)
(ProcessIdentifier (processId proc))
(SendPortIdentifier cid)
NoImplicitReconnect
msg
unsafeSendChan :: Serializable a => SendPort a -> a -> Process ()
unsafeSendChan = Unsafe.sendChan
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan = liftIO . atomically . receiveSTM
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout 0 ch = liftIO . atomically $
(Just <$> receiveSTM ch) `orElse` return Nothing
receiveChanTimeout n ch = liftIO . timeout n . atomically $
receiveSTM ch
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased = return . ReceivePort. foldr1 orElse . map receiveSTM
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR = \ps -> do
psVar <- liftIO . atomically $ newTVar (map receiveSTM ps)
return $ ReceivePort (rr psVar)
where
rotate :: [a] -> [a]
rotate [] = []
rotate (x:xs) = xs ++ [x]
rr :: TVar [STM a] -> STM a
rr psVar = do
ps <- readTVar psVar
a <- foldr1 orElse ps
writeTVar psVar (rotate ps)
return a
newtype Match b = Match { unMatch :: MatchOn Message (Process b) }
receiveWait :: [Match b] -> Process b
receiveWait ms = do
queue <- processQueue <$> ask
Just proc <- liftIO $ dequeue queue Blocking (map unMatch ms)
proc
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
receiveTimeout t ms = do
queue <- processQueue <$> ask
let blockSpec = if t == 0 then NonBlocking else Timeout t
mProc <- liftIO $ dequeue queue blockSpec (map unMatch ms)
case mProc of
Nothing -> return Nothing
Just proc -> Just <$> proc
matchChan :: ReceivePort a -> (a -> Process b) -> Match b
matchChan p fn = Match $ MatchChan (fmap fn (receiveSTM p))
matchSTM :: STM a -> (a -> Process b) -> Match b
matchSTM stm fn = Match $ MatchChan (fmap fn stm)
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match = matchIf (const True)
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf c p = Match $ MatchMsg $ \msg ->
case messageFingerprint msg == fingerprint (undefined :: a) of
False -> Nothing
True -> case msg of
(UnencodedMessage _ m) ->
let m' = unsafeCoerce m :: a in
case (c m') of
True -> Just (p m')
False -> Nothing
(EncodedMessage _ _) ->
if (c decoded) then Just (p decoded) else Nothing
where
decoded :: a
!decoded = decode (messageEncoding msg)
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage p = Match $ MatchMsg $ \msg -> Just (p msg)
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf c p = Match $ MatchMsg $ \msg ->
case (c msg) of
True -> Just (p msg)
False -> Nothing
forward :: Message -> ProcessId -> Process ()
forward msg them = do
proc <- ask
let node = processNode proc
us = processId proc
nid = localNodeId node
destNode = (processNodeId them) in do
case destNode == nid of
True -> sendCtrlMsg Nothing (LocalSend them msg)
False -> liftIO $ sendPayload (processNode proc)
(ProcessIdentifier (processId proc))
(ProcessIdentifier them)
NoImplicitReconnect
(messageToPayload msg)
liftIO $ traceEvent (localEventBus node)
(MxSent them us msg)
wrapMessage :: Serializable a => a -> Message
wrapMessage = createUnencodedMessage
unsafeWrapMessage :: Serializable a => a -> Message
unsafeWrapMessage = Unsafe.wrapMessage
unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a)
unwrapMessage msg =
case messageFingerprint msg == fingerprint (undefined :: a) of
False -> return Nothing :: m (Maybe a)
True -> case msg of
(UnencodedMessage _ ms) ->
let ms' = unsafeCoerce ms :: a
in return (Just ms')
(EncodedMessage _ _) ->
return (Just (decoded))
where
decoded :: a
!decoded = decode (messageEncoding msg)
handleMessage :: forall m a b. (Monad m, Serializable a)
=> Message -> (a -> m b) -> m (Maybe b)
handleMessage msg proc = handleMessageIf msg (const True) proc
handleMessageIf :: forall m a b . (Monad m, Serializable a)
=> Message
-> (a -> Bool)
-> (a -> m b)
-> m (Maybe b)
handleMessageIf msg c proc = do
case messageFingerprint msg == fingerprint (undefined :: a) of
False -> return Nothing :: m (Maybe b)
True -> case msg of
(UnencodedMessage _ ms) ->
let ms' = unsafeCoerce ms :: a in
case (c ms') of
True -> do { r <- proc ms'; return (Just r) }
False -> return Nothing :: m (Maybe b)
(EncodedMessage _ _) ->
case (c decoded) of
True -> do { r <- proc (decoded :: a); return (Just r) }
False -> return Nothing :: m (Maybe b)
where
decoded :: a
!decoded = decode (messageEncoding msg)
handleMessage_ :: forall m a . (Monad m, Serializable a)
=> Message -> (a -> m ()) -> m ()
handleMessage_ msg proc = handleMessageIf_ msg (const True) proc
handleMessageIf_ :: forall m a . (Monad m, Serializable a)
=> Message
-> (a -> Bool)
-> (a -> m ())
-> m ()
handleMessageIf_ msg c proc = handleMessageIf msg c proc >> return ()
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny p = Match $ MatchMsg $ \msg -> Just (p msg)
matchAnyIf :: forall a b. (Serializable a)
=> (a -> Bool)
-> (Message -> Process b)
-> Match b
matchAnyIf c p = Match $ MatchMsg $ \msg ->
case messageFingerprint msg == fingerprint (undefined :: a) of
True | check -> Just (p msg)
where
check :: Bool
!check =
case msg of
(EncodedMessage _ _) -> c decoded
(UnencodedMessage _ m') -> c (unsafeCoerce m')
decoded :: a
!decoded = decode (messageEncoding msg)
_ -> Nothing
matchUnknown :: Process b -> Match b
matchUnknown p = Match $ MatchMsg (const (Just p))
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate pid p = do
receiveWait [
matchAny (\m -> case (p m) of
True -> forward m pid
False -> return ())
]
delegate pid p
relay :: ProcessId -> Process ()
relay !pid = receiveWait [ matchAny (\m -> forward m pid) ] >> relay pid
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
proxy pid proc = do
receiveWait [
matchAny (\m -> do
next <- handleMessage m proc
case next of
Just True -> forward m pid
Just False -> return ()
Nothing -> return ())
]
proxy pid proc
data ProcessTerminationException = ProcessTerminationException
deriving (Show, Typeable)
instance Exception ProcessTerminationException
terminate :: Process a
terminate = liftIO $ throwIO ProcessTerminationException
die :: Serializable a => a -> Process b
die reason = do
pid <- getSelfPid
liftIO $ throwIO (ProcessExitException pid (createMessage reason))
kill :: ProcessId -> String -> Process ()
kill them reason = sendCtrlMsg Nothing (Kill them reason)
exit :: Serializable a => ProcessId -> a -> Process ()
exit them reason = sendCtrlMsg Nothing (Exit them (createMessage reason))
catchExit :: forall a b . (Show a, Serializable a)
=> Process b
-> (ProcessId -> a -> Process b)
-> Process b
catchExit act exitHandler = catch act handleExit
where
handleExit ex@(ProcessExitException from msg) =
if messageFingerprint msg == fingerprint (undefined :: a)
then exitHandler from decoded
else liftIO $ throwIO ex
where
decoded :: a
!decoded = decode (messageEncoding msg)
catchesExit :: Process b
-> [(ProcessId -> Message -> (Process (Maybe b)))]
-> Process b
catchesExit act handlers = catch act ((flip handleExit) handlers)
where
handleExit :: ProcessExitException
-> [(ProcessId -> Message -> Process (Maybe b))]
-> Process b
handleExit ex [] = liftIO $ throwIO ex
handleExit ex@(ProcessExitException from msg) (h:hs) = do
r <- h from msg
case r of
Nothing -> handleExit ex hs
Just p -> return p
getSelfPid :: Process ProcessId
getSelfPid = processId <$> ask
getSelfNode :: Process NodeId
getSelfNode = localNodeId . processNode <$> ask
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats nid = do
selfNode <- getSelfNode
if nid == selfNode
then Right `fmap` getLocalNodeStats
else getNodeStatsRemote
where
getNodeStatsRemote :: Process (Either DiedReason NodeStats)
getNodeStatsRemote = do
sendCtrlMsg (Just nid) $ GetNodeStats nid
bracket (monitorNode nid) unmonitor $ \mRef ->
receiveWait [ match (\(stats :: NodeStats) -> return $ Right stats)
, matchIf (\(NodeMonitorNotification ref _ _) -> ref == mRef)
(\(NodeMonitorNotification _ _ dr) -> return $ Left dr)
]
getLocalNodeStats :: Process NodeStats
getLocalNodeStats = do
self <- getSelfNode
sendCtrlMsg Nothing $ GetNodeStats self
receiveWait [ match (\(stats :: NodeStats) -> return stats) ]
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo pid =
let them = processNodeId pid in do
us <- getSelfNode
dest <- mkNode them us
sendCtrlMsg dest $ GetInfo pid
receiveWait [
match (\(p :: ProcessInfo) -> return $ Just p)
, match (\(_ :: ProcessInfoNone) -> return Nothing)
]
where mkNode :: NodeId -> NodeId -> Process (Maybe NodeId)
mkNode them us = case them == us of
True -> return Nothing
_ -> return $ Just them
link :: ProcessId -> Process ()
link = sendCtrlMsg Nothing . Link . ProcessIdentifier
monitor :: ProcessId -> Process MonitorRef
monitor = monitor' . ProcessIdentifier
withMonitor :: ProcessId -> Process a -> Process a
withMonitor pid code = bracket (monitor pid) unmonitor (\_ -> code)
unlink :: ProcessId -> Process ()
unlink pid = do
unlinkAsync pid
receiveWait [ matchIf (\(DidUnlinkProcess pid') -> pid' == pid)
(\_ -> return ())
]
unlinkNode :: NodeId -> Process ()
unlinkNode nid = do
unlinkNodeAsync nid
receiveWait [ matchIf (\(DidUnlinkNode nid') -> nid' == nid)
(\_ -> return ())
]
unlinkPort :: SendPort a -> Process ()
unlinkPort sport = do
unlinkPortAsync sport
receiveWait [ matchIf (\(DidUnlinkPort cid) -> cid == sendPortId sport)
(\_ -> return ())
]
unmonitor :: MonitorRef -> Process ()
unmonitor ref = do
unmonitorAsync ref
receiveWait [ matchIf (\(DidUnmonitor ref') -> ref' == ref)
(\_ -> return ())
]
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch p h = do
lproc <- ask
liftIO $ Ex.catch (runLocalProcess lproc p) (runLocalProcess lproc . h)
try :: Exception e => Process a -> Process (Either e a)
try p = do
lproc <- ask
liftIO $ Ex.try (runLocalProcess lproc p)
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask p = do
lproc <- ask
liftIO $ Ex.mask $ \restore ->
runLocalProcess lproc (p (liftRestore restore))
where
liftRestore :: (forall a. IO a -> IO a)
-> (forall a. Process a -> Process a)
liftRestore restoreIO = \p2 -> do
ourLocalProc <- ask
liftIO $ restoreIO $ runLocalProcess ourLocalProc p2
mask_ :: Process a -> Process a
mask_ p = do
lproc <- ask
liftIO $ Ex.mask_ $ runLocalProcess lproc p
onException :: Process a -> Process b -> Process a
onException p what = p `catch` \e -> do _ <- what
liftIO $ throwIO (e :: SomeException)
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket before after thing =
mask $ \restore -> do
a <- before
r <- restore (thing a) `onException` after a
_ <- after a
return r
bracket_ :: Process a -> Process b -> Process c -> Process c
bracket_ before after thing = bracket before (const after) (const thing)
finally :: Process a -> Process b -> Process a
finally a sequel = bracket_ (return ()) sequel a
data Handler a = forall e . Exception e => Handler (e -> Process a)
instance Functor Handler where
fmap f (Handler h) = Handler (fmap f . h)
catches :: Process a -> [Handler a] -> Process a
catches proc handlers = proc `catch` catchesHandler handlers
catchesHandler :: [Handler a] -> SomeException -> Process a
catchesHandler handlers e = foldr tryHandler (throw e) handlers
where tryHandler (Handler handler) res
= case fromException e of
Just e' -> handler e'
Nothing -> res
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout n = receiveTimeout n [match return]
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync nid proc = do
spawnRef <- getSpawnRef
sendCtrlMsg (Just nid) $ Spawn proc spawnRef
return spawnRef
monitorNode :: NodeId -> Process MonitorRef
monitorNode =
monitor' . NodeIdentifier
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort (SendPort cid) =
monitor' (SendPortIdentifier cid)
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync =
sendCtrlMsg Nothing . Unmonitor
linkNode :: NodeId -> Process ()
linkNode = link' . NodeIdentifier
linkPort :: SendPort a -> Process ()
linkPort (SendPort cid) =
link' (SendPortIdentifier cid)
unlinkAsync :: ProcessId -> Process ()
unlinkAsync =
sendCtrlMsg Nothing . Unlink . ProcessIdentifier
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync =
sendCtrlMsg Nothing . Unlink . NodeIdentifier
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync (SendPort cid) =
sendCtrlMsg Nothing . Unlink $ SendPortIdentifier cid
say :: String -> Process ()
say string = do
now <- liftIO getCurrentTime
us <- getSelfPid
nsend "logger" (formatTime defaultTimeLocale "%c" now, us, string)
register :: String -> ProcessId -> Process ()
register = registerImpl False
reregister :: String -> ProcessId -> Process ()
reregister = registerImpl True
registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl force label pid = do
mynid <- getSelfNode
sendCtrlMsg Nothing (Register label mynid (Just pid) force)
receiveWait [ matchIf (\(RegisterReply label' _) -> label == label')
(\(RegisterReply _ ok) -> handleRegistrationReply label ok)
]
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync nid label pid =
sendCtrlMsg (Just nid) (Register label nid (Just pid) False)
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync nid label pid =
sendCtrlMsg (Just nid) (Register label nid (Just pid) True)
unregister :: String -> Process ()
unregister label = do
mynid <- getSelfNode
sendCtrlMsg Nothing (Register label mynid Nothing False)
receiveWait [ matchIf (\(RegisterReply label' _) -> label == label')
(\(RegisterReply _ ok) -> handleRegistrationReply label ok)
]
handleRegistrationReply :: String -> Bool -> Process ()
handleRegistrationReply label ok =
when (not ok) $
liftIO $ throwIO $ ProcessRegistrationException label
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync nid label =
sendCtrlMsg (Just nid) (Register label nid Nothing False)
whereis :: String -> Process (Maybe ProcessId)
whereis label = do
sendCtrlMsg Nothing (WhereIs label)
receiveWait [ matchIf (\(WhereIsReply label' _) -> label == label')
(\(WhereIsReply _ mPid) -> return mPid)
]
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync nid label =
sendCtrlMsg (Just nid) (WhereIs label)
nsend :: Serializable a => String -> a -> Process ()
nsend label msg =
sendCtrlMsg Nothing (NamedSend label (createMessage msg))
unsafeNSend :: Serializable a => String -> a -> Process ()
unsafeNSend = Unsafe.nsend
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote nid label msg =
sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
unStatic :: Typeable a => Static a -> Process a
unStatic static = do
rtable <- remoteTable . processNode <$> ask
case Static.unstatic rtable static of
Left err -> fail $ "Could not resolve static value: " ++ err
Right x -> return x
unClosure :: Typeable a => Closure a -> Process a
unClosure closure = do
rtable <- remoteTable . processNode <$> ask
case Static.unclosure rtable closure of
Left err -> fail $ "Could not resolve closure: " ++ err
Right x -> return x
reconnect :: ProcessId -> Process ()
reconnect them = do
us <- getSelfPid
node <- processNode <$> ask
liftIO $ disconnect node (ProcessIdentifier us) (ProcessIdentifier them)
reconnectPort :: SendPort a -> Process ()
reconnectPort them = do
us <- getSelfPid
node <- processNode <$> ask
liftIO $ disconnect node (ProcessIdentifier us) (SendPortIdentifier (sendPortId them))
sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
sendLocal to msg =
sendCtrlMsg Nothing $ LocalSend to (createUnencodedMessage msg)
sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
sendChanLocal spId msg =
sendCtrlMsg Nothing $ LocalPortSend spId (createUnencodedMessage msg)
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor ident = do
proc <- ask
liftIO $ modifyMVar (processState proc) $ \st -> do
let counter = st ^. monitorCounter
return ( monitorCounter ^: (+ 1) $ st
, MonitorRef ident counter
)
getSpawnRef :: Process SpawnRef
getSpawnRef = do
proc <- ask
liftIO $ modifyMVar (processState proc) $ \st -> do
let counter = st ^. spawnCounter
return ( spawnCounter ^: (+ 1) $ st
, SpawnRef counter
)
monitor' :: Identifier -> Process MonitorRef
monitor' ident = do
monitorRef <- getMonitorRefFor ident
sendCtrlMsg Nothing $ Monitor monitorRef
return monitorRef
link' :: Identifier -> Process ()
link' = sendCtrlMsg Nothing . Link