module Control.Distributed.Process.Internal.Primitives
(
send
, expect
, newChan
, sendChan
, receiveChan
, mergePortsBiased
, mergePortsRR
, Match
, receiveWait
, receiveTimeout
, match
, matchIf
, matchUnknown
, terminate
, ProcessTerminationException(..)
, getSelfPid
, getSelfNode
, link
, unlink
, monitor
, unmonitor
, say
, register
, unregister
, whereis
, nsend
, registerRemote
, unregisterRemote
, whereisRemote
, whereisRemoteAsync
, nsendRemote
, catch
, expectTimeout
, spawnAsync
, linkNode
, linkPort
, unlinkNode
, unlinkPort
, monitorNode
, monitorPort
) where
import Prelude hiding (catch)
import Data.Binary (decode)
import Data.Typeable (Typeable)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Format (formatTime)
import System.Locale (defaultTimeLocale)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative ((<$>))
import Control.Exception (Exception, throw)
import qualified Control.Exception as Exception (catch)
import Control.Concurrent.MVar (modifyMVar)
import Control.Concurrent.Chan (writeChan)
import Control.Concurrent.STM
( STM
, atomically
, orElse
, newTChan
, readTChan
, newTVar
, readTVar
, writeTVar
)
import Control.Distributed.Process.Internal.CQueue (dequeue, BlockSpec(..))
import Control.Distributed.Process.Serializable (Serializable, fingerprint)
import Data.Accessor ((^.), (^:), (^=))
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, ProcessId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, Closure(..)
, Message(..)
, MonitorRef(..)
, SpawnRef(..)
, NCMsg(..)
, ProcessSignal(..)
, monitorCounter
, spawnCounter
, Closure(..)
, SendPort(..)
, ReceivePort(..)
, channelCounter
, typedChannelWithId
, TypedChannel(..)
, SendPortId(..)
, Identifier(..)
, procMsg
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, WhereIsReply(..)
, createMessage
)
import Control.Distributed.Process.Internal.MessageT
( sendMessage
, sendBinary
, getLocalNode
)
import Control.Distributed.Process.Internal.Node (runLocalProcess)
send :: Serializable a => ProcessId -> a -> Process ()
send them msg = procMsg $ sendMessage (ProcessIdentifier them) msg
expect :: forall a. Serializable a => Process a
expect = receiveWait [match return]
newChan :: Serializable a => Process (SendPort a, ReceivePort a)
newChan = do
proc <- ask
liftIO . modifyMVar (processState proc) $ \st -> do
chan <- liftIO . atomically $ newTChan
let lcid = st ^. channelCounter
cid = SendPortId { sendPortProcessId = processId proc
, sendPortLocalId = lcid
}
sport = SendPort cid
rport = ReceivePortSingle chan
tch = TypedChannel chan
return ( (channelCounter ^: (+ 1))
. (typedChannelWithId lcid ^= Just tch)
$ st
, (sport, rport)
)
sendChan :: Serializable a => SendPort a -> a -> Process ()
sendChan (SendPort cid) msg = procMsg $ sendBinary (SendPortIdentifier cid) msg
receiveChan :: Serializable a => ReceivePort a -> Process a
receiveChan = liftIO . atomically . receiveSTM
where
receiveSTM :: ReceivePort a -> STM a
receiveSTM (ReceivePortSingle c) =
readTChan c
receiveSTM (ReceivePortBiased ps) =
foldr1 orElse (map receiveSTM ps)
receiveSTM (ReceivePortRR psVar) = do
ps <- readTVar psVar
a <- foldr1 orElse (map receiveSTM ps)
writeTVar psVar (rotate ps)
return a
rotate :: [a] -> [a]
rotate [] = []
rotate (x:xs) = xs ++ [x]
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased = return . ReceivePortBiased
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR ps = liftIO . atomically $ ReceivePortRR <$> newTVar ps
newtype Match b = Match { unMatch :: Message -> Maybe (Process b) }
receiveWait :: [Match b] -> Process b
receiveWait ms = do
queue <- processQueue <$> ask
Just proc <- liftIO $ dequeue queue Blocking (map unMatch ms)
proc
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)
receiveTimeout t ms = do
queue <- processQueue <$> ask
let blockSpec = if t == 0 then NonBlocking else Timeout t
mProc <- liftIO $ dequeue queue blockSpec (map unMatch ms)
case mProc of
Nothing -> return Nothing
Just proc -> Just <$> proc
match :: forall a b. Serializable a => (a -> Process b) -> Match b
match = matchIf (const True)
matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
matchIf c p = Match $ \msg ->
let decoded :: a
decoded = decode . messageEncoding $ msg in
if messageFingerprint msg == fingerprint (undefined :: a) && c decoded
then Just $ p decoded
else Nothing
matchUnknown :: Process b -> Match b
matchUnknown = Match . const . Just
data ProcessTerminationException = ProcessTerminationException
deriving (Show, Typeable)
instance Exception ProcessTerminationException
terminate :: Process a
terminate = liftIO $ throw ProcessTerminationException
getSelfPid :: Process ProcessId
getSelfPid = processId <$> ask
getSelfNode :: Process NodeId
getSelfNode = localNodeId <$> procMsg getLocalNode
link :: ProcessId -> Process ()
link = sendCtrlMsg Nothing . Link . ProcessIdentifier
monitor :: ProcessId -> Process MonitorRef
monitor = monitor' . ProcessIdentifier
unlink :: ProcessId -> Process ()
unlink pid = do
unlinkAsync pid
receiveWait [ matchIf (\(DidUnlinkProcess pid') -> pid' == pid)
(\_ -> return ())
]
unlinkNode :: NodeId -> Process ()
unlinkNode nid = do
unlinkNodeAsync nid
receiveWait [ matchIf (\(DidUnlinkNode nid') -> nid' == nid)
(\_ -> return ())
]
unlinkPort :: SendPort a -> Process ()
unlinkPort sport = do
unlinkPortAsync sport
receiveWait [ matchIf (\(DidUnlinkPort cid) -> cid == sendPortId sport)
(\_ -> return ())
]
unmonitor :: MonitorRef -> Process ()
unmonitor ref = do
unmonitorAsync ref
receiveWait [ matchIf (\(DidUnmonitor ref') -> ref' == ref)
(\_ -> return ())
]
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch p h = do
node <- procMsg getLocalNode
lproc <- ask
let run :: Process a -> IO a
run proc = runLocalProcess node proc lproc
liftIO $ Exception.catch (run p) (run . h)
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout timeout = receiveTimeout timeout [match return]
spawnAsync :: NodeId -> Closure (Process ()) -> Process SpawnRef
spawnAsync nid proc = do
spawnRef <- getSpawnRef
sendCtrlMsg (Just nid) $ Spawn proc spawnRef
return spawnRef
monitorNode :: NodeId -> Process MonitorRef
monitorNode =
monitor' . NodeIdentifier
monitorPort :: forall a. Serializable a => SendPort a -> Process MonitorRef
monitorPort (SendPort cid) =
monitor' (SendPortIdentifier cid)
unmonitorAsync :: MonitorRef -> Process ()
unmonitorAsync =
sendCtrlMsg Nothing . Unmonitor
linkNode :: NodeId -> Process ()
linkNode = link' . NodeIdentifier
linkPort :: SendPort a -> Process ()
linkPort (SendPort cid) =
link' (SendPortIdentifier cid)
unlinkAsync :: ProcessId -> Process ()
unlinkAsync =
sendCtrlMsg Nothing . Unlink . ProcessIdentifier
unlinkNodeAsync :: NodeId -> Process ()
unlinkNodeAsync =
sendCtrlMsg Nothing . Unlink . NodeIdentifier
unlinkPortAsync :: SendPort a -> Process ()
unlinkPortAsync (SendPort cid) =
sendCtrlMsg Nothing . Unlink $ SendPortIdentifier cid
say :: String -> Process ()
say string = do
now <- liftIO getCurrentTime
us <- getSelfPid
nsend "logger" (formatTime defaultTimeLocale "%c" now, us, string)
register :: String -> ProcessId -> Process ()
register label pid =
sendCtrlMsg Nothing (Register label (Just pid))
registerRemote :: NodeId -> String -> ProcessId -> Process ()
registerRemote nid label pid =
sendCtrlMsg (Just nid) (Register label (Just pid))
unregister :: String -> Process ()
unregister label =
sendCtrlMsg Nothing (Register label Nothing)
unregisterRemote :: NodeId -> String -> Process ()
unregisterRemote nid label =
sendCtrlMsg (Just nid) (Register label Nothing)
whereis :: String -> Process (Maybe ProcessId)
whereis label = do
sendCtrlMsg Nothing (WhereIs label)
receiveWait [ matchIf (\(WhereIsReply label' _) -> label == label')
(\(WhereIsReply _ mPid) -> return mPid)
]
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
whereisRemote nid label = do
whereisRemoteAsync nid label
receiveWait [ matchIf (\(WhereIsReply label' _) -> label == label')
(\(WhereIsReply _ mPid) -> return mPid)
]
whereisRemoteAsync :: NodeId -> String -> Process ()
whereisRemoteAsync nid label =
sendCtrlMsg (Just nid) (WhereIs label)
nsend :: Serializable a => String -> a -> Process ()
nsend label msg =
sendCtrlMsg Nothing (NamedSend label (createMessage msg))
nsendRemote :: Serializable a => NodeId -> String -> a -> Process ()
nsendRemote nid label msg =
sendCtrlMsg (Just nid) (NamedSend label (createMessage msg))
getMonitorRefFor :: Identifier -> Process MonitorRef
getMonitorRefFor ident = do
proc <- ask
liftIO $ modifyMVar (processState proc) $ \st -> do
let counter = st ^. monitorCounter
return ( monitorCounter ^: (+ 1) $ st
, MonitorRef ident counter
)
getSpawnRef :: Process SpawnRef
getSpawnRef = do
proc <- ask
liftIO $ modifyMVar (processState proc) $ \st -> do
let counter = st ^. spawnCounter
return ( spawnCounter ^: (+ 1) $ st
, SpawnRef counter
)
monitor' :: Identifier -> Process MonitorRef
monitor' ident = do
monitorRef <- getMonitorRefFor ident
sendCtrlMsg Nothing $ Monitor monitorRef
return monitorRef
link' :: Identifier -> Process ()
link' = sendCtrlMsg Nothing . Link
sendCtrlMsg :: Maybe NodeId
-> ProcessSignal
-> Process ()
sendCtrlMsg mNid signal = do
us <- getSelfPid
let msg = NCMsg { ctrlMsgSender = ProcessIdentifier us
, ctrlMsgSignal = signal
}
case mNid of
Nothing -> do
ctrlChan <- localCtrlChan <$> procMsg getLocalNode
liftIO $ writeChan ctrlChan msg
Just nid ->
procMsg $ sendBinary (NodeIdentifier nid) msg