module Control.Distributed.Process
(
ProcessId
, NodeId(..)
, Process
, SendPortId
, processNodeId
, sendPortProcessId
, liftIO
, send
, usend
, expect
, expectTimeout
, ReceivePort
, SendPort
, sendPortId
, newChan
, sendChan
, receiveChan
, receiveChanTimeout
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeUSend
, unsafeSendChan
, unsafeNSend
, unsafeNSendRemote
, unsafeWrapMessage
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, Message
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, uforward
, delegate
, relay
, proxy
, spawn
, call
, terminate
, die
, kill
, exit
, catchExit
, catchesExit
, ProcessTerminationException(..)
, ProcessRegistrationException(..)
, SpawnRef
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
, link
, linkNode
, linkPort
, unlink
, unlinkNode
, unlinkPort
, monitor
, monitorNode
, monitorPort
, unmonitor
, withMonitor
, MonitorRef
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, DiedReason(..)
, Closure
, closure
, Static
, unStatic
, unClosure
, RemoteTable
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, WhereIsReply(..)
, RegisterReply(..)
, catch
, Handler(..)
, catches
, try
, mask
, mask_
, onException
, bracket
, bracket_
, finally
, spawnAsync
, spawnSupervised
, spawnLink
, spawnMonitor
, spawnChannel
, DidSpawn(..)
, spawnLocal
, spawnChannelLocal
, callLocal
, reconnect
, reconnectPort
) where
import Control.Monad.IO.Class (liftIO)
import Control.Applicative
import Control.Monad.Reader (ask)
import Control.Concurrent (killThread)
import Control.Concurrent.MVar
( MVar
, newEmptyMVar
, takeMVar
, putMVar
)
import Control.Distributed.Static
( Closure
, closure
, Static
, RemoteTable
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, Process(..)
, MonitorRef(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessRegistrationException(..)
, DiedReason(..)
, SpawnRef(..)
, DidSpawn(..)
, SendPort(..)
, ReceivePort(..)
, SendPortId(..)
, WhereIsReply(..)
, RegisterReply(..)
, LocalProcess(processNode)
, Message
, localProcessWithId
)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Primitives
(
send
, usend
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, unsafeSend
, unsafeUSend
, unsafeSendChan
, unsafeNSend
, unsafeNSendRemote
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, matchAny
, matchAnyIf
, matchChan
, matchSTM
, matchMessage
, matchMessageIf
, isEncoded
, wrapMessage
, unsafeWrapMessage
, unwrapMessage
, handleMessage
, handleMessageIf
, handleMessage_
, handleMessageIf_
, forward
, uforward
, delegate
, relay
, proxy
, terminate
, ProcessTerminationException(..)
, die
, exit
, catchExit
, catchesExit
, kill
, getSelfPid
, getSelfNode
, ProcessInfo(..)
, getProcessInfo
, NodeStats(..)
, getNodeStats
, getLocalNodeStats
, link
, linkNode
, linkPort
, unlink
, unlinkNode
, unlinkPort
, monitor
, monitorNode
, monitorPort
, unmonitor
, withMonitor
, say
, register
, reregister
, unregister
, whereis
, nsend
, registerRemoteAsync
, reregisterRemoteAsync
, unregisterRemoteAsync
, whereisRemoteAsync
, nsendRemote
, unStatic
, unClosure
, catch
, Handler(..)
, catches
, try
, mask
, mask_
, onException
, bracket
, bracket_
, finally
, expectTimeout
, receiveChanTimeout
, spawnAsync
, reconnect
, reconnectPort
)
import Control.Distributed.Process.Node (forkProcess)
import Control.Distributed.Process.Internal.Types
( processThread
, withValidLocalState
)
import Control.Distributed.Process.Internal.Spawn
(
spawn
, spawnLink
, spawnMonitor
, spawnChannel
, spawnSupervised
, call
)
import qualified Control.Monad.Catch as Catch
#if MIN_VERSION_base(4,6,0)
import Prelude
#else
import Prelude hiding (catch)
#endif
import Control.Distributed.Process.Internal.StrictMVar (readMVar)
import qualified Control.Exception as Exception (onException)
import Data.Accessor ((^.))
import Data.Foldable (forM_)
spawnLocal :: Process () -> Process ProcessId
spawnLocal proc = do
node <- processNode <$> ask
liftIO $ forkProcess node proc
spawnChannelLocal :: Serializable a
=> (ReceivePort a -> Process ())
-> Process (SendPort a)
spawnChannelLocal proc = do
node <- processNode <$> ask
liftIO $ do
mvar <- newEmptyMVar
_ <- forkProcess node $ do
(sport, rport) <- newChan
liftIO $ putMVar mvar sport
proc rport
takeMVar mvar
callLocal :: Process a -> Process a
callLocal proc = Catch.mask $ \release -> do
mv <- liftIO newEmptyMVar :: Process (MVar (Either Catch.SomeException a))
child <- spawnLocal $ Catch.try (release proc) >>= liftIO . putMVar mv
lproc <- ask
liftIO $ do
rs <- Exception.onException (takeMVar mv) $ Catch.uninterruptibleMask_ $
do mchildThreadId <- withValidLocalState (processNode lproc) $
\vst -> return $ fmap processThread $
vst ^. localProcessWithId (processLocalId child)
forM_ mchildThreadId killThread
takeMVar mv
either Catch.throwM return rs