{-# LANGUAGE CPP #-}
module Control.Distributed.Process
(
ProcessId
, NodeId(..)
, Process
, SendPortId
, processNodeId
, sendPortProcessId
, liftIO
, send
, usend
, expect
, expectTimeout
, ReceivePort
, SendPort
, sendPortId
, newChan
, sendChan
, receiveChan
, receiveChanTimeout
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeUSend
, unsafeSendChan
, unsafeNSend
, unsafeNSendRemote
, unsafeWrapMessage
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, Message
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, uforward
, delegate
, relay
, proxy
, spawn
, call
, terminate
, die
, kill
, exit
, catchExit
, catchesExit
, ProcessTerminationException(..)
, ProcessRegistrationException(..)
, SpawnRef
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
, link
, linkNode
, linkPort
, unlink
, unlinkNode
, unlinkPort
, monitor
, monitorNode
, monitorPort
, unmonitor
, withMonitor
, withMonitor_
, MonitorRef
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, DiedReason(..)
, Closure
, closure
, Static
, unStatic
, unClosure
, RemoteTable
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, WhereIsReply(..)
, RegisterReply(..)
, catch
, Handler(..)
, catches
, try
, mask
, mask_
, onException
, bracket
, bracket_
, finally
, spawnAsync
, spawnSupervised
, spawnLink
, spawnMonitor
, spawnChannel
, DidSpawn(..)
, spawnLocal
, spawnChannelLocal
, callLocal
, reconnect
, reconnectPort
) where
import Control.Monad.IO.Class (liftIO)
import Control.Applicative
import Control.Monad.Reader (ask)
import Control.Concurrent (killThread)
import Control.Concurrent.MVar
( MVar
, newEmptyMVar
, takeMVar
, putMVar
)
import Control.Distributed.Static
( Closure
, closure
, Static
, RemoteTable
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, Process(..)
, MonitorRef(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessRegistrationException(..)
, DiedReason(..)
, SpawnRef(..)
, DidSpawn(..)
, SendPort(..)
, ReceivePort(..)
, SendPortId(..)
, WhereIsReply(..)
, RegisterReply(..)
, LocalProcess(processNode)
, Message
, localProcessWithId
)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Primitives
(
send
, usend
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeUSend
, unsafeSendChan
, unsafeNSend
, unsafeNSendRemote
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unsafeWrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, uforward
, delegate
, relay
, proxy
, terminate
, ProcessTerminationException(..)
, die
, exit
, catchExit
, catchesExit
, kill
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
, link
, linkNode
, linkPort
, unlink
, unlinkNode
, unlinkPort
, monitor
, monitorNode
, monitorPort
, unmonitor
, withMonitor
, withMonitor_
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, unStatic
, unClosure
, catch
, Handler(..)
, catches
, try
, mask
, mask_
, onException
, bracket
, bracket_
, finally
, expectTimeout
, receiveChanTimeout
, spawnAsync
, reconnect
, reconnectPort
)
import Control.Distributed.Process.Node (forkProcess)
import Control.Distributed.Process.Internal.Types
( processThread
, withValidLocalState
)
import Control.Distributed.Process.Internal.Spawn
(
spawn
, spawnLink
, spawnMonitor
, spawnChannel
, spawnSupervised
, call
)
import qualified Control.Monad.Catch as Catch
#if MIN_VERSION_base(4,6,0)
import Prelude
#else
import Prelude hiding (catch)
#endif
import qualified Control.Exception as Exception (onException)
import Data.Accessor ((^.))
import Data.Foldable (forM_)
spawnLocal :: Process () -> Process ProcessId
spawnLocal :: Process () -> Process ProcessId
spawnLocal Process ()
proc = do
LocalNode
node <- LocalProcess -> LocalNode
processNode (LocalProcess -> LocalNode)
-> Process LocalProcess -> Process LocalNode
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO ProcessId -> Process ProcessId
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ProcessId -> Process ProcessId)
-> IO ProcessId -> Process ProcessId
forall a b. (a -> b) -> a -> b
$ LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node Process ()
proc
spawnChannelLocal :: Serializable a
=> (ReceivePort a -> Process ())
-> Process (SendPort a)
spawnChannelLocal :: forall a.
Serializable a =>
(ReceivePort a -> Process ()) -> Process (SendPort a)
spawnChannelLocal ReceivePort a -> Process ()
proc = do
LocalNode
node <- LocalProcess -> LocalNode
processNode (LocalProcess -> LocalNode)
-> Process LocalProcess -> Process LocalNode
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO (SendPort a) -> Process (SendPort a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (SendPort a) -> Process (SendPort a))
-> IO (SendPort a) -> Process (SendPort a)
forall a b. (a -> b) -> a -> b
$ do
MVar (SendPort a)
mvar <- IO (MVar (SendPort a))
forall a. IO (MVar a)
newEmptyMVar
ProcessId
_ <- LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node (Process () -> IO ProcessId) -> Process () -> IO ProcessId
forall a b. (a -> b) -> a -> b
$ do
(SendPort a
sport, ReceivePort a
rport) <- Process (SendPort a, ReceivePort a)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ MVar (SendPort a) -> SendPort a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (SendPort a)
mvar SendPort a
sport
ReceivePort a -> Process ()
proc ReceivePort a
rport
MVar (SendPort a) -> IO (SendPort a)
forall a. MVar a -> IO a
takeMVar MVar (SendPort a)
mvar
callLocal :: Process a -> Process a
callLocal :: forall a. Process a -> Process a
callLocal Process a
proc = ((forall a. Process a -> Process a) -> Process a) -> Process a
forall b.
HasCallStack =>
((forall a. Process a -> Process a) -> Process b) -> Process b
forall (m :: * -> *) b.
(MonadMask m, HasCallStack) =>
((forall a. m a -> m a) -> m b) -> m b
Catch.mask (((forall a. Process a -> Process a) -> Process a) -> Process a)
-> ((forall a. Process a -> Process a) -> Process a) -> Process a
forall a b. (a -> b) -> a -> b
$ \forall a. Process a -> Process a
release -> do
MVar (Either SomeException a)
mv <- IO (MVar (Either SomeException a))
-> Process (MVar (Either SomeException a))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (MVar (Either SomeException a))
forall a. IO (MVar a)
newEmptyMVar :: Process (MVar (Either Catch.SomeException a))
ProcessId
child <- Process () -> Process ProcessId
spawnLocal (Process () -> Process ProcessId)
-> Process () -> Process ProcessId
forall a b. (a -> b) -> a -> b
$ Process a -> Process (Either SomeException a)
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
Catch.try (Process a -> Process a
forall a. Process a -> Process a
release Process a
proc) Process (Either SomeException a)
-> (Either SomeException a -> Process ()) -> Process ()
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ())
-> (Either SomeException a -> IO ())
-> Either SomeException a
-> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MVar (Either SomeException a) -> Either SomeException a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException a)
mv
LocalProcess
lproc <- Process LocalProcess
forall r (m :: * -> *). MonadReader r m => m r
ask
IO a -> Process a
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> Process a) -> IO a -> Process a
forall a b. (a -> b) -> a -> b
$ do
Either SomeException a
rs <- IO (Either SomeException a)
-> IO (Either SomeException a) -> IO (Either SomeException a)
forall a b. IO a -> IO b -> IO a
Exception.onException (MVar (Either SomeException a) -> IO (Either SomeException a)
forall a. MVar a -> IO a
takeMVar MVar (Either SomeException a)
mv) (IO (Either SomeException a) -> IO (Either SomeException a))
-> IO (Either SomeException a) -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ IO (Either SomeException a) -> IO (Either SomeException a)
forall (m :: * -> *) a. (HasCallStack, MonadMask m) => m a -> m a
Catch.uninterruptibleMask_ (IO (Either SomeException a) -> IO (Either SomeException a))
-> IO (Either SomeException a) -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$
do Maybe ThreadId
mchildThreadId <- LocalNode
-> (ValidLocalNodeState -> IO (Maybe ThreadId))
-> IO (Maybe ThreadId)
forall r. LocalNode -> (ValidLocalNodeState -> IO r) -> IO r
withValidLocalState (LocalProcess -> LocalNode
processNode LocalProcess
lproc) ((ValidLocalNodeState -> IO (Maybe ThreadId))
-> IO (Maybe ThreadId))
-> (ValidLocalNodeState -> IO (Maybe ThreadId))
-> IO (Maybe ThreadId)
forall a b. (a -> b) -> a -> b
$
\ValidLocalNodeState
vst -> Maybe ThreadId -> IO (Maybe ThreadId)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ThreadId -> IO (Maybe ThreadId))
-> Maybe ThreadId -> IO (Maybe ThreadId)
forall a b. (a -> b) -> a -> b
$ (LocalProcess -> ThreadId) -> Maybe LocalProcess -> Maybe ThreadId
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap LocalProcess -> ThreadId
processThread (Maybe LocalProcess -> Maybe ThreadId)
-> Maybe LocalProcess -> Maybe ThreadId
forall a b. (a -> b) -> a -> b
$
ValidLocalNodeState
vst ValidLocalNodeState
-> T ValidLocalNodeState (Maybe LocalProcess) -> Maybe LocalProcess
forall r a. r -> T r a -> a
^. LocalProcessId -> T ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId (ProcessId -> LocalProcessId
processLocalId ProcessId
child)
Maybe ThreadId -> (ThreadId -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe ThreadId
mchildThreadId ThreadId -> IO ()
killThread
MVar (Either SomeException a) -> IO (Either SomeException a)
forall a. MVar a -> IO a
takeMVar MVar (Either SomeException a)
mv
(SomeException -> IO a)
-> (a -> IO a) -> Either SomeException a -> IO a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
Catch.throwM a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Either SomeException a
rs