Safe Haskell | None |
---|---|
Language | Haskell2010 |
- Cloud Haskell
This is an implementation of Cloud Haskell, as described in Towards Haskell in the Cloud by Jeff Epstein, Andrew Black, and Simon Peyton Jones (see http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/), although some of the details are different. The precise message passing semantics are based on A unified semantics for future Erlang by Hans Svensson, Lars-Åke Fredlund and Clara Benac Earle.
For a detailed description of the package and other reference materials, please see the distributed-process wiki page on github: https://github.com/haskell-distributed/distributed-process/wiki.
Synopsis
- data ProcessId
- newtype NodeId = NodeId {}
- data Process a
- data SendPortId
- processNodeId :: ProcessId -> NodeId
- sendPortProcessId :: SendPortId -> ProcessId
- liftIO :: MonadIO m => IO a -> m a
- send :: Serializable a => ProcessId -> a -> Process ()
- usend :: Serializable a => ProcessId -> a -> Process ()
- expect :: Serializable a => Process a
- expectTimeout :: Serializable a => Int -> Process (Maybe a)
- data ReceivePort a
- data SendPort a
- sendPortId :: SendPort a -> SendPortId
- newChan :: Serializable a => Process (SendPort a, ReceivePort a)
- sendChan :: Serializable a => SendPort a -> a -> Process ()
- receiveChan :: Serializable a => ReceivePort a -> Process a
- receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
- mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
- unsafeSend :: Serializable a => ProcessId -> a -> Process ()
- unsafeUSend :: Serializable a => ProcessId -> a -> Process ()
- unsafeSendChan :: Serializable a => SendPort a -> a -> Process ()
- unsafeNSend :: Serializable a => String -> a -> Process ()
- unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process ()
- unsafeWrapMessage :: Serializable a => a -> Message
- data Match b
- receiveWait :: [Match b] -> Process b
- receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
- match :: Serializable a => (a -> Process b) -> Match b
- matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
- matchUnknown :: Process b -> Match b
- matchAny :: (Message -> Process b) -> Match b
- matchAnyIf :: Serializable a => (a -> Bool) -> (Message -> Process b) -> Match b
- matchChan :: ReceivePort a -> (a -> Process b) -> Match b
- matchSTM :: STM a -> (a -> Process b) -> Match b
- data Message
- matchMessage :: (Message -> Process Message) -> Match Message
- matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message
- isEncoded :: Message -> Bool
- wrapMessage :: Serializable a => a -> Message
- unwrapMessage :: (Monad m, Serializable a) => Message -> m (Maybe a)
- handleMessage :: (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b)
- handleMessageIf :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
- handleMessage_ :: (Monad m, Serializable a) => Message -> (a -> m ()) -> m ()
- handleMessageIf_ :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m ()
- forward :: Message -> ProcessId -> Process ()
- uforward :: Message -> ProcessId -> Process ()
- delegate :: ProcessId -> (Message -> Bool) -> Process ()
- relay :: ProcessId -> Process ()
- proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()
- spawn :: NodeId -> Closure (Process ()) -> Process ProcessId
- call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a
- terminate :: Process a
- die :: Serializable a => a -> Process b
- kill :: ProcessId -> String -> Process ()
- exit :: Serializable a => ProcessId -> a -> Process ()
- catchExit :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b
- catchesExit :: Process b -> [ProcessId -> Message -> Process (Maybe b)] -> Process b
- data ProcessTerminationException = ProcessTerminationException
- data ProcessRegistrationException = ProcessRegistrationException !String !(Maybe ProcessId)
- data SpawnRef
- getSelfPid :: Process ProcessId
- getSelfNode :: Process NodeId
- data ProcessInfo = ProcessInfo {
- infoNode :: NodeId
- infoRegisteredNames :: [String]
- infoMessageQueueLength :: Int
- infoMonitors :: [(ProcessId, MonitorRef)]
- infoLinks :: [ProcessId]
- getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo)
- data NodeStats = NodeStats {}
- getNodeStats :: NodeId -> Process (Either DiedReason NodeStats)
- getLocalNodeStats :: Process NodeStats
- link :: ProcessId -> Process ()
- linkNode :: NodeId -> Process ()
- linkPort :: SendPort a -> Process ()
- unlink :: ProcessId -> Process ()
- unlinkNode :: NodeId -> Process ()
- unlinkPort :: SendPort a -> Process ()
- monitor :: ProcessId -> Process MonitorRef
- monitorNode :: NodeId -> Process MonitorRef
- monitorPort :: Serializable a => SendPort a -> Process MonitorRef
- unmonitor :: MonitorRef -> Process ()
- withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a
- withMonitor_ :: ProcessId -> Process a -> Process a
- data MonitorRef
- data ProcessLinkException = ProcessLinkException !ProcessId !DiedReason
- data NodeLinkException = NodeLinkException !NodeId !DiedReason
- data PortLinkException = PortLinkException !SendPortId !DiedReason
- data ProcessMonitorNotification = ProcessMonitorNotification !MonitorRef !ProcessId !DiedReason
- data NodeMonitorNotification = NodeMonitorNotification !MonitorRef !NodeId !DiedReason
- data PortMonitorNotification = PortMonitorNotification !MonitorRef !SendPortId !DiedReason
- data DiedReason
- data Closure a
- closure :: Static (ByteString -> a) -> ByteString -> Closure a
- data Static a
- unStatic :: Typeable a => Static a -> Process a
- unClosure :: Typeable a => Closure a -> Process a
- data RemoteTable
- say :: String -> Process ()
- register :: String -> ProcessId -> Process ()
- reregister :: String -> ProcessId -> Process ()
- unregister :: String -> Process ()
- whereis :: String -> Process (Maybe ProcessId)
- nsend :: Serializable a => String -> a -> Process ()
- registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
- reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
- unregisterRemoteAsync :: NodeId -> String -> Process ()
- whereisRemoteAsync :: NodeId -> String -> Process ()
- nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
- data WhereIsReply = WhereIsReply String (Maybe ProcessId)
- data RegisterReply = RegisterReply String Bool (Maybe ProcessId)
- catch :: Exception e => Process a -> (e -> Process a) -> Process a
- data Handler a = Exception e => Handler (e -> Process a)
- catches :: Process a -> [Handler a] -> Process a
- try :: Exception e => Process a -> Process (Either e a)
- mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
- mask_ :: Process a -> Process a
- onException :: Process a -> Process b -> Process a
- bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
- bracket_ :: Process a -> Process b -> Process c -> Process c
- finally :: Process a -> Process b -> Process a
- spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
- spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
- spawnLink :: NodeId -> Closure (Process ()) -> Process ProcessId
- spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef)
- spawnChannel :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a)
- data DidSpawn = DidSpawn SpawnRef ProcessId
- spawnLocal :: Process () -> Process ProcessId
- spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a)
- callLocal :: Process a -> Process a
- reconnect :: ProcessId -> Process ()
- reconnectPort :: SendPort a -> Process ()
Basic types
Process identifier
Instances
Node identifier
Instances
Data NodeId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types gfoldl :: (forall d b. Data d => c (d -> b) -> d -> c b) -> (forall g. g -> c g) -> NodeId -> c NodeId # gunfold :: (forall b r. Data b => c (b -> r) -> c r) -> (forall r. r -> c r) -> Constr -> c NodeId # toConstr :: NodeId -> Constr # dataTypeOf :: NodeId -> DataType # dataCast1 :: Typeable t => (forall d. Data d => c (t d)) -> Maybe (c NodeId) # dataCast2 :: Typeable t => (forall d e. (Data d, Data e) => c (t d e)) -> Maybe (c NodeId) # gmapT :: (forall b. Data b => b -> b) -> NodeId -> NodeId # gmapQl :: (r -> r' -> r) -> r -> (forall d. Data d => d -> r') -> NodeId -> r # gmapQr :: forall r r'. (r' -> r -> r) -> r -> (forall d. Data d => d -> r') -> NodeId -> r # gmapQ :: (forall d. Data d => d -> u) -> NodeId -> [u] # gmapQi :: Int -> (forall d. Data d => d -> u) -> NodeId -> u # gmapM :: Monad m => (forall d. Data d => d -> m d) -> NodeId -> m NodeId # gmapMp :: MonadPlus m => (forall d. Data d => d -> m d) -> NodeId -> m NodeId # gmapMo :: MonadPlus m => (forall d. Data d => d -> m d) -> NodeId -> m NodeId # | |||||
Generic NodeId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types
| |||||
Show NodeId Source # | |||||
Binary NodeId Source # | |||||
NFData NodeId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types | |||||
Eq NodeId Source # | |||||
Ord NodeId Source # | |||||
Hashable NodeId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types | |||||
type Rep NodeId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types type Rep NodeId = D1 ('MetaData "NodeId" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'True) (C1 ('MetaCons "NodeId" 'PrefixI 'True) (S1 ('MetaSel ('Just "nodeAddress") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 EndPointAddress))) |
The Cloud Haskell Process
type
Instances
MonadFail Process Source # | |
Defined in Control.Distributed.Process.Internal.Types | |
MonadFix Process Source # | |
Defined in Control.Distributed.Process.Internal.Types | |
MonadIO Process Source # | |
Defined in Control.Distributed.Process.Internal.Types | |
Applicative Process Source # | |
Functor Process Source # | |
Monad Process Source # | |
MonadCatch Process Source # | |
Defined in Control.Distributed.Process.Internal.Types | |
MonadMask Process Source # | |
Defined in Control.Distributed.Process.Internal.Types mask :: HasCallStack => ((forall a. Process a -> Process a) -> Process b) -> Process b # uninterruptibleMask :: HasCallStack => ((forall a. Process a -> Process a) -> Process b) -> Process b # generalBracket :: HasCallStack => Process a -> (a -> ExitCase b -> Process c) -> (a -> Process b) -> Process (b, c) # | |
MonadThrow Process Source # | |
Defined in Control.Distributed.Process.Internal.Types throwM :: (HasCallStack, Exception e) => e -> Process a # | |
MonadReader LocalProcess Process Source # | |
Defined in Control.Distributed.Process.Internal.Types ask :: Process LocalProcess # local :: (LocalProcess -> LocalProcess) -> Process a -> Process a # reader :: (LocalProcess -> a) -> Process a # | |
Serializable b => MkTDict (Process b) Source # | |
data SendPortId Source #
A send port is identified by a SendPortId.
You cannot send directly to a SendPortId; instead, use newChan
to create a SendPort.
Instances
Generic SendPortId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types
from :: SendPortId -> Rep SendPortId x # to :: Rep SendPortId x -> SendPortId # | |||||
Show SendPortId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> SendPortId -> ShowS # show :: SendPortId -> String # showList :: [SendPortId] -> ShowS # | |||||
Binary SendPortId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types | |||||
NFData SendPortId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types rnf :: SendPortId -> () # | |||||
Eq SendPortId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types (==) :: SendPortId -> SendPortId -> Bool # (/=) :: SendPortId -> SendPortId -> Bool # | |||||
Ord SendPortId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types compare :: SendPortId -> SendPortId -> Ordering # (<) :: SendPortId -> SendPortId -> Bool # (<=) :: SendPortId -> SendPortId -> Bool # (>) :: SendPortId -> SendPortId -> Bool # (>=) :: SendPortId -> SendPortId -> Bool # max :: SendPortId -> SendPortId -> SendPortId # min :: SendPortId -> SendPortId -> SendPortId # | |||||
Hashable SendPortId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types hashWithSalt :: Int -> SendPortId -> Int # hash :: SendPortId -> Int # | |||||
type Rep SendPortId Source # | |||||
Defined in Control.Distributed.Process.Internal.Types type Rep SendPortId = D1 ('MetaData "SendPortId" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'False) (C1 ('MetaCons "SendPortId" 'PrefixI 'True) (S1 ('MetaSel ('Just "sendPortProcessId") 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 ProcessId) :*: S1 ('MetaSel ('Just "sendPortLocalId") 'SourceUnpack 'SourceStrict 'DecidedStrict) (Rec0 LocalSendPortId))) |
processNodeId :: ProcessId -> NodeId Source #
The ID of the node the process is running on
sendPortProcessId :: SendPortId -> ProcessId Source #
The ID of the process that will receive messages sent on this port
liftIO :: MonadIO m => IO a -> m a #
Lift a computation from the IO
monad.
This allows us to run IO computations in any monadic stack, so long as it supports these kinds of operations
(i.e. IO
is the base monad for the stack).
Example
import Control.Monad.Trans.State -- from the "transformers" library printState :: Show s => StateT s IO () printState = do state <- get liftIO $ print state
Had we omitted liftIO, we would have ended up with this error:
• Couldn't match type ‘IO’ with ‘StateT s IO’ Expected type: StateT s IO () Actual type: IO ()
The important part here is the mismatch between StateT s IO () and IO ().
Luckily, we know of a function that takes an IO a and returns an (m a): liftIO, enabling us to run the program and see the expected results:
> evalStateT printState "hello" "hello" > evalStateT printState 3 3
Basic messaging
expect :: Serializable a => Process a Source #
Wait for a message of a specific type
expectTimeout :: Serializable a => Int -> Process (Maybe a) Source #
Like expect
but with a timeout
Channels
data ReceivePort a Source #
The receive end of a typed channel (not serializable)
Note that ReceivePort
implements Functor
, Applicative
, Alternative
and Monad
. This is especially useful when merging receive ports.
Instances
Alternative ReceivePort Source # | |
Defined in Control.Distributed.Process.Internal.Types empty :: ReceivePort a # (<|>) :: ReceivePort a -> ReceivePort a -> ReceivePort a # some :: ReceivePort a -> ReceivePort [a] # many :: ReceivePort a -> ReceivePort [a] # | |
Applicative ReceivePort Source # | |
Defined in Control.Distributed.Process.Internal.Types pure :: a -> ReceivePort a # (<*>) :: ReceivePort (a -> b) -> ReceivePort a -> ReceivePort b # liftA2 :: (a -> b -> c) -> ReceivePort a -> ReceivePort b -> ReceivePort c # (*>) :: ReceivePort a -> ReceivePort b -> ReceivePort b # (<*) :: ReceivePort a -> ReceivePort b -> ReceivePort a # | |
Functor ReceivePort Source # | |
Defined in Control.Distributed.Process.Internal.Types fmap :: (a -> b) -> ReceivePort a -> ReceivePort b # (<$) :: a -> ReceivePort b -> ReceivePort a # | |
Monad ReceivePort Source # | |
Defined in Control.Distributed.Process.Internal.Types (>>=) :: ReceivePort a -> (a -> ReceivePort b) -> ReceivePort b # (>>) :: ReceivePort a -> ReceivePort b -> ReceivePort b # return :: a -> ReceivePort a # |
The send end of a typed channel (serializable)
Instances
Generic (SendPort a) Source # | |||||
Defined in Control.Distributed.Process.Internal.Types
| |||||
Show (SendPort a) Source # | |||||
Serializable a => Binary (SendPort a) Source # | |||||
NFData a => NFData (SendPort a) Source # | |||||
Defined in Control.Distributed.Process.Internal.Types | |||||
Eq (SendPort a) Source # | |||||
Ord (SendPort a) Source # | |||||
Defined in Control.Distributed.Process.Internal.Types | |||||
Hashable a => Hashable (SendPort a) Source # | |||||
Defined in Control.Distributed.Process.Internal.Types | |||||
type Rep (SendPort a) Source # | |||||
Defined in Control.Distributed.Process.Internal.Types type Rep (SendPort a) = D1 ('MetaData "SendPort" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'True) (C1 ('MetaCons "SendPort" 'PrefixI 'True) (S1 ('MetaSel ('Just "sendPortId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 SendPortId))) |
sendPortId :: SendPort a -> SendPortId Source #
The (unique) ID of this send port
newChan :: Serializable a => Process (SendPort a, ReceivePort a) Source #
Create a new typed channel, bound to the calling Process
Note that the channel is bound to the lifecycle of the process that evaluates this function, such that when it dies/exits, the channel will no longer function, but will remain accessible. Thus reading from the ReceivePort will fail silently thereafter, blocking indefinitely (unless a timeout is used).
sendChan :: Serializable a => SendPort a -> a -> Process () Source #
Send a message on a typed channel
receiveChan :: Serializable a => ReceivePort a -> Process a Source #
Wait for a message on a typed channel
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a) Source #
Like receiveChan
but with a timeout. If the timeout is 0, do a
non-blocking check for a message.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) Source #
Merge a list of typed channels.
The result port is left-biased: if there are messages available on more than one port, the first available message is returned.
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a) Source #
Like mergePortsBiased
, but with a round-robin scheduler (rather than
left-biased)
Unsafe messaging variants
unsafeSend :: Serializable a => ProcessId -> a -> Process () Source #
Unsafe variant of send
. This function makes no attempt to serialize the message and therefore
(in the case when the destination process resides on the same local
node) does not ensure that the payload is fully evaluated before it is
delivered.
unsafeUSend :: Serializable a => ProcessId -> a -> Process () Source #
Unsafe variant of usend
. This function makes no attempt to serialize
the message when the destination process resides on the same local
node. Therefore, a local receiver would need to be prepared to cope with any
errors resulting from evaluation of the message.
unsafeSendChan :: Serializable a => SendPort a -> a -> Process () Source #
Send a message on a typed channel. This function makes no attempt to
serialize and (in the case when the ReceivePort
resides on the same local
node) therefore ensure that the payload is fully evaluated before it is
delivered.
unsafeNSend :: Serializable a => String -> a -> Process () Source #
Named send to a process in the local registry (asynchronous). This function makes no attempt to serialize and (in the case when the destination process resides on the same local node) therefore ensure that the payload is fully evaluated before it is delivered.
unsafeNSendRemote :: Serializable a => NodeId -> String -> a -> Process () Source #
Named send to a process in a remote registry (asynchronous). This function makes no attempt to serialize and (in the case when the destination process resides on the same local node) therefore ensure that the payload is fully evaluated before it is delivered.
unsafeWrapMessage :: Serializable a => a -> Message Source #
This is the unsafe variant of wrapMessage
. See
Control.Distributed.Process.UnsafePrimitives for details.
Advanced messaging
Opaque type used in receiveWait
and receiveTimeout
receiveWait :: [Match b] -> Process b Source #
Test the matches in order against each message in the queue
receiveTimeout :: Int -> [Match b] -> Process (Maybe b) Source #
Like receiveWait
but with a timeout.
If the timeout is zero do a non-blocking check for matching messages. A non-zero timeout is applied only when waiting for incoming messages (that is, after we have checked the messages that are already in the mailbox).
match :: Serializable a => (a -> Process b) -> Match b Source #
Match against any message of the right type
matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b Source #
Match against any message of the right type that satisfies a predicate
matchUnknown :: Process b -> Match b Source #
Remove any message from the queue
matchAny :: (Message -> Process b) -> Match b Source #
Match against an arbitrary message. matchAny
removes the first available
message from the process mailbox. To handle arbitrary raw messages once
removed from the mailbox, see handleMessage
and unwrapMessage
.
matchAnyIf :: Serializable a => (a -> Bool) -> (Message -> Process b) -> Match b Source #
Match against an arbitrary message. Intended for use with handleMessage
and unwrapMessage
, this function only removes a message from the process
mailbox, if the supplied condition matches. The success (or failure) of
runtime type checks deferred to handleMessage
and friends is irrelevant
here, i.e., if the condition evaluates to True
then the message will
be removed from the process mailbox and decoded, but that does not
guarantee that an expression passed to handleMessage
will pass the
runtime type checks and therefore be evaluated.
matchSTM :: STM a -> (a -> Process b) -> Match b Source #
Match on an arbitrary STM action.
This rather unusual match primitive allows us to compose arbitrary STM
actions with checks against our process' mailbox and/or any typed channel
ReceivePort
s we may hold.
This allows us to process multiple input streams along with our mailbox,
in just the same way that matchChan
supports checking both the mailbox
and an arbitrary set of typed channels in one atomic transaction.
Note there are no ordering guarantees with respect to these disparate input sources.
Messages consist of their typeRep fingerprint and their encoding
matchMessage :: (Message -> Process Message) -> Match Message Source #
Match against any message, regardless of the underlying (contained) type
matchMessageIf :: (Message -> Bool) -> (Message -> Process Message) -> Match Message Source #
Match against any message (regardless of underlying type) that satisfies a predicate
wrapMessage :: Serializable a => a -> Message Source #
Wrap a Serializable
value in a Message
. Note that Message
s are
Serializable
- like the datum they contain - but also note, deserialising
such a Message
will yield a Message
, not the type within it! To obtain the
wrapped datum, use unwrapMessage
or handleMessage
with a specific type.
do self <- getSelfPid send self (wrapMessage "blah") Nothing <- expectTimeout 1000000 :: Process (Maybe String) (Just m) <- expectTimeout 1000000 :: Process (Maybe Message) (Just "blah") <- unwrapMessage m :: Process (Maybe String)
unwrapMessage :: (Monad m, Serializable a) => Message -> m (Maybe a) Source #
Attempt to unwrap a raw Message
.
If the type of the decoded message payload matches the expected type, the
value will be returned with Just
, otherwise Nothing
indicates the types
do not match.
This expression, for example, will evaluate to Nothing
> unwrapMessage (wrapMessage "foobar") :: Process (Maybe Int)
Whereas this expression will yield Just "foo"
> unwrapMessage (wrapMessage "foo") :: Process (Maybe String)
handleMessage :: (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b) Source #
Attempt to handle a raw Message
.
If the type of the message matches the type of the first argument to
the supplied expression, then the message will be decoded and the expression
evaluated against its value. If this runtime type checking fails however,
Nothing
will be returned to indicate the fact. If the check succeeds and
evaluation proceeds, the resulting value will be wrapped with Just
.
Intended for use in catchesExit
and matchAny
primitives.
handleMessageIf :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b) Source #
Conditionally handle a raw Message
.
If the predicate (a -> Bool)
evaluates to True
, invokes the supplied
handler, otherwise returns Nothing
to indicate failure. See handleMessage
for further information about runtime type checking.
handleMessage_ :: (Monad m, Serializable a) => Message -> (a -> m ()) -> m () Source #
As handleMessage
but ignores result, which is useful if you don't
care whether or not the handler succeeded.
handleMessageIf_ :: (Monad m, Serializable a) => Message -> (a -> Bool) -> (a -> m ()) -> m () Source #
Conditional version of handleMessage_
.
delegate :: ProcessId -> (Message -> Bool) -> Process () Source #
Receives messages and forwards them to pid
if p msg == True
.
relay :: ProcessId -> Process () Source #
A straight relay that forwards all messages to the supplied pid.
Process management
spawn :: NodeId -> Closure (Process ()) -> Process ProcessId Source #
Spawn a process
For more information about Closure
, see
Control.Distributed.Process.Closure.
See also call
.
call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a Source #
Run a process remotely and wait for it to reply
We monitor the remote process: if it dies before it can send a reply, we die too.
For more information about Static
, SerializableDict
, and Closure
, see
Control.Distributed.Process.Closure.
See also spawn
.
die :: Serializable a => a -> Process b Source #
Die immediately - throws a ProcessExitException
with the given reason
.
exit :: Serializable a => ProcessId -> a -> Process () Source #
Graceful request to exit a process. Throws ProcessExitException
with the
supplied reason
encoded as a message. Any exit signal raised in this
manner can be handled using the catchExit
family of functions.
catchExit :: (Show a, Serializable a) => Process b -> (ProcessId -> a -> Process b) -> Process b Source #
Catches ProcessExitException
. The handler will not be applied unless its
type matches the encoded data stored in the exception (see the reason
argument given to the exit
primitive). If the handler cannot be applied,
the exception will be re-thrown.
To handle ProcessExitException
without regard for reason, see catch
.
To handle multiple reasons of differing types, see catchesExit
.
catchesExit :: Process b -> [ProcessId -> Message -> Process (Maybe b)] -> Process b Source #
Lift catches
(almost).
As ProcessExitException
stores the exit reason
as a typed, encoded
message, a handler must accept inputs of the expected type. In order to
handle a list of potentially different handlers (and therefore input types),
a handler passed to catchesExit
must accept Message
and return
Maybe
(i.e., Just p
if it handled the exit reason, otherwise Nothing
).
See handleMessage
and Message
for more details.
data ProcessTerminationException Source #
Thrown by terminate
Instances
Exception ProcessTerminationException Source # | |
Show ProcessTerminationException Source # | |
Defined in Control.Distributed.Process.Internal.Primitives showsPrec :: Int -> ProcessTerminationException -> ShowS # show :: ProcessTerminationException -> String # showList :: [ProcessTerminationException] -> ShowS # |
data ProcessRegistrationException Source #
Exception thrown when a process attempts to register a process under an already-registered name or to unregister a name that hasn't been registered. Returns the name and the identifier of the process that owns it, if any.
Instances
Exception ProcessRegistrationException Source # | |
Show ProcessRegistrationException Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> ProcessRegistrationException -> ShowS # show :: ProcessRegistrationException -> String # showList :: [ProcessRegistrationException] -> ShowS # |
SpawnRef
values are used to return pids of spawned processes
getSelfPid :: Process ProcessId Source #
Our own process ID
getSelfNode :: Process NodeId Source #
Get the node ID of our local node
data ProcessInfo Source #
Provide information about a running process
ProcessInfo | |
|
Instances
Show ProcessInfo Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> ProcessInfo -> ShowS # show :: ProcessInfo -> String # showList :: [ProcessInfo] -> ShowS # | |
Binary ProcessInfo Source # | |
Defined in Control.Distributed.Process.Internal.Types | |
Eq ProcessInfo Source # | |
Defined in Control.Distributed.Process.Internal.Types (==) :: ProcessInfo -> ProcessInfo -> Bool # (/=) :: ProcessInfo -> ProcessInfo -> Bool # |
getProcessInfo :: ProcessId -> Process (Maybe ProcessInfo) Source #
Get information about the specified process
getNodeStats :: NodeId -> Process (Either DiedReason NodeStats) Source #
Get statistics about the specified node
getLocalNodeStats :: Process NodeStats Source #
Get statistics about our local node
Monitoring and linking
link :: ProcessId -> Process () Source #
Link to a remote process (asynchronous)
When process A links to process B (that is, process A calls
link pidB
) then an asynchronous exception will be thrown to process A
when process B terminates (normally or abnormally), or when process A gets
disconnected from process B. Although it is technically possible to catch
these exceptions, chances are if you find yourself trying to do so you should
probably be using monitor
rather than link
. In particular, code such as
link pidB -- Link to process B expect -- Wait for a message from process B unlink pidB -- Unlink again
doesn't quite do what one might expect: if process B sends a message to
process A, and subsequently terminates, then process A might or might not
be terminated too, depending on whether the exception is thrown before or
after the unlink
(i.e., this code has a race condition).
Linking is all-or-nothing: A is either linked to B, or it's not. A second
call to link
has no effect.
Note that link
provides unidirectional linking (see spawnSupervised
).
Linking makes no distinction between normal and abnormal termination of
the remote process.
unlink :: ProcessId -> Process () Source #
Remove a link
This is synchronous in the sense that once it returns you are guaranteed that no exception will be raised if the remote process dies. However, it is asynchronous in the sense that we do not wait for a response from the remote node.
unlinkNode :: NodeId -> Process () Source #
Remove a node link
This has the same synchronous/asynchronous nature as unlink
.
unlinkPort :: SendPort a -> Process () Source #
Remove a channel (send port) link
This has the same synchronous/asynchronous nature as unlink
.
monitor :: ProcessId -> Process MonitorRef Source #
Monitor another process (asynchronous)
When process A monitors process B (that is, process A calls
monitor pidB
) then process A will receive a ProcessMonitorNotification
when process B terminates (normally or abnormally), or when process A gets
disconnected from process B. You receive this message like any other (using
expect
); the notification includes a reason (DiedNormal
, DiedException
,
DiedDisconnect
, etc.).
Every call to monitor
returns a new monitor reference MonitorRef
; if
multiple monitors are set up, multiple notifications will be delivered
and monitors can be disabled individually using unmonitor
.
monitorNode :: NodeId -> Process MonitorRef Source #
Monitor a node (asynchronous)
monitorPort :: Serializable a => SendPort a -> Process MonitorRef Source #
Monitor a typed channel (asynchronous)
unmonitor :: MonitorRef -> Process () Source #
Remove a monitor
This has the same synchronous/asynchronous nature as unlink
.
ProcessMonitorNotification messages for the given MonitorRef are removed from the mailbox.
withMonitor :: ProcessId -> (MonitorRef -> Process a) -> Process a Source #
Establishes temporary monitoring of another process.
withMonitor pid code
sets up monitoring of pid
for the duration
of code
. Note: although monitoring is no longer active when
withMonitor
returns, there might still be unreceived monitor
messages in the queue.
withMonitor_ :: ProcessId -> Process a -> Process a Source #
Establishes temporary monitoring of another process.
withMonitor_ pid code
sets up monitoring of pid
for the duration
of code
. Note: although monitoring is no longer active when
withMonitor_
returns, there might still be unreceived monitor
messages in the queue.
Since 0.6.1
data MonitorRef Source #
MonitorRef is opaque for regular Cloud Haskell processes
Instances
Generic MonitorRef Source # | |||||
Defined in Control.Distributed.Process.Internal.Types
from :: MonitorRef -> Rep MonitorRef x # to :: Rep MonitorRef x -> MonitorRef # | |||||
Show MonitorRef Source # | |||||
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> MonitorRef -> ShowS # show :: MonitorRef -> String # showList :: [MonitorRef] -> ShowS # | |||||
Binary MonitorRef Source # | |||||
Defined in Control.Distributed.Process.Internal.Types | |||||
NFData MonitorRef Source # | |||||
Defined in Control.Distributed.Process.Internal.Types rnf :: MonitorRef -> () # | |||||
Eq MonitorRef Source # | |||||
Defined in Control.Distributed.Process.Internal.Types (==) :: MonitorRef -> MonitorRef -> Bool # (/=) :: MonitorRef -> MonitorRef -> Bool # | |||||
Ord MonitorRef Source # | |||||
Defined in Control.Distributed.Process.Internal.Types compare :: MonitorRef -> MonitorRef -> Ordering # (<) :: MonitorRef -> MonitorRef -> Bool # (<=) :: MonitorRef -> MonitorRef -> Bool # (>) :: MonitorRef -> MonitorRef -> Bool # (>=) :: MonitorRef -> MonitorRef -> Bool # max :: MonitorRef -> MonitorRef -> MonitorRef # min :: MonitorRef -> MonitorRef -> MonitorRef # | |||||
Hashable MonitorRef Source # | |||||
Defined in Control.Distributed.Process.Internal.Types hashWithSalt :: Int -> MonitorRef -> Int # hash :: MonitorRef -> Int # | |||||
type Rep MonitorRef Source # | |||||
Defined in Control.Distributed.Process.Internal.Types type Rep MonitorRef = D1 ('MetaData "MonitorRef" "Control.Distributed.Process.Internal.Types" "distributed-process-0.7.5-14hdXLMmdeU5goWgtdXS4F" 'False) (C1 ('MetaCons "MonitorRef" 'PrefixI 'True) (S1 ('MetaSel ('Just "monitorRefIdent") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Identifier) :*: S1 ('MetaSel ('Just "monitorRefCounter") 'NoSourceUnpackedness 'SourceStrict 'DecidedStrict) (Rec0 Int32))) |
data ProcessLinkException Source #
Exceptions thrown when a linked process dies
Instances
Exception ProcessLinkException Source # | |
Show ProcessLinkException Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> ProcessLinkException -> ShowS # show :: ProcessLinkException -> String # showList :: [ProcessLinkException] -> ShowS # |
data NodeLinkException Source #
Exception thrown when a linked node dies
Instances
Exception NodeLinkException Source # | |
Show NodeLinkException Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> NodeLinkException -> ShowS # show :: NodeLinkException -> String # showList :: [NodeLinkException] -> ShowS # |
data PortLinkException Source #
Exception thrown when a linked channel (port) dies
Instances
Exception PortLinkException Source # | |
Show PortLinkException Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> PortLinkException -> ShowS # show :: PortLinkException -> String # showList :: [PortLinkException] -> ShowS # |
data ProcessMonitorNotification Source #
Message sent by process monitors
Instances
Show ProcessMonitorNotification Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> ProcessMonitorNotification -> ShowS # show :: ProcessMonitorNotification -> String # showList :: [ProcessMonitorNotification] -> ShowS # | |
Binary ProcessMonitorNotification Source # | |
Defined in Control.Distributed.Process.Internal.Types put :: ProcessMonitorNotification -> Put # get :: Get ProcessMonitorNotification # putList :: [ProcessMonitorNotification] -> Put # |
data NodeMonitorNotification Source #
Message sent by node monitors
Instances
Show NodeMonitorNotification Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> NodeMonitorNotification -> ShowS # show :: NodeMonitorNotification -> String # showList :: [NodeMonitorNotification] -> ShowS # | |
Binary NodeMonitorNotification Source # | |
Defined in Control.Distributed.Process.Internal.Types put :: NodeMonitorNotification -> Put # get :: Get NodeMonitorNotification # putList :: [NodeMonitorNotification] -> Put # |
data PortMonitorNotification Source #
Message sent by channel (port) monitors
Instances
Show PortMonitorNotification Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> PortMonitorNotification -> ShowS # show :: PortMonitorNotification -> String # showList :: [PortMonitorNotification] -> ShowS # | |
Binary PortMonitorNotification Source # | |
Defined in Control.Distributed.Process.Internal.Types put :: PortMonitorNotification -> Put # get :: Get PortMonitorNotification # putList :: [PortMonitorNotification] -> Put # |
data DiedReason Source #
Why did a process die?
DiedNormal | Normal termination |
DiedException !String | The process exited with an exception
(provided as String because Exception does not implement Binary) |
DiedDisconnect | We got disconnected from the process node |
DiedNodeDown | The process node died |
DiedUnknownId | Invalid (process/node/channel) identifier |
Instances
Show DiedReason Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> DiedReason -> ShowS # show :: DiedReason -> String # showList :: [DiedReason] -> ShowS # | |
Binary DiedReason Source # | |
Defined in Control.Distributed.Process.Internal.Types | |
NFData DiedReason Source # | |
Defined in Control.Distributed.Process.Internal.Types rnf :: DiedReason -> () # | |
Eq DiedReason Source # | |
Defined in Control.Distributed.Process.Internal.Types (==) :: DiedReason -> DiedReason -> Bool # (/=) :: DiedReason -> DiedReason -> Bool # |
Closures
A closure is a static value and an encoded environment
:: Static (ByteString -> a) | Decoder |
-> ByteString | Encoded closure environment |
-> Closure a |
A static value. Static is opaque; see staticLabel
and staticApply
.
data RemoteTable #
Runtime dictionary for unstatic
lookups
Logging
say :: String -> Process () Source #
Log a string
say message
sends a message of type SayMessage
with the current time and
ProcessId
of the current process to the process registered as logger
. By
default, this process simply sends the string to stderr
. Individual Cloud
Haskell backends might replace this with a different logger process, however.
Registry
register :: String -> ProcessId -> Process () Source #
Register a process with the local registry (synchronous). The name must not
already be registered. The process need not be on this node. A bad
registration will result in a ProcessRegistrationException.
The process to be registered does not have to be local itself.
reregister :: String -> ProcessId -> Process () Source #
Like register
, but will replace an existing registration.
The name must already be registered.
unregister :: String -> Process () Source #
Remove a process from the local registry (synchronous). This version will wait until a response is received from the management process. The name must already be registered.
nsend :: Serializable a => String -> a -> Process () Source #
Named send to a process in the local registry (asynchronous)
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process () Source #
Register a process with a remote registry (asynchronous).
The process to be registered does not have to live on the same remote node.
Reply will come in the form of a RegisterReply
message
See comments in whereisRemoteAsync
unregisterRemoteAsync :: NodeId -> String -> Process () Source #
Remove a process from a remote registry (asynchronous).
Reply will come in the form of a RegisterReply
message
See comments in whereisRemoteAsync
whereisRemoteAsync :: NodeId -> String -> Process () Source #
Query a remote process registry (asynchronous)
Reply will come in the form of a WhereIsReply
message.
There is currently no synchronous version of whereisRemoteAsync
: if
you implement one yourself, be sure to take into account that the remote
node might die or get disconnected before it can respond (i.e. you should
use monitorNode
and take appropriate action when you receive a
NodeMonitorNotification
).
nsendRemote :: Serializable a => NodeId -> String -> a -> Process () Source #
Named send to a process in a remote registry (asynchronous)
data WhereIsReply Source #
(Asynchronous) reply from whereis
Instances
Show WhereIsReply Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> WhereIsReply -> ShowS # show :: WhereIsReply -> String # showList :: [WhereIsReply] -> ShowS # | |
Binary WhereIsReply Source # | |
Defined in Control.Distributed.Process.Internal.Types |
data RegisterReply Source #
(Asynchronous) reply from register
and unregister
Instances
Show RegisterReply Source # | |
Defined in Control.Distributed.Process.Internal.Types showsPrec :: Int -> RegisterReply -> ShowS # show :: RegisterReply -> String # showList :: [RegisterReply] -> ShowS # | |
Binary RegisterReply Source # | |
Defined in Control.Distributed.Process.Internal.Types |
Exception handling
catch :: Exception e => Process a -> (e -> Process a) -> Process a Source #
Deprecated: Use Control.Monad.Catch.catch instead
Lift catch
You need this when using catches
try :: Exception e => Process a -> Process (Either e a) Source #
Deprecated: Use Control.Monad.Catch.try instead
Lift try
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b Source #
Deprecated: Use Control.Monad.Catch.mask instead
Lift mask
onException :: Process a -> Process b -> Process a Source #
Deprecated: Use Control.Monad.Catch.onException instead
Lift onException
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c Source #
Deprecated: Use Control.Monad.Catch.bracket instead
Lift bracket
bracket_ :: Process a -> Process b -> Process c -> Process c Source #
Deprecated: Use Control.Monad.Catch.bracket_ instead
Lift bracket_
finally :: Process a -> Process b -> Process a Source #
Deprecated: Use Control.Monad.Catch.finally instead
Lift finally
Auxiliary API
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef Source #
Asynchronous version of spawn
(spawn
is defined in terms of spawnAsync
and expect
)
spawnSupervised :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) Source #
Spawn a child process, have the child link to the parent and the parent monitor the child
spawnMonitor :: NodeId -> Closure (Process ()) -> Process (ProcessId, MonitorRef) Source #
Like spawnLink
, but monitor the spawned process
spawnChannel :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (ReceivePort a -> Process ()) -> Process (SendPort a) Source #
Spawn a new process, supplying it with a new ReceivePort
and return
the corresponding SendPort
.
Local versions of spawn
spawnChannelLocal :: Serializable a => (ReceivePort a -> Process ()) -> Process (SendPort a) Source #
Create a new typed channel, spawn a process on the local node, passing it the receive port, and return the send port
callLocal :: Process a -> Process a Source #
Local version of call
. Running a process in this way isolates it from
messages sent to the caller process, and also allows silently dropping late
or duplicate messages sent to the isolated process after it exits.
Silently dropping messages may not always be the best approach.
Reconnecting
reconnect :: ProcessId -> Process () Source #
Cloud Haskell provides the illusion of connection-less, reliable, ordered
message passing. However, when network connections get disrupted this
illusion cannot always be maintained. Once a network connection breaks (even
temporarily) no further communication on that connection will be possible.
For example, if process A sends a message to process B, and A is then
notified (by monitor notification) that it got disconnected from B, A will
not be able to send any further messages to B, unless A explicitly
indicates that it is acceptable to attempt to reconnect to B using the
Cloud Haskell reconnect
primitive.
Importantly, when A calls reconnect
it acknowledges that some messages to
B might have been lost. For instance, if A sends messages m1 and m2 to B,
then receives a monitor notification that its connection to B has been lost,
calls reconnect
and then sends m3, it is possible that B will receive m1
and m3 but not m2.
Note that reconnect
does not mean "reconnect now" but rather "it is okay
to attempt to reconnect on the next send". In particular, if no further
communication attempts are made to B then A can use reconnect to clean up
its connection to B.