{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE BangPatterns #-}
module Control.Distributed.Process.Internal.Primitives
(
send
, usend
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeUSend
, unsafeSendChan
, unsafeNSend
, unsafeNSendRemote
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unsafeWrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, uforward
, delegate
, relay
, proxy
, terminate
, ProcessTerminationException(..)
, die
, kill
, exit
, catchExit
, catchesExit
, ProcessExitException()
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
, link
, unlink
, monitor
, unmonitor
, unmonitorAsync
, withMonitor
, withMonitor_
, SayMessage(..)
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, unClosure
, unStatic
, catch
, Handler(..)
, catches
, try
, mask
, mask_
, onException
, bracket
, bracket_
, finally
, expectTimeout
, receiveChanTimeout
, spawnAsync
, linkNode
, linkPort
, unlinkNode
, unlinkPort
, monitorNode
, monitorPort
, reconnect
, reconnectPort
, sendCtrlMsg
) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Data.Binary (Binary(..), Put, Get, decode)
import Data.Time.Clock (getCurrentTime, UTCTime(..))
import Data.Time.Calendar (Day(..))
import Data.Time.Format (formatTime)
#if MIN_VERSION_time(1,5,0)
import Data.Time.Format (defaultTimeLocale)
#else
import System.Locale (defaultTimeLocale)
#endif
import System.Timeout (timeout)
import Control.Monad (when, void)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Catch
( Exception
, SomeException
, throwM
, fromException
)
import qualified Control.Monad.Catch as Catch
import Control.Applicative
import Control.Distributed.Process.Internal.StrictMVar
( StrictMVar
, modifyMVar
, modifyMVar_
)
import Control.Concurrent.STM
( STM
, TVar
, atomically
, orElse
, newTVar
, readTVar
, writeTVar
)
import Control.Distributed.Process.Internal.CQueue
( dequeue
, BlockSpec(..)
, MatchOn(..)
)
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Static
( Static
, Closure
)
import Data.Rank1Typeable (Typeable)
import qualified Control.Distributed.Static as Static (unstatic, unclosure)
import qualified Control.Distributed.Process.UnsafePrimitives as Unsafe
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, Message(..)
, MonitorRef(..)
, SpawnRef(..)
, ProcessSignal(..)
, NodeMonitorNotification(..)
, ProcessMonitorNotification(..)
, monitorCounter
, spawnCounter
, SendPort(..)
, ReceivePort(..)
, channelCounter
, typedChannelWithId
, TypedChannel(..)
, SendPortId(..)
, Identifier(..)
, ProcessExitException(..)
, DiedReason(..)
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, WhereIsReply(..)
, RegisterReply(..)
, ProcessRegistrationException(..)
, ProcessInfo(..)
, ProcessInfoNone(..)
, NodeStats(..)
, isEncoded
, createMessage
, createUnencodedMessage
, ImplicitReconnect( NoImplicitReconnect)
, LocalProcessState
, LocalSendPortId
, messageToPayload
)
import Control.Distributed.Process.Internal.Messaging
( sendMessage
, sendBinary
, sendPayload
, disconnect
, sendCtrlMsg
)
import Control.Distributed.Process.Management.Internal.Types
( MxEvent(..)
)
import Control.Distributed.Process.Management.Internal.Trace.Types
( traceEvent
)
import Control.Distributed.Process.Internal.WeakTQueue
( newTQueueIO
, readTQueue
, mkWeakTQueue
)
import Prelude
import Unsafe.Coerce
send :: Serializable a => ProcessId -> a -> Process ()
send :: forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
them a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
nodeId :: NodeId
nodeId = LocalNode -> NodeId
localNodeId LocalNode
node
destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node)
(ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg))
if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nodeId
then ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
them a
msg
else IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
forall a.
Serializable a =>
LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
sendMessage (LocalProcess -> LocalNode
processNode LocalProcess
proc)
(ProcessId -> Identifier
ProcessIdentifier (LocalProcess -> ProcessId
processId LocalProcess
proc))
(ProcessId -> Identifier
ProcessIdentifier ProcessId
them)
ImplicitReconnect
NoImplicitReconnect
a
msg
unsafeSend :: Serializable a => ProcessId -> a -> Process ()
unsafeSend :: forall a. Serializable a => ProcessId -> a -> Process ()
unsafeSend = ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
Unsafe.send
usend :: Serializable a => ProcessId -> a -> Process ()
usend :: forall a. Serializable a => ProcessId -> a -> Process ()
usend ProcessId
them a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let there :: NodeId
there = ProcessId -> NodeId
processNodeId ProcessId
them
let (ProcessId
us, LocalNode
node) = (LocalProcess -> ProcessId
processId LocalProcess
proc, LocalProcess -> LocalNode
processNode LocalProcess
proc)
let msg' :: Message
msg' = a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg')
if LocalNode -> NodeId
localNodeId (LocalProcess -> LocalNode
processNode LocalProcess
proc) NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
there
then ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
them a
msg
else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
there) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalProcessId -> Message -> ProcessSignal
UnreliableSend (ProcessId -> LocalProcessId
processLocalId ProcessId
them)
(a -> Message
forall a. Serializable a => a -> Message
createMessage a
msg)
unsafeUSend :: Serializable a => ProcessId -> a -> Process ()
unsafeUSend :: forall a. Serializable a => ProcessId -> a -> Process ()
unsafeUSend = ProcessId -> a -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
Unsafe.usend
expect :: forall a. Serializable a => Process a
expect :: forall a. Serializable a => Process a
expect = [Match a] -> Process a
forall b. [Match b] -> Process b
receiveWait [(a -> Process a) -> Match a
forall a b. Serializable a => (a -> Process b) -> Match b
match a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return]
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan :: forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO (SendPort a, ReceivePort a)
-> Process (SendPort a, ReceivePort a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SendPort a, ReceivePort a)
-> Process (SendPort a, ReceivePort a))
-> ((LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> IO (SendPort a, ReceivePort a))
-> (LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> Process (SendPort a, ReceivePort a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar LocalProcessState
-> (LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> IO (SendPort a, ReceivePort a)
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> Process (SendPort a, ReceivePort a))
-> (LocalProcessState
-> IO (LocalProcessState, (SendPort a, ReceivePort a)))
-> Process (SendPort a, ReceivePort a)
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
let lcid :: Int32
lcid = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
channelCounter
let cid :: SendPortId
cid = SendPortId { sendPortProcessId :: ProcessId
sendPortProcessId = LocalProcess -> ProcessId
processId LocalProcess
proc
, sendPortLocalId :: Int32
sendPortLocalId = Int32
lcid
}
let sport :: SendPort a
sport = SendPortId -> SendPort a
forall a. SendPortId -> SendPort a
SendPort SendPortId
cid
TQueue a
chan <- IO (TQueue a) -> IO (TQueue a)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TQueue a)
forall a. IO (TQueue a)
newTQueueIO
Weak (TQueue a)
chan' <- TQueue a -> IO () -> IO (Weak (TQueue a))
forall a. TQueue a -> IO () -> IO (Weak (TQueue a))
mkWeakTQueue TQueue a
chan (IO () -> IO (Weak (TQueue a))) -> IO () -> IO (Weak (TQueue a))
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState -> Int32 -> IO ()
finalizer (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) Int32
lcid
let rport :: ReceivePort a
rport = STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort (STM a -> ReceivePort a) -> STM a -> ReceivePort a
forall a b. (a -> b) -> a -> b
$ TQueue a -> STM a
forall a. TQueue a -> STM a
readTQueue TQueue a
chan
let tch :: TypedChannel
tch = Weak (TQueue a) -> TypedChannel
forall a. Serializable a => Weak (TQueue a) -> TypedChannel
TypedChannel Weak (TQueue a)
chan'
(LocalProcessState, (SendPort a, ReceivePort a))
-> IO (LocalProcessState, (SendPort a, ReceivePort a))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( (T LocalProcessState Int32
channelCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1))
(LocalProcessState -> LocalProcessState)
-> (LocalProcessState -> LocalProcessState)
-> LocalProcessState
-> LocalProcessState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int32 -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId Int32
lcid Accessor LocalProcessState (Maybe TypedChannel)
-> Maybe TypedChannel -> LocalProcessState -> LocalProcessState
forall r a. T r a -> a -> r -> r
^= TypedChannel -> Maybe TypedChannel
forall a. a -> Maybe a
Just TypedChannel
tch)
(LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
, (SendPort a
forall {a}. SendPort a
sport, ReceivePort a
rport)
)
where
finalizer :: StrictMVar LocalProcessState -> LocalSendPortId -> IO ()
finalizer :: StrictMVar LocalProcessState -> Int32 -> IO ()
finalizer StrictMVar LocalProcessState
st Int32
lcid = StrictMVar LocalProcessState
-> (LocalProcessState -> IO LocalProcessState) -> IO ()
forall a. StrictMVar a -> (a -> IO a) -> IO ()
modifyMVar_ StrictMVar LocalProcessState
st ((LocalProcessState -> IO LocalProcessState) -> IO ())
-> (LocalProcessState -> IO LocalProcessState) -> IO ()
forall a b. (a -> b) -> a -> b
$
LocalProcessState -> IO LocalProcessState
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (LocalProcessState -> IO LocalProcessState)
-> (LocalProcessState -> LocalProcessState)
-> LocalProcessState
-> IO LocalProcessState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int32 -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId Int32
lcid Accessor LocalProcessState (Maybe TypedChannel)
-> Maybe TypedChannel -> LocalProcessState -> LocalProcessState
forall r a. T r a -> a -> r -> r
^= Maybe TypedChannel
forall a. Maybe a
Nothing)
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan :: forall a. Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort SendPortId
cid) a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
pid :: ProcessId
pid = LocalProcess -> ProcessId
processId LocalProcess
proc
us :: NodeId
us = LocalNode -> NodeId
localNodeId LocalNode
node
them :: NodeId
them = ProcessId -> NodeId
processNodeId (SendPortId -> ProcessId
sendPortProcessId SendPortId
cid)
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> SendPortId -> Message -> MxEvent
MxSentToPort ProcessId
pid SendPortId
cid (Message -> MxEvent) -> Message -> MxEvent
forall a b. (a -> b) -> a -> b
$ a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg)
case NodeId
them NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
us of
Bool
True -> SendPortId -> a -> Process ()
forall a. Serializable a => SendPortId -> a -> Process ()
sendChanLocal SendPortId
cid a
msg
Bool
False -> do
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
forall a.
Binary a =>
LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
sendBinary LocalNode
node
(ProcessId -> Identifier
ProcessIdentifier ProcessId
pid)
(SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)
ImplicitReconnect
NoImplicitReconnect
a
msg
unsafeSendChan :: Serializable a => SendPort a -> a -> Process ()
unsafeSendChan :: forall a. Serializable a => SendPort a -> a -> Process ()
unsafeSendChan = SendPort a -> a -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
Unsafe.sendChan
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan :: forall a. Serializable a => ReceivePort a -> Process a
receiveChan = IO a -> Process a
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> Process a)
-> (ReceivePort a -> IO a) -> ReceivePort a -> Process a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall a. STM a -> IO a
atomically (STM a -> IO a)
-> (ReceivePort a -> STM a) -> ReceivePort a -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout :: forall a.
Serializable a =>
Int -> ReceivePort a -> Process (Maybe a)
receiveChanTimeout Int
0 ReceivePort a
ch = IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> (STM (Maybe a) -> IO (Maybe a))
-> STM (Maybe a)
-> Process (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (Maybe a) -> IO (Maybe a)
forall a. STM a -> IO a
atomically (STM (Maybe a) -> Process (Maybe a))
-> STM (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$
(a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> STM a -> STM (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
ch) STM (Maybe a) -> STM (Maybe a) -> STM (Maybe a)
forall a. STM a -> STM a -> STM a
`orElse` Maybe a -> STM (Maybe a)
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
receiveChanTimeout Int
n ReceivePort a
ch = IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> (STM a -> IO (Maybe a)) -> STM a -> Process (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO a -> IO (Maybe a)
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
n (IO a -> IO (Maybe a)) -> (STM a -> IO a) -> STM a -> IO (Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> IO a
forall a. STM a -> IO a
atomically (STM a -> Process (Maybe a)) -> STM a -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$
ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
ch
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased :: forall a.
Serializable a =>
[ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased = ReceivePort a -> Process (ReceivePort a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ReceivePort a -> Process (ReceivePort a))
-> ([ReceivePort a] -> ReceivePort a)
-> [ReceivePort a]
-> Process (ReceivePort a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort(STM a -> ReceivePort a)
-> ([ReceivePort a] -> STM a) -> [ReceivePort a] -> ReceivePort a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (STM a -> STM a -> STM a) -> [STM a] -> STM a
forall a. (a -> a -> a) -> [a] -> a
forall (t :: * -> *) a. Foldable t => (a -> a -> a) -> t a -> a
foldr1 STM a -> STM a -> STM a
forall a. STM a -> STM a -> STM a
orElse ([STM a] -> STM a)
-> ([ReceivePort a] -> [STM a]) -> [ReceivePort a] -> STM a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ReceivePort a -> STM a) -> [ReceivePort a] -> [STM a]
forall a b. (a -> b) -> [a] -> [b]
map ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR :: forall a.
Serializable a =>
[ReceivePort a] -> Process (ReceivePort a)
mergePortsRR = \[ReceivePort a]
ps -> do
TVar [STM a]
psVar <- IO (TVar [STM a]) -> Process (TVar [STM a])
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (TVar [STM a]) -> Process (TVar [STM a]))
-> (STM (TVar [STM a]) -> IO (TVar [STM a]))
-> STM (TVar [STM a])
-> Process (TVar [STM a])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (TVar [STM a]) -> IO (TVar [STM a])
forall a. STM a -> IO a
atomically (STM (TVar [STM a]) -> Process (TVar [STM a]))
-> STM (TVar [STM a]) -> Process (TVar [STM a])
forall a b. (a -> b) -> a -> b
$ [STM a] -> STM (TVar [STM a])
forall a. a -> STM (TVar a)
newTVar ((ReceivePort a -> STM a) -> [ReceivePort a] -> [STM a]
forall a b. (a -> b) -> [a] -> [b]
map ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM [ReceivePort a]
ps)
ReceivePort a -> Process (ReceivePort a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ReceivePort a -> Process (ReceivePort a))
-> ReceivePort a -> Process (ReceivePort a)
forall a b. (a -> b) -> a -> b
$ STM a -> ReceivePort a
forall a. STM a -> ReceivePort a
ReceivePort (TVar [STM a] -> STM a
forall a. TVar [STM a] -> STM a
rr TVar [STM a]
psVar)
where
rotate :: [a] -> [a]
rotate :: forall a. [a] -> [a]
rotate [] = []
rotate (a
x:[a]
xs) = [a]
xs [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
++ [a
x]
rr :: TVar [STM a] -> STM a
rr :: forall a. TVar [STM a] -> STM a
rr TVar [STM a]
psVar = do
[STM a]
ps <- TVar [STM a] -> STM [STM a]
forall a. TVar a -> STM a
readTVar TVar [STM a]
psVar
a
a <- (STM a -> STM a -> STM a) -> [STM a] -> STM a
forall a. (a -> a -> a) -> [a] -> a
forall (t :: * -> *) a. Foldable t => (a -> a -> a) -> t a -> a
foldr1 STM a -> STM a -> STM a
forall a. STM a -> STM a -> STM a
orElse [STM a]
ps
TVar [STM a] -> [STM a] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [STM a]
psVar ([STM a] -> [STM a]
forall a. [a] -> [a]
rotate [STM a]
ps)
a -> STM a
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
newtype Match b = Match { forall b. Match b -> MatchOn Message (Process b)
unMatch :: MatchOn Message (Process b) }
deriving ((forall a b. (a -> b) -> Match a -> Match b)
-> (forall a b. a -> Match b -> Match a) -> Functor Match
forall a b. a -> Match b -> Match a
forall a b. (a -> b) -> Match a -> Match b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall a b. (a -> b) -> Match a -> Match b
fmap :: forall a b. (a -> b) -> Match a -> Match b
$c<$ :: forall a b. a -> Match b -> Match a
<$ :: forall a b. a -> Match b -> Match a
Functor)
receiveWait :: [Match b] -> Process b
receiveWait :: forall b. [Match b] -> Process b
receiveWait [Match b]
ms = do
CQueue Message
queue <- LocalProcess -> CQueue Message
processQueue (LocalProcess -> CQueue Message)
-> Process LocalProcess -> Process (CQueue Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
Maybe (Process b)
mProc <- IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Process b)) -> Process (Maybe (Process b)))
-> IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a b. (a -> b) -> a -> b
$ CQueue Message
-> BlockSpec
-> [MatchOn Message (Process b)]
-> IO (Maybe (Process b))
forall m a. CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a)
dequeue CQueue Message
queue BlockSpec
Blocking ((Match b -> MatchOn Message (Process b))
-> [Match b] -> [MatchOn Message (Process b)]
forall a b. (a -> b) -> [a] -> [b]
map Match b -> MatchOn Message (Process b)
forall b. Match b -> MatchOn Message (Process b)
unMatch [Match b]
ms)
case Maybe (Process b)
mProc of
Just Process b
proc' -> Process b
proc'
Maybe (Process b)
Nothing -> String -> Process b
forall a b. Serializable a => a -> Process b
die (String -> Process b) -> String -> Process b
forall a b. (a -> b) -> a -> b
$ String
"System Invariant Violation: CQueue.hs returned `Nothing` "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"in the absence of a timeout value."
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
receiveTimeout :: forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
t [Match b]
ms = do
CQueue Message
queue <- LocalProcess -> CQueue Message
processQueue (LocalProcess -> CQueue Message)
-> Process LocalProcess -> Process (CQueue Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let blockSpec :: BlockSpec
blockSpec = if Int
t Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 then BlockSpec
NonBlocking else Int -> BlockSpec
Timeout Int
t
Maybe (Process b)
mProc <- IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Process b)) -> Process (Maybe (Process b)))
-> IO (Maybe (Process b)) -> Process (Maybe (Process b))
forall a b. (a -> b) -> a -> b
$ CQueue Message
-> BlockSpec
-> [MatchOn Message (Process b)]
-> IO (Maybe (Process b))
forall m a. CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a)
dequeue CQueue Message
queue BlockSpec
blockSpec ((Match b -> MatchOn Message (Process b))
-> [Match b] -> [MatchOn Message (Process b)]
forall a b. (a -> b) -> [a] -> [b]
map Match b -> MatchOn Message (Process b)
forall b. Match b -> MatchOn Message (Process b)
unMatch [Match b]
ms)
case Maybe (Process b)
mProc of
Maybe (Process b)
Nothing -> Maybe b -> Process (Maybe b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
Just Process b
proc -> b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> Process b -> Process (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process b
proc
matchChan :: ReceivePort a -> (a -> Process b) -> Match b
matchChan :: forall a b. ReceivePort a -> (a -> Process b) -> Match b
matchChan ReceivePort a
p a -> Process b
fn = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ STM (Process b) -> MatchOn Message (Process b)
forall m a. STM a -> MatchOn m a
MatchChan ((a -> Process b) -> STM a -> STM (Process b)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Process b
fn (ReceivePort a -> STM a
forall a. ReceivePort a -> STM a
receiveSTM ReceivePort a
p))
matchSTM :: STM a -> (a -> Process b) -> Match b
matchSTM :: forall a b. STM a -> (a -> Process b) -> Match b
matchSTM STM a
stm a -> Process b
fn = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ STM (Process b) -> MatchOn Message (Process b)
forall m a. STM a -> MatchOn m a
MatchChan ((a -> Process b) -> STM a -> STM (Process b)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Process b
fn STM a
stm)
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match = (a -> Bool) -> (a -> Process b) -> Match b
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True)
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf :: forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf a -> Bool
c a -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
Bool
False -> Maybe (Process b)
forall a. Maybe a
Nothing
Bool
True -> case Message
msg of
(UnencodedMessage Fingerprint
_ a
m) ->
let m' :: a
m' = a -> a
forall a b. a -> b
unsafeCoerce a
m :: a in
case (a -> Bool
c a
m') of
Bool
True -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (a -> Process b
p a
m')
Bool
False -> Maybe (Process b)
forall a. Maybe a
Nothing
(EncodedMessage Fingerprint
_ ByteString
_) ->
if (a -> Bool
c a
decoded) then Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (a -> Process b
p a
decoded) else Maybe (Process b)
forall a. Maybe a
Nothing
where
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage :: (Message -> Process Message) -> Match Message
matchMessage Message -> Process Message
p = MatchOn Message (Process Message) -> Match Message
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process Message) -> Match Message)
-> MatchOn Message (Process Message) -> Match Message
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process Message))
-> MatchOn Message (Process Message))
-> (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall a b. (a -> b) -> a -> b
$ \Message
msg -> Process Message -> Maybe (Process Message)
forall a. a -> Maybe a
Just (Message -> Process Message
p Message
msg)
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
matchMessageIf Message -> Bool
c Message -> Process Message
p = MatchOn Message (Process Message) -> Match Message
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process Message) -> Match Message)
-> MatchOn Message (Process Message) -> Match Message
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process Message))
-> MatchOn Message (Process Message))
-> (Message -> Maybe (Process Message))
-> MatchOn Message (Process Message)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
case (Message -> Bool
c Message
msg) of
Bool
True -> Process Message -> Maybe (Process Message)
forall a. a -> Maybe a
Just (Message -> Process Message
p Message
msg)
Bool
False -> Maybe (Process Message)
forall a. Maybe a
Nothing
forward :: Message -> ProcessId -> Process ()
forward :: Message -> ProcessId -> Process ()
forward Message
msg ProcessId
them = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
nid :: NodeId
nid = LocalNode -> NodeId
localNodeId LocalNode
node
destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg)
if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
them Message
msg)
else IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> [ByteString]
-> IO ()
sendPayload (LocalProcess -> LocalNode
processNode LocalProcess
proc)
(ProcessId -> Identifier
ProcessIdentifier (LocalProcess -> ProcessId
processId LocalProcess
proc))
(ProcessId -> Identifier
ProcessIdentifier ProcessId
them)
ImplicitReconnect
NoImplicitReconnect
(Message -> [ByteString]
messageToPayload Message
msg)
uforward :: Message -> ProcessId -> Process ()
uforward :: Message -> ProcessId -> Process ()
uforward Message
msg ProcessId
them = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
nid :: NodeId
nid = LocalNode -> NodeId
localNodeId LocalNode
node
destNode :: NodeId
destNode = (ProcessId -> NodeId
processNodeId ProcessId
them)
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node) (ProcessId -> ProcessId -> Message -> MxEvent
MxSent ProcessId
them ProcessId
us Message
msg)
if NodeId
destNode NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
them Message
msg)
else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
destNode) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalProcessId -> Message -> ProcessSignal
UnreliableSend (ProcessId -> LocalProcessId
processLocalId ProcessId
them) Message
msg
wrapMessage :: Serializable a => a -> Message
wrapMessage :: forall a. Serializable a => a -> Message
wrapMessage = a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage
unsafeWrapMessage :: Serializable a => a -> Message
unsafeWrapMessage :: forall a. Serializable a => a -> Message
unsafeWrapMessage = a -> Message
forall a. Serializable a => a -> Message
Unsafe.wrapMessage
unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a)
unwrapMessage :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> m (Maybe a)
unwrapMessage Message
msg =
case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
Bool
False -> Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing :: m (Maybe a)
Bool
True -> case Message
msg of
(UnencodedMessage Fingerprint
_ a
ms) ->
let ms' :: a
ms' = a -> a
forall a b. a -> b
unsafeCoerce a
ms :: a
in Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
ms')
(EncodedMessage Fingerprint
_ ByteString
_) ->
Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just (a
decoded))
where
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
handleMessage :: forall m a b. (Monad m, Serializable a)
=> Message -> (a -> m b) -> m (Maybe b)
handleMessage :: forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
msg a -> m b
proc = Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True) a -> m b
proc
handleMessageIf :: forall m a b . (Monad m, Serializable a)
=> Message
-> (a -> Bool)
-> (a -> m b)
-> m (Maybe b)
handleMessageIf :: forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg a -> Bool
c a -> m b
proc = do
case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
Bool
True -> case Message
msg of
(UnencodedMessage Fingerprint
_ a
ms) ->
let ms' :: a
ms' = a -> a
forall a b. a -> b
unsafeCoerce a
ms :: a in
case (a -> Bool
c a
ms') of
Bool
True -> do { b
r <- a -> m b
proc a
ms'; Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Maybe b
forall a. a -> Maybe a
Just b
r) }
Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
(EncodedMessage Fingerprint
_ ByteString
_) ->
case (a -> Bool
c a
decoded) of
Bool
True -> do { b
r <- a -> m b
proc (a
decoded :: a); Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Maybe b
forall a. a -> Maybe a
Just b
r) }
Bool
False -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing :: m (Maybe b)
where
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
handleMessage_ :: forall m a . (Monad m, Serializable a)
=> Message -> (a -> m ()) -> m ()
handleMessage_ :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> m ()) -> m ()
handleMessage_ Message
msg a -> m ()
proc = Message -> (a -> Bool) -> (a -> m ()) -> m ()
forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m ()) -> m ()
handleMessageIf_ Message
msg (Bool -> a -> Bool
forall a b. a -> b -> a
const Bool
True) a -> m ()
proc
handleMessageIf_ :: forall m a . (Monad m, Serializable a)
=> Message
-> (a -> Bool)
-> (a -> m ())
-> m ()
handleMessageIf_ :: forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m ()) -> m ()
handleMessageIf_ Message
msg a -> Bool
c a -> m ()
proc = Message -> (a -> Bool) -> (a -> m ()) -> m (Maybe ())
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg a -> Bool
c a -> m ()
proc m (Maybe ()) -> m () -> m ()
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny :: forall b. (Message -> Process b) -> Match b
matchAny Message -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (Message -> Process b
p Message
msg)
matchAnyIf :: forall a b. (Serializable a)
=> (a -> Bool)
-> (Message -> Process b)
-> Match b
matchAnyIf :: forall a b.
Serializable a =>
(a -> Bool) -> (Message -> Process b) -> Match b
matchAnyIf a -> Bool
c Message -> Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg ((Message -> Maybe (Process b)) -> MatchOn Message (Process b))
-> (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall a b. (a -> b) -> a -> b
$ \Message
msg ->
case Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a) of
Bool
True | Bool
check -> Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just (Message -> Process b
p Message
msg)
where
check :: Bool
!check :: Bool
check =
case Message
msg of
(EncodedMessage Fingerprint
_ ByteString
_) -> a -> Bool
c a
decoded
(UnencodedMessage Fingerprint
_ a
m') -> a -> Bool
c (a -> a
forall a b. a -> b
unsafeCoerce a
m')
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
Bool
_ -> Maybe (Process b)
forall a. Maybe a
Nothing
matchUnknown :: Process b -> Match b
matchUnknown :: forall b. Process b -> Match b
matchUnknown Process b
p = MatchOn Message (Process b) -> Match b
forall b. MatchOn Message (Process b) -> Match b
Match (MatchOn Message (Process b) -> Match b)
-> MatchOn Message (Process b) -> Match b
forall a b. (a -> b) -> a -> b
$ (Message -> Maybe (Process b)) -> MatchOn Message (Process b)
forall m a. (m -> Maybe a) -> MatchOn m a
MatchMsg (Maybe (Process b) -> Message -> Maybe (Process b)
forall a b. a -> b -> a
const (Process b -> Maybe (Process b)
forall a. a -> Maybe a
Just Process b
p))
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate :: ProcessId -> (Message -> Bool) -> Process ()
delegate ProcessId
pid Message -> Bool
p = do
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [
(Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> case (Message -> Bool
p Message
m) of
Bool
True -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid
Bool
False -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
ProcessId -> (Message -> Bool) -> Process ()
delegate ProcessId
pid Message -> Bool
p
relay :: ProcessId -> Process ()
relay :: ProcessId -> Process ()
relay !ProcessId
pid = [Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid) ] Process () -> Process () -> Process ()
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ProcessId -> Process ()
relay ProcessId
pid
proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
proxy :: forall a.
Serializable a =>
ProcessId -> (a -> Process Bool) -> Process ()
proxy ProcessId
pid a -> Process Bool
proc = do
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [
(Message -> Process ()) -> Match ()
forall b. (Message -> Process b) -> Match b
matchAny (\Message
m -> do
Maybe Bool
next <- Message -> (a -> Process Bool) -> Process (Maybe Bool)
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> m b) -> m (Maybe b)
handleMessage Message
m a -> Process Bool
proc
case Maybe Bool
next of
Just Bool
True -> Message -> ProcessId -> Process ()
forward Message
m ProcessId
pid
Just Bool
False -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Maybe Bool
Nothing -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
ProcessId -> (a -> Process Bool) -> Process ()
forall a.
Serializable a =>
ProcessId -> (a -> Process Bool) -> Process ()
proxy ProcessId
pid a -> Process Bool
proc
data ProcessTerminationException = ProcessTerminationException
deriving (Int -> ProcessTerminationException -> String -> String
[ProcessTerminationException] -> String -> String
ProcessTerminationException -> String
(Int -> ProcessTerminationException -> String -> String)
-> (ProcessTerminationException -> String)
-> ([ProcessTerminationException] -> String -> String)
-> Show ProcessTerminationException
forall a.
(Int -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
$cshowsPrec :: Int -> ProcessTerminationException -> String -> String
showsPrec :: Int -> ProcessTerminationException -> String -> String
$cshow :: ProcessTerminationException -> String
show :: ProcessTerminationException -> String
$cshowList :: [ProcessTerminationException] -> String -> String
showList :: [ProcessTerminationException] -> String -> String
Show, Typeable)
instance Exception ProcessTerminationException
terminate :: Process a
terminate :: forall a. Process a
terminate = ProcessTerminationException -> Process a
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessTerminationException
ProcessTerminationException
die :: Serializable a => a -> Process b
die :: forall a b. Serializable a => a -> Process b
die a
reason = do
ProcessId
pid <- Process ProcessId
getSelfPid
ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (ProcessId -> Message -> ProcessExitException
ProcessExitException ProcessId
pid (a -> Message
forall a. Serializable a => a -> Message
createMessage a
reason))
kill :: ProcessId -> String -> Process ()
kill :: ProcessId -> String -> Process ()
kill ProcessId
them String
reason = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> String -> ProcessSignal
Kill ProcessId
them String
reason)
exit :: Serializable a => ProcessId -> a -> Process ()
exit :: forall a. Serializable a => ProcessId -> a -> Process ()
exit ProcessId
them a
reason = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessId -> Message -> ProcessSignal
Exit ProcessId
them (a -> Message
forall a. Serializable a => a -> Message
createMessage a
reason))
catchExit :: forall a b . (Show a, Serializable a)
=> Process b
-> (ProcessId -> a -> Process b)
-> Process b
catchExit :: forall a b.
(Show a, Serializable a) =>
Process b -> (ProcessId -> a -> Process b) -> Process b
catchExit Process b
act ProcessId -> a -> Process b
exitHandler = Process b -> (ProcessExitException -> Process b) -> Process b
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch Process b
act ProcessExitException -> Process b
handleExit
where
handleExit :: ProcessExitException -> Process b
handleExit ex :: ProcessExitException
ex@(ProcessExitException ProcessId
from Message
msg) =
if Message -> Fingerprint
messageFingerprint Message
msg Fingerprint -> Fingerprint -> Bool
forall a. Eq a => a -> a -> Bool
== a -> Fingerprint
forall a. Typeable a => a -> Fingerprint
fingerprint (a
forall a. HasCallStack => a
undefined :: a)
then ProcessId -> a -> Process b
exitHandler ProcessId
from a
decoded
else ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessExitException
ex
where
decoded :: a
!decoded :: a
decoded = ByteString -> a
forall a. Binary a => ByteString -> a
decode (Message -> ByteString
messageEncoding Message
msg)
catchesExit :: Process b
-> [(ProcessId -> Message -> (Process (Maybe b)))]
-> Process b
catchesExit :: forall b.
Process b
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
catchesExit Process b
act [ProcessId -> Message -> Process (Maybe b)]
handlers = Process b -> (ProcessExitException -> Process b) -> Process b
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch Process b
act (((ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b)
-> [ProcessId -> Message -> Process (Maybe b)]
-> ProcessExitException
-> Process b
forall a b c. (a -> b -> c) -> b -> a -> c
flip ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit) [ProcessId -> Message -> Process (Maybe b)]
handlers)
where
handleExit :: ProcessExitException
-> [(ProcessId -> Message -> Process (Maybe b))]
-> Process b
handleExit :: forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit ProcessExitException
ex [] = ProcessExitException -> Process b
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM ProcessExitException
ex
handleExit ex :: ProcessExitException
ex@(ProcessExitException ProcessId
from Message
msg) (ProcessId -> Message -> Process (Maybe b)
h:[ProcessId -> Message -> Process (Maybe b)]
hs) = do
Maybe b
r <- ProcessId -> Message -> Process (Maybe b)
h ProcessId
from Message
msg
case Maybe b
r of
Maybe b
Nothing -> ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
forall b.
ProcessExitException
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
handleExit ProcessExitException
ex [ProcessId -> Message -> Process (Maybe b)]
hs
Just b
p -> b -> Process b
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return b
p
getSelfPid :: Process ProcessId
getSelfPid :: Process ProcessId
getSelfPid = LocalProcess -> ProcessId
processId (LocalProcess -> ProcessId)
-> Process LocalProcess -> Process ProcessId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
getSelfNode :: Process NodeId
getSelfNode :: Process NodeId
getSelfNode = LocalNode -> NodeId
localNodeId (LocalNode -> NodeId)
-> (LocalProcess -> LocalNode) -> LocalProcess -> NodeId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> NodeId) -> Process LocalProcess -> Process NodeId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStats NodeId
nid = do
NodeId
selfNode <- Process NodeId
getSelfNode
if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
selfNode
then NodeStats -> Either DiedReason NodeStats
forall a b. b -> Either a b
Right (NodeStats -> Either DiedReason NodeStats)
-> Process NodeStats -> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` Process NodeStats
getLocalNodeStats
else NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote NodeId
selfNode
where
getNodeStatsRemote :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote :: NodeId -> Process (Either DiedReason NodeStats)
getNodeStatsRemote NodeId
selfNode = do
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ NodeId -> ProcessSignal
GetNodeStats NodeId
selfNode
Process MonitorRef
-> (MonitorRef -> Process ())
-> (MonitorRef -> Process (Either DiedReason NodeStats))
-> Process (Either DiedReason NodeStats)
forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket (NodeId -> Process MonitorRef
monitorNode NodeId
nid) MonitorRef -> Process ()
unmonitor ((MonitorRef -> Process (Either DiedReason NodeStats))
-> Process (Either DiedReason NodeStats))
-> (MonitorRef -> Process (Either DiedReason NodeStats))
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ \MonitorRef
mRef ->
[Match (Either DiedReason NodeStats)]
-> Process (Either DiedReason NodeStats)
forall b. [Match b] -> Process b
receiveWait [ (NodeStats -> Process (Either DiedReason NodeStats))
-> Match (Either DiedReason NodeStats)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(NodeStats
stats :: NodeStats) -> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats))
-> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ NodeStats -> Either DiedReason NodeStats
forall a b. b -> Either a b
Right NodeStats
stats)
, (NodeMonitorNotification -> Bool)
-> (NodeMonitorNotification
-> Process (Either DiedReason NodeStats))
-> Match (Either DiedReason NodeStats)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(NodeMonitorNotification MonitorRef
ref NodeId
_ DiedReason
_) -> MonitorRef
ref MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
mRef)
(\(NodeMonitorNotification MonitorRef
_ NodeId
_ DiedReason
dr) -> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats))
-> Either DiedReason NodeStats
-> Process (Either DiedReason NodeStats)
forall a b. (a -> b) -> a -> b
$ DiedReason -> Either DiedReason NodeStats
forall a b. a -> Either a b
Left DiedReason
dr)
]
getLocalNodeStats :: Process NodeStats
getLocalNodeStats :: Process NodeStats
getLocalNodeStats = do
NodeId
self <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ NodeId -> ProcessSignal
GetNodeStats NodeId
self
[Match NodeStats] -> Process NodeStats
forall b. [Match b] -> Process b
receiveWait [ (NodeStats -> Process NodeStats) -> Match NodeStats
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(NodeStats
stats :: NodeStats) -> NodeStats -> Process NodeStats
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return NodeStats
stats) ]
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
getProcessInfo ProcessId
pid =
let them :: NodeId
them = ProcessId -> NodeId
processNodeId ProcessId
pid in do
NodeId
us <- Process NodeId
getSelfNode
Maybe NodeId
dest <- NodeId -> NodeId -> Process (Maybe NodeId)
mkNode NodeId
them NodeId
us
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
dest (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> ProcessSignal
GetInfo ProcessId
pid
[Match (Maybe ProcessInfo)] -> Process (Maybe ProcessInfo)
forall b. [Match b] -> Process b
receiveWait [
(ProcessInfo -> Process (Maybe ProcessInfo))
-> Match (Maybe ProcessInfo)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(ProcessInfo
p :: ProcessInfo) -> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ProcessInfo -> Process (Maybe ProcessInfo))
-> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a b. (a -> b) -> a -> b
$ ProcessInfo -> Maybe ProcessInfo
forall a. a -> Maybe a
Just ProcessInfo
p)
, (ProcessInfoNone -> Process (Maybe ProcessInfo))
-> Match (Maybe ProcessInfo)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(ProcessInfoNone
_ :: ProcessInfoNone) -> Maybe ProcessInfo -> Process (Maybe ProcessInfo)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessInfo
forall a. Maybe a
Nothing)
]
where mkNode :: NodeId -> NodeId -> Process (Maybe NodeId)
mkNode :: NodeId -> NodeId -> Process (Maybe NodeId)
mkNode NodeId
them NodeId
us = case NodeId
them NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
us of
Bool
True -> Maybe NodeId -> Process (Maybe NodeId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe NodeId
forall a. Maybe a
Nothing
Bool
_ -> Maybe NodeId -> Process (Maybe NodeId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe NodeId -> Process (Maybe NodeId))
-> Maybe NodeId -> Process (Maybe NodeId)
forall a b. (a -> b) -> a -> b
$ NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
them
link :: ProcessId -> Process ()
link :: ProcessId -> Process ()
link = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (ProcessId -> ProcessSignal) -> ProcessId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Link (Identifier -> ProcessSignal)
-> (ProcessId -> Identifier) -> ProcessId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier
monitor :: ProcessId -> Process MonitorRef
monitor :: ProcessId -> Process MonitorRef
monitor = Identifier -> Process MonitorRef
monitor' (Identifier -> Process MonitorRef)
-> (ProcessId -> Identifier) -> ProcessId -> Process MonitorRef
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier
withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor :: forall a. ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor ProcessId
pid = Process MonitorRef
-> (MonitorRef -> Process ())
-> (MonitorRef -> Process a)
-> Process a
forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket (ProcessId -> Process MonitorRef
monitor ProcessId
pid) MonitorRef -> Process ()
unmonitor
withMonitor_ :: ProcessId -> Process a -> Process a
withMonitor_ :: forall a. ProcessId -> Process a -> Process a
withMonitor_ ProcessId
p = ProcessId -> (MonitorRef -> Process a) -> Process a
forall a. ProcessId -> (MonitorRef -> Process a) -> Process a
withMonitor ProcessId
p ((MonitorRef -> Process a) -> Process a)
-> (Process a -> MonitorRef -> Process a) -> Process a -> Process a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Process a -> MonitorRef -> Process a
forall a b. a -> b -> a
const
unlink :: ProcessId -> Process ()
unlink :: ProcessId -> Process ()
unlink ProcessId
pid = do
ProcessId -> Process ()
unlinkAsync ProcessId
pid
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkProcess -> Bool)
-> (DidUnlinkProcess -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkProcess ProcessId
pid') -> ProcessId
pid' ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
pid)
(\DidUnlinkProcess
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
unlinkNode :: NodeId -> Process ()
unlinkNode :: NodeId -> Process ()
unlinkNode NodeId
nid = do
NodeId -> Process ()
unlinkNodeAsync NodeId
nid
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkNode -> Bool)
-> (DidUnlinkNode -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkNode NodeId
nid') -> NodeId
nid' NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid)
(\DidUnlinkNode
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
unlinkPort :: SendPort a -> Process ()
unlinkPort :: forall a. SendPort a -> Process ()
unlinkPort SendPort a
sport = do
SendPort a -> Process ()
forall a. SendPort a -> Process ()
unlinkPortAsync SendPort a
sport
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnlinkPort -> Bool)
-> (DidUnlinkPort -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnlinkPort SendPortId
cid) -> SendPortId
cid SendPortId -> SendPortId -> Bool
forall a. Eq a => a -> a -> Bool
== SendPort a -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort a
sport)
(\DidUnlinkPort
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
unmonitor :: MonitorRef -> Process ()
unmonitor :: MonitorRef -> Process ()
unmonitor MonitorRef
ref = do
MonitorRef -> Process ()
unmonitorAsync MonitorRef
ref
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait [ (DidUnmonitor -> Bool) -> (DidUnmonitor -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(DidUnmonitor MonitorRef
ref') -> MonitorRef
ref' MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
ref)
(\DidUnmonitor
_ -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
Process (Maybe ()) -> Process ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Process (Maybe ()) -> Process ())
-> Process (Maybe ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ Int -> [Match ()] -> Process (Maybe ())
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
0
[ (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification MonitorRef
ref' ProcessId
_ DiedReason
_) -> MonitorRef
ref' MonitorRef -> MonitorRef -> Bool
forall a. Eq a => a -> a -> Bool
== MonitorRef
ref)
(Process () -> ProcessMonitorNotification -> Process ()
forall a b. a -> b -> a
const (Process () -> ProcessMonitorNotification -> Process ())
-> Process () -> ProcessMonitorNotification -> Process ()
forall a b. (a -> b) -> a -> b
$ () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch :: forall e a.
Exception e =>
Process a -> (e -> Process a) -> Process a
catch = Process a -> (e -> Process a) -> Process a
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
Catch.catch
{-# DEPRECATED catch "Use Control.Monad.Catch.catch instead" #-}
try :: Exception e => Process a -> Process (Either e a)
try :: forall e a. Exception e => Process a -> Process (Either e a)
try = Process a -> Process (Either e a)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
Catch.try
{-# DEPRECATED try "Use Control.Monad.Catch.try instead" #-}
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask :: forall b.
((forall a. Process a -> Process a) -> Process b) -> Process b
mask = ((forall a. Process a -> Process a) -> Process b) -> Process b
forall b.
HasCallStack =>
((forall a. Process a -> Process a) -> Process b) -> Process b
forall (m :: * -> *) b.
(MonadMask m, HasCallStack) =>
((forall a. m a -> m a) -> m b) -> m b
Catch.mask
{-# DEPRECATED mask "Use Control.Monad.Catch.mask_ instead" #-}
mask_ :: Process a -> Process a
mask_ :: forall a. Process a -> Process a
mask_ = Process a -> Process a
forall (m :: * -> *) a. (HasCallStack, MonadMask m) => m a -> m a
Catch.mask_
{-# DEPRECATED mask_ "Use Control.Monad.Catch.mask_ instead" #-}
onException :: Process a -> Process b -> Process a
onException :: forall a b. Process a -> Process b -> Process a
onException = Process a -> Process b -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadCatch m) =>
m a -> m b -> m a
Catch.onException
{-# DEPRECATED onException "Use Control.Monad.Catch.onException instead" #-}
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket :: forall a b c.
Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket = Process a -> (a -> Process b) -> (a -> Process c) -> Process c
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> (a -> m c) -> (a -> m b) -> m b
Catch.bracket
{-# DEPRECATED bracket "Use Control.Monad.Catch.bracket instead" #-}
bracket_ :: Process a -> Process b -> Process c -> Process c
bracket_ :: forall a b c. Process a -> Process b -> Process c -> Process c
bracket_ = Process a -> Process b -> Process c -> Process c
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> m c -> m b -> m b
Catch.bracket_
{-# DEPRECATED bracket_ "Use Control.Monad.Catch.bracket_ instead" #-}
finally :: Process a -> Process b -> Process a
finally :: forall a b. Process a -> Process b -> Process a
finally = Process a -> Process b -> Process a
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
Catch.finally
{-# DEPRECATED finally "Use Control.Monad.Catch.finally instead" #-}
data Handler a = forall e . Exception e => Handler (e -> Process a)
instance Functor Handler where
fmap :: forall a b. (a -> b) -> Handler a -> Handler b
fmap a -> b
f (Handler e -> Process a
h) = (e -> Process b) -> Handler b
forall a e. Exception e => (e -> Process a) -> Handler a
Handler ((a -> b) -> Process a -> Process b
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f (Process a -> Process b) -> (e -> Process a) -> e -> Process b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Process a
h)
catches :: Process a -> [Handler a] -> Process a
catches :: forall a. Process a -> [Handler a] -> Process a
catches Process a
proc [Handler a]
handlers = Process a
proc Process a -> (SomeException -> Process a) -> Process a
forall e a.
(HasCallStack, Exception e) =>
Process a -> (e -> Process a) -> Process a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
`Catch.catch` [Handler a] -> SomeException -> Process a
forall a. [Handler a] -> SomeException -> Process a
catchesHandler [Handler a]
handlers
catchesHandler :: [Handler a] -> SomeException -> Process a
catchesHandler :: forall a. [Handler a] -> SomeException -> Process a
catchesHandler [Handler a]
handlers SomeException
e = (Handler a -> Process a -> Process a)
-> Process a -> [Handler a] -> Process a
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr Handler a -> Process a -> Process a
forall {a}. Handler a -> Process a -> Process a
tryHandler (SomeException -> Process a
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM SomeException
e) [Handler a]
handlers
where tryHandler :: Handler a -> Process a -> Process a
tryHandler (Handler e -> Process a
handler) Process a
res
= case SomeException -> Maybe e
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
Just e
e' -> e -> Process a
handler e
e'
Maybe e
Nothing -> Process a
res
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout Int
n = Int -> [Match a] -> Process (Maybe a)
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout Int
n [(a -> Process a) -> Match a
forall a b. Serializable a => (a -> Process b) -> Match b
match a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return]
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync NodeId
nid Closure (Process ())
proc = do
SpawnRef
spawnRef <- Process SpawnRef
getSpawnRef
NodeId
node <- Process NodeId
getSelfNode
if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
node
then Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ Closure (Process ()) -> SpawnRef -> ProcessSignal
Spawn Closure (Process ())
proc SpawnRef
spawnRef
else Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ Closure (Process ()) -> SpawnRef -> ProcessSignal
Spawn Closure (Process ())
proc SpawnRef
spawnRef
SpawnRef -> Process SpawnRef
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return SpawnRef
spawnRef
monitorNode :: NodeId -> Process MonitorRef
monitorNode :: NodeId -> Process MonitorRef
monitorNode =
Identifier -> Process MonitorRef
monitor' (Identifier -> Process MonitorRef)
-> (NodeId -> Identifier) -> NodeId -> Process MonitorRef
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort (SendPort SendPortId
cid) =
Identifier -> Process MonitorRef
monitor' (SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (MonitorRef -> ProcessSignal) -> MonitorRef -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MonitorRef -> ProcessSignal
Unmonitor
linkNode :: NodeId -> Process ()
linkNode :: NodeId -> Process ()
linkNode = Identifier -> Process ()
link' (Identifier -> Process ())
-> (NodeId -> Identifier) -> NodeId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier
linkPort :: SendPort a -> Process ()
linkPort :: forall a. SendPort a -> Process ()
linkPort (SendPort SendPortId
cid) =
Identifier -> Process ()
link' (SendPortId -> Identifier
SendPortIdentifier SendPortId
cid)
unlinkAsync :: ProcessId -> Process ()
unlinkAsync :: ProcessId -> Process ()
unlinkAsync =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (ProcessId -> ProcessSignal) -> ProcessId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> ProcessSignal)
-> (ProcessId -> Identifier) -> ProcessId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessId -> Identifier
ProcessIdentifier
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (NodeId -> ProcessSignal) -> NodeId -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> ProcessSignal)
-> (NodeId -> Identifier) -> NodeId -> ProcessSignal
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> Identifier
NodeIdentifier
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync :: forall a. SendPort a -> Process ()
unlinkPortAsync (SendPort SendPortId
cid) =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (Identifier -> ProcessSignal) -> Identifier -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Unlink (Identifier -> Process ()) -> Identifier -> Process ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> Identifier
SendPortIdentifier SendPortId
cid
data SayMessage = SayMessage { SayMessage -> UTCTime
sayTime :: UTCTime
, SayMessage -> ProcessId
sayProcess :: ProcessId
, SayMessage -> String
sayMessage :: String }
deriving (Typeable)
instance Show SayMessage where
showsPrec :: Int -> SayMessage -> String -> String
showsPrec Int
p SayMessage
msg =
Bool -> (String -> String) -> String -> String
showParen (Int
p Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
11)
((String -> String) -> String -> String)
-> (String -> String) -> String -> String
forall a b. (a -> b) -> a -> b
$ String -> String -> String
showString String
"SayMessage "
(String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> String -> String
showString (TimeLocale -> String -> UTCTime -> String
forall t. FormatTime t => TimeLocale -> String -> t -> String
formatTime TimeLocale
defaultTimeLocale String
"%c" (SayMessage -> UTCTime
sayTime SayMessage
msg))
(String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '
(String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> ProcessId -> String -> String
forall a. Show a => Int -> a -> String -> String
showsPrec Int
11 (SayMessage -> ProcessId
sayProcess SayMessage
msg) (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '
(String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> String -> String -> String
forall a. Show a => Int -> a -> String -> String
showsPrec Int
11 (SayMessage -> String
sayMessage SayMessage
msg) (String -> String) -> (String -> String) -> String -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Char -> String -> String
showChar Char
' '
instance Binary SayMessage where
put :: SayMessage -> Put
put SayMessage
s = do
UTCTime -> Put
putUTCTime (SayMessage -> UTCTime
sayTime SayMessage
s)
ProcessId -> Put
forall t. Binary t => t -> Put
put (SayMessage -> ProcessId
sayProcess SayMessage
s)
String -> Put
forall t. Binary t => t -> Put
put (SayMessage -> String
sayMessage SayMessage
s)
get :: Get SayMessage
get = UTCTime -> ProcessId -> String -> SayMessage
SayMessage (UTCTime -> ProcessId -> String -> SayMessage)
-> Get UTCTime -> Get (ProcessId -> String -> SayMessage)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get UTCTime
getUTCTime Get (ProcessId -> String -> SayMessage)
-> Get ProcessId -> Get (String -> SayMessage)
forall a b. Get (a -> b) -> Get a -> Get b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Get ProcessId
forall t. Binary t => Get t
get Get (String -> SayMessage) -> Get String -> Get SayMessage
forall a b. Get (a -> b) -> Get a -> Get b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Get String
forall t. Binary t => Get t
get
putUTCTime :: UTCTime -> Put
putUTCTime :: UTCTime -> Put
putUTCTime (UTCTime (ModifiedJulianDay Integer
day) DiffTime
tod) = do
Integer -> Put
forall t. Binary t => t -> Put
put Integer
day
Rational -> Put
forall t. Binary t => t -> Put
put (DiffTime -> Rational
forall a. Real a => a -> Rational
toRational DiffTime
tod)
getUTCTime :: Get UTCTime
getUTCTime :: Get UTCTime
getUTCTime = do
Integer
day <- Get Integer
forall t. Binary t => Get t
get
Rational
tod <- Get Rational
forall t. Binary t => Get t
get
UTCTime -> Get UTCTime
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return (UTCTime -> Get UTCTime) -> UTCTime -> Get UTCTime
forall a b. (a -> b) -> a -> b
$! Day -> DiffTime -> UTCTime
UTCTime (Integer -> Day
ModifiedJulianDay Integer
day)
(Rational -> DiffTime
forall a. Fractional a => Rational -> a
fromRational Rational
tod)
say :: String -> Process ()
say :: String -> Process ()
say String
string = do
UTCTime
now <- IO UTCTime -> Process UTCTime
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
ProcessId
us <- Process ProcessId
getSelfPid
String -> SayMessage -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
"logger" (UTCTime -> ProcessId -> String -> SayMessage
SayMessage UTCTime
now ProcessId
us String
string)
register :: String -> ProcessId -> Process ()
register :: String -> ProcessId -> Process ()
register = Bool -> String -> ProcessId -> Process ()
registerImpl Bool
False
reregister :: String -> ProcessId -> Process ()
reregister :: String -> ProcessId -> Process ()
reregister = Bool -> String -> ProcessId -> Process ()
registerImpl Bool
True
registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl :: Bool -> String -> ProcessId -> Process ()
registerImpl Bool
force String
label ProcessId
pid = do
NodeId
mynid <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
mynid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
force)
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
[ (RegisterReply -> Bool)
-> (RegisterReply -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(RegisterReply String
label' Bool
_ Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
(\(RegisterReply String
_ Bool
ok Maybe ProcessId
owner) -> String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner)
]
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
registerRemoteAsync NodeId
nid String
label ProcessId
pid = do
NodeId
here <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
here then Maybe NodeId
forall a. Maybe a
Nothing else NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid)
(String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
False)
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
reregisterRemoteAsync NodeId
nid String
label ProcessId
pid =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
pid) Bool
True)
unregister :: String -> Process ()
unregister :: String -> Process ()
unregister String
label = do
NodeId
mynid <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
mynid Maybe ProcessId
forall a. Maybe a
Nothing Bool
False)
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
[ (RegisterReply -> Bool)
-> (RegisterReply -> Process ()) -> Match ()
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(RegisterReply String
label' Bool
_ Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
(\(RegisterReply String
_ Bool
ok Maybe ProcessId
owner) -> String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner)
]
handleRegistrationReply :: String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply :: String -> Bool -> Maybe ProcessId -> Process ()
handleRegistrationReply String
label Bool
ok Maybe ProcessId
owner =
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
ok) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
ProcessRegistrationException -> Process ()
forall e a. (HasCallStack, Exception e) => e -> Process a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (ProcessRegistrationException -> Process ())
-> ProcessRegistrationException -> Process ()
forall a b. (a -> b) -> a -> b
$ String -> Maybe ProcessId -> ProcessRegistrationException
ProcessRegistrationException String
label Maybe ProcessId
owner
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync :: NodeId -> String -> Process ()
unregisterRemoteAsync NodeId
nid String
label =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
nid Maybe ProcessId
forall a. Maybe a
Nothing Bool
False)
whereis :: String -> Process (Maybe ProcessId)
whereis :: String -> Process (Maybe ProcessId)
whereis String
label = do
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> ProcessSignal
WhereIs String
label)
[Match (Maybe ProcessId)] -> Process (Maybe ProcessId)
forall b. [Match b] -> Process b
receiveWait [ (WhereIsReply -> Bool)
-> (WhereIsReply -> Process (Maybe ProcessId))
-> Match (Maybe ProcessId)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(WhereIsReply String
label' Maybe ProcessId
_) -> String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
label')
(\(WhereIsReply String
_ Maybe ProcessId
mPid) -> Maybe ProcessId -> Process (Maybe ProcessId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessId
mPid)
]
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync NodeId
nid String
label = do
NodeId
here <- Process NodeId
getSelfNode
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
here then Maybe NodeId
forall a. Maybe a
Nothing else NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> ProcessSignal
WhereIs String
label)
nsend :: Serializable a => String -> a -> Process ()
nsend :: forall a. Serializable a => String -> a -> Process ()
nsend String
label a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let msg' :: Message
msg' = a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus (LocalProcess -> LocalNode
processNode LocalProcess
proc))
(String -> ProcessId -> Message -> MxEvent
MxSentToName String
label (LocalProcess -> ProcessId
processId LocalProcess
proc) Message
msg')
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (String -> Message -> ProcessSignal
NamedSend String
label Message
msg')
unsafeNSend :: Serializable a => String -> a -> Process ()
unsafeNSend :: forall a. Serializable a => String -> a -> Process ()
unsafeNSend = String -> a -> Process ()
forall a. Serializable a => String -> a -> Process ()
Unsafe.nsend
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote :: forall a. Serializable a => NodeId -> String -> a -> Process ()
nsendRemote NodeId
nid String
label a
msg = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
let us :: ProcessId
us = LocalProcess -> ProcessId
processId LocalProcess
proc
let node :: LocalNode
node = LocalProcess -> LocalNode
processNode LocalProcess
proc
if LocalNode -> NodeId
localNodeId LocalNode
node NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== NodeId
nid
then String -> a -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
label a
msg
else let lbl :: String
lbl = String
label String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"@" String -> String -> String
forall a. [a] -> [a] -> [a]
++ NodeId -> String
forall a. Show a => a -> String
show NodeId
nid in do
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MxEventBus -> MxEvent -> IO ()
traceEvent (LocalNode -> MxEventBus
localEventBus LocalNode
node)
(String -> ProcessId -> Message -> MxEvent
MxSentToName String
lbl ProcessId
us (a -> Message
forall a. Serializable a => a -> Message
wrapMessage a
msg))
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg (NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid) (String -> Message -> ProcessSignal
NamedSend String
label (a -> Message
forall a. Serializable a => a -> Message
createMessage a
msg))
unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process ()
unsafeNSendRemote :: forall a. Serializable a => NodeId -> String -> a -> Process ()
unsafeNSendRemote = NodeId -> String -> a -> Process ()
forall a. Serializable a => NodeId -> String -> a -> Process ()
Unsafe.nsendRemote
unStatic :: Typeable a => Static a -> Process a
unStatic :: forall a. Typeable a => Static a -> Process a
unStatic Static a
static = do
RemoteTable
rtable <- LocalNode -> RemoteTable
remoteTable (LocalNode -> RemoteTable)
-> (LocalProcess -> LocalNode) -> LocalProcess -> RemoteTable
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> RemoteTable)
-> Process LocalProcess -> Process RemoteTable
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
case RemoteTable -> Static a -> Either String a
forall a. Typeable a => RemoteTable -> Static a -> Either String a
Static.unstatic RemoteTable
rtable Static a
static of
Left String
err -> String -> Process a
forall a. String -> Process a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Process a) -> String -> Process a
forall a b. (a -> b) -> a -> b
$ String
"Could not resolve static value: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
Right a
x -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
unClosure :: Typeable a => Closure a -> Process a
unClosure :: forall a. Typeable a => Closure a -> Process a
unClosure Closure a
closure = do
RemoteTable
rtable <- LocalNode -> RemoteTable
remoteTable (LocalNode -> RemoteTable)
-> (LocalProcess -> LocalNode) -> LocalProcess -> RemoteTable
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalProcess -> LocalNode
processNode (LocalProcess -> RemoteTable)
-> Process LocalProcess -> Process RemoteTable
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
case RemoteTable -> Closure a -> Either String a
forall a. Typeable a => RemoteTable -> Closure a -> Either String a
Static.unclosure RemoteTable
rtable Closure a
closure of
Left String
err -> String -> Process a
forall a. String -> Process a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Process a) -> String -> Process a
forall a b. (a -> b) -> a -> b
$ String
"Could not resolve closure: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
Right a
x -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
reconnect :: ProcessId -> Process ()
reconnect :: ProcessId -> Process ()
reconnect ProcessId
them = do
ProcessId
us <- Process ProcessId
getSelfPid
LocalNode
node <- LocalProcess -> LocalNode
processNode (LocalProcess -> LocalNode)
-> Process LocalProcess -> Process LocalNode
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> Identifier -> Identifier -> IO ()
disconnect LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
us) (ProcessId -> Identifier
ProcessIdentifier ProcessId
them)
reconnectPort :: SendPort a -> Process ()
reconnectPort :: forall a. SendPort a -> Process ()
reconnectPort SendPort a
them = do
ProcessId
us <- Process ProcessId
getSelfPid
LocalNode
node <- LocalProcess -> LocalNode
processNode (LocalProcess -> LocalNode)
-> Process LocalProcess -> Process LocalNode
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> Identifier -> Identifier -> IO ()
disconnect LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
us) (SendPortId -> Identifier
SendPortIdentifier (SendPort a -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId SendPort a
them))
sendLocal :: (Serializable a) => ProcessId -> a -> Process ()
sendLocal :: forall a. Serializable a => ProcessId -> a -> Process ()
sendLocal ProcessId
to a
msg =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Message -> ProcessSignal
LocalSend ProcessId
to (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg)
sendChanLocal :: (Serializable a) => SendPortId -> a -> Process ()
sendChanLocal :: forall a. Serializable a => SendPortId -> a -> Process ()
sendChanLocal SendPortId
spId a
msg =
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> Message -> ProcessSignal
LocalPortSend SendPortId
spId (a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage a
msg)
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor Identifier
ident = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO MonitorRef -> Process MonitorRef
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO MonitorRef -> Process MonitorRef)
-> IO MonitorRef -> Process MonitorRef
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState
-> (LocalProcessState -> IO (LocalProcessState, MonitorRef))
-> IO MonitorRef
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO (LocalProcessState, MonitorRef))
-> IO MonitorRef)
-> (LocalProcessState -> IO (LocalProcessState, MonitorRef))
-> IO MonitorRef
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
let counter :: Int32
counter = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
monitorCounter
(LocalProcessState, MonitorRef)
-> IO (LocalProcessState, MonitorRef)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( T LocalProcessState Int32
monitorCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1) (LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
, Identifier -> Int32 -> MonitorRef
MonitorRef Identifier
ident Int32
counter
)
getSpawnRef :: Process SpawnRef
getSpawnRef :: Process SpawnRef
getSpawnRef = do
LocalProcess
proc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO SpawnRef -> Process SpawnRef
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SpawnRef -> Process SpawnRef)
-> IO SpawnRef -> Process SpawnRef
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalProcessState
-> (LocalProcessState -> IO (LocalProcessState, SpawnRef))
-> IO SpawnRef
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO (LocalProcessState, SpawnRef))
-> IO SpawnRef)
-> (LocalProcessState -> IO (LocalProcessState, SpawnRef))
-> IO SpawnRef
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
st -> do
let counter :: Int32
counter = LocalProcessState
st LocalProcessState -> T LocalProcessState Int32 -> Int32
forall r a. r -> T r a -> a
^. T LocalProcessState Int32
spawnCounter
(LocalProcessState, SpawnRef) -> IO (LocalProcessState, SpawnRef)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( T LocalProcessState Int32
spawnCounter T LocalProcessState Int32
-> (Int32 -> Int32) -> LocalProcessState -> LocalProcessState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1) (LocalProcessState -> LocalProcessState)
-> LocalProcessState -> LocalProcessState
forall a b. (a -> b) -> a -> b
$ LocalProcessState
st
, Int32 -> SpawnRef
SpawnRef Int32
counter
)
monitor' :: Identifier -> Process MonitorRef
monitor' :: Identifier -> Process MonitorRef
monitor' Identifier
ident = do
MonitorRef
monitorRef <- Identifier -> Process MonitorRef
getMonitorRefFor Identifier
ident
Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ()) -> ProcessSignal -> Process ()
forall a b. (a -> b) -> a -> b
$ MonitorRef -> ProcessSignal
Monitor MonitorRef
monitorRef
MonitorRef -> Process MonitorRef
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return MonitorRef
monitorRef
link' :: Identifier -> Process ()
link' :: Identifier -> Process ()
link' = Maybe NodeId -> ProcessSignal -> Process ()
sendCtrlMsg Maybe NodeId
forall a. Maybe a
Nothing (ProcessSignal -> Process ())
-> (Identifier -> ProcessSignal) -> Identifier -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier -> ProcessSignal
Link