module Control.Distributed.Process.Extras.Internal.Primitives
(
Addressable
, Routable(..)
, Resolvable(..)
, Linkable(..)
, Killable(..)
, spawnSignalled
, spawnLinkLocal
, spawnMonitorLocal
, linkOnFailure
, whereisRemote
, whereisOrStart
, whereisOrStartRemote
, matchCond
, awaitResponse
, times
, monitor
, awaitExit
, isProcessAlive
, forever'
, deliver
, __remoteTable
) where
import Control.Concurrent (myThreadId, throwTo)
import Control.Distributed.Process hiding (monitor, finally, catch)
import qualified Control.Distributed.Process as P (monitor)
import Control.Distributed.Process.Closure (seqCP, remotable, mkClosure)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Extras.Internal.Types
( Addressable
, Linkable(..)
, Killable(..)
, Resolvable(..)
, Routable(..)
, RegisterSelf(..)
, ExitReason(ExitOther)
, whereisRemote
)
import Control.Monad (void)
import Control.Monad.Catch (finally, catchIf)
import Data.Maybe (isJust, fromJust)
monitor :: Resolvable a => a -> Process (Maybe MonitorRef)
monitor addr = do
mPid <- resolve addr
case mPid of
Nothing -> return Nothing
Just p -> return . Just =<< P.monitor p
awaitExit :: Resolvable a => a -> Process ()
awaitExit addr = do
mPid <- resolve addr
case mPid of
Nothing -> return ()
Just p -> do
mRef <- P.monitor p
receiveWait [
matchIf (\(ProcessMonitorNotification r p' _) -> r == mRef && p == p')
(\_ -> return ())
]
deliver :: (Addressable a, Serializable m) => m -> a -> Process ()
deliver = flip sendTo
isProcessAlive :: ProcessId -> Process Bool
isProcessAlive pid = getProcessInfo pid >>= \info -> return $ info /= Nothing
times :: Int -> Process () -> Process ()
n `times` proc = runP proc n
where runP :: Process () -> Int -> Process ()
runP _ 0 = return ()
runP p n' = p >> runP p (n' 1)
forever' :: Monad m => m a -> m b
forever' a = let a' = a >> a' in a'
spawnSignalled :: Process a -> (a -> Process ()) -> Process ProcessId
spawnSignalled before after = do
(sigStart, recvStart) <- newChan
(pid, mRef) <- spawnMonitorLocal $ do
initProc <- before
sendChan sigStart ()
after initProc
receiveWait [
matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef)
(\(ProcessMonitorNotification _ _ dr) -> die $ ExitOther (show dr))
, matchChan recvStart (\() -> return pid)
] `finally` (unmonitor mRef)
spawnLinkLocal :: Process () -> Process ProcessId
spawnLinkLocal p = do
pid <- spawnLocal p
link pid
return pid
spawnMonitorLocal :: Process () -> Process (ProcessId, MonitorRef)
spawnMonitorLocal p = do
pid <- spawnLocal p
ref <- P.monitor pid
return (pid, ref)
linkOnFailure :: ProcessId -> Process ()
linkOnFailure them = do
us <- getSelfPid
tid <- liftIO $ myThreadId
void $ spawnLocal $ do
callerRef <- P.monitor us
calleeRef <- P.monitor them
reason <- receiveWait [
matchIf (\(ProcessMonitorNotification mRef _ _) ->
mRef == callerRef)
(\_ -> return DiedNormal)
, matchIf (\(ProcessMonitorNotification mRef' _ _) ->
mRef' == calleeRef)
(\(ProcessMonitorNotification _ _ r') -> return r')
]
case reason of
DiedNormal -> return ()
_ -> liftIO $ throwTo tid (ProcessLinkException us reason)
whereisOrStart :: String -> Process () -> Process ProcessId
whereisOrStart name proc = do
(sigStart, recvStart) <- newChan
(_, mRef) <- spawnMonitorLocal $ do
us <- getSelfPid
catchIf (\(ProcessRegistrationException _ r) -> isJust r)
(register name us >> sendChan sigStart us)
(\(ProcessRegistrationException _ rPid) ->
sendChan sigStart $ fromJust rPid)
proc
receiveWait [
matchIf (\(ProcessMonitorNotification ref _ _) -> ref == mRef)
(\(ProcessMonitorNotification _ _ dr) -> die $ ExitOther (show dr))
, matchChan recvStart return
] `finally` (unmonitor mRef)
registerSelf :: (String, ProcessId) -> Process ()
registerSelf (name,target) =
do self <- getSelfPid
register name self
send target (RegisterSelf, self)
() <- expect
return ()
$(remotable ['registerSelf])
whereisOrStartRemote :: NodeId -> String -> Closure (Process ()) -> Process (Maybe ProcessId)
whereisOrStartRemote nid name proc =
do mRef <- monitorNode nid
whereisRemoteAsync nid name
res <- receiveWait
[ matchIf (\(WhereIsReply label _) -> label == name)
(\(WhereIsReply _ mPid) -> return (Just mPid)),
matchIf (\(NodeMonitorNotification aref _ _) -> aref == mRef)
(\(NodeMonitorNotification _ _ _) -> return Nothing)
]
case res of
Nothing -> return Nothing
Just (Just pid) -> unmonitor mRef >> return (Just pid)
Just Nothing ->
do self <- getSelfPid
sRef <- spawnAsync nid ($(mkClosure 'registerSelf) (name,self) `seqCP` proc)
ret <- receiveWait [
matchIf (\(NodeMonitorNotification ref _ _) -> ref == mRef)
(\(NodeMonitorNotification _ _ _) -> return Nothing),
matchIf (\(DidSpawn ref _) -> ref==sRef )
(\(DidSpawn _ pid) ->
do pRef <- P.monitor pid
receiveWait
[ matchIf (\(RegisterSelf, apid) -> apid == pid)
(\(RegisterSelf, _) -> do unmonitor pRef
send pid ()
return $ Just pid),
matchIf (\(NodeMonitorNotification aref _ _) -> aref == mRef)
(\(NodeMonitorNotification _aref _ _) -> return Nothing),
matchIf (\(ProcessMonitorNotification ref _ _) -> ref==pRef)
(\(ProcessMonitorNotification _ _ _) -> return Nothing)
] )
]
unmonitor mRef
case ret of
Nothing -> whereisOrStartRemote nid name proc
Just pid -> return $ Just pid
matchCond :: (Serializable a) => (a -> Maybe (Process b)) -> Match b
matchCond cond =
let v n = (isJust n, fromJust n)
res = v . cond
in matchIf (fst . res) (snd . res)
awaitResponse :: Addressable a
=> a
-> [Match (Either ExitReason b)]
-> Process (Either ExitReason b)
awaitResponse addr matches = do
mPid <- resolve addr
case mPid of
Nothing -> return $ Left $ ExitOther "UnresolvedAddress"
Just p -> do
mRef <- P.monitor p
receiveWait ((matchRef mRef):matches)
where
matchRef :: MonitorRef -> Match (Either ExitReason b)
matchRef r = matchIf (\(ProcessMonitorNotification r' _ _) -> r == r')
(\(ProcessMonitorNotification _ _ d) -> do
return (Left (ExitOther (show d))))