{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE MagicHash #-}
module Control.Distributed.Process.Node
( LocalNode
, newLocalNode
, closeLocalNode
, forkProcess
, runProcess
, initRemoteTable
, localNodeId
) where
import System.IO (fixIO, hPutStrLn, stderr)
import System.Mem.Weak (Weak, deRefWeak)
import qualified Data.ByteString.Lazy as BSL (fromChunks)
import Data.Binary (decode)
import Data.Map (Map)
import qualified Data.Map as Map
( empty
, toList
, fromList
, partition
, partitionWithKey
, elems
, size
, filterWithKey
, foldlWithKey
)
import Data.Time.Format (formatTime)
#if MIN_VERSION_time(1,5,0)
import Data.Time.Format (defaultTimeLocale)
#else
import System.Locale (defaultTimeLocale)
#endif
import Data.Set (Set)
import qualified Data.Set as Set
( empty
, insert
, delete
, map
, member
, toList
, union
)
import Data.Foldable (forM_)
import Data.Maybe (isJust, fromJust, isNothing, catMaybes)
import Data.Typeable (Typeable)
import Control.Category ((>>>))
import Control.Applicative
import Control.Monad (void, when, join)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.State.Strict (MonadState, StateT, evalStateT, gets)
import qualified Control.Monad.State.Strict as StateT (get, put)
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask)
import Control.Exception
( throwIO
, SomeException
, Exception
, throwTo
, uninterruptibleMask_
, getMaskingState
, MaskingState(..)
)
import qualified Control.Exception as Exception
( Handler(..)
, catch
, catches
, finally
)
import Control.Concurrent (forkIO, killThread)
import Control.Distributed.Process.Internal.BiMultiMap (BiMultiMap)
import qualified Control.Distributed.Process.Internal.BiMultiMap as BiMultiMap
import Control.Distributed.Process.Internal.StrictMVar
( newMVar
, withMVar
, modifyMVarMasked
, modifyMVar
, newEmptyMVar
, putMVar
, takeMVar
)
import Control.Concurrent.Chan (newChan, writeChan, readChan)
import qualified Control.Concurrent.MVar as MVar (newEmptyMVar, takeMVar)
import Control.Concurrent.STM
( atomically
)
import Control.Distributed.Process.Internal.CQueue
( CQueue
, enqueue
, newCQueue
, mkWeakCQueue
, queueSize
)
import qualified Network.Transport as NT
( Transport
, EndPoint
, newEndPoint
, receive
, Event(..)
, EventErrorCode(..)
, TransportError(..)
, address
, closeEndPoint
, Connection
, ConnectionId
, close
, EndPointAddress
, Reliability(ReliableOrdered)
)
import Data.Accessor (Accessor, accessor, (^.), (^=), (^:))
import System.Random (randomIO)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Static as Static
( unclosure
, initRemoteTable
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, LocalProcessId(..)
, ProcessId(..)
, LocalNode(..)
, MxEventBus(..)
, LocalNodeState(..)
, ValidLocalNodeState(..)
, withValidLocalState
, modifyValidLocalState
, LocalProcess(..)
, LocalProcessState(..)
, Process(..)
, DiedReason(..)
, NCMsg(..)
, ProcessSignal(..)
, localPidCounter
, localPidUnique
, localProcessWithId
, localProcesses
, localConnections
, forever'
, MonitorRef(..)
, NodeClosedException(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, ProcessExitException(..)
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, SpawnRef
, DidSpawn(..)
, Message(..)
, TypedChannel(..)
, Identifier(..)
, nodeOf
, ProcessInfo(..)
, ProcessInfoNone(..)
, NodeStats(..)
, SendPortId(..)
, typedChannelWithId
, RegisterReply(..)
, WhereIsReply(..)
, payloadToMessage
, createUnencodedMessage
, unsafeCreateUnencodedMessage
, runLocalProcess
, firstNonReservedProcessId
, ImplicitReconnect(WithImplicitReconnect)
)
import Control.Distributed.Process.Management.Internal.Agent
( mxAgentController
)
import Control.Distributed.Process.Management.Internal.Types
( MxEvent(..)
)
import qualified Control.Distributed.Process.Management.Internal.Trace.Remote as Trace
( remoteTable
)
import Control.Distributed.Process.Management.Internal.Trace.Tracer
( defaultTracer
)
import Control.Distributed.Process.Management.Internal.Trace.Types
( TraceArg(..)
, traceEvent
, traceLogFmt
, enableTrace
)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Messaging
( sendBinary
, closeImplicitReconnections
, impliesDeathOf
)
import Control.Distributed.Process.Internal.Primitives
( register
, receiveWait
, match
, sendChan
, unwrapMessage
, SayMessage(..)
)
import Control.Distributed.Process.Internal.Types (SendPort, Tracer(..))
import qualified Control.Distributed.Process.Internal.Closure.BuiltIn as BuiltIn (remoteTable)
import Control.Distributed.Process.Internal.WeakTQueue (TQueue, writeTQueue)
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC
( mapMaybe
, mapDefault
)
import Control.Monad.Catch (try)
import GHC.IO (IO(..), unsafeUnmask)
import GHC.Base ( maskAsyncExceptions# )
import Unsafe.Coerce
import Prelude
block :: IO a -> IO a
block :: forall a. IO a -> IO a
block (IO State# RealWorld -> (# State# RealWorld, a #)
io) = (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
forall a. (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
IO ((State# RealWorld -> (# State# RealWorld, a #)) -> IO a)
-> (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
forall a b. (a -> b) -> a -> b
$ (State# RealWorld -> (# State# RealWorld, a #))
-> State# RealWorld -> (# State# RealWorld, a #)
forall a.
(State# RealWorld -> (# State# RealWorld, a #))
-> State# RealWorld -> (# State# RealWorld, a #)
maskAsyncExceptions# State# RealWorld -> (# State# RealWorld, a #)
io
unblock :: IO a -> IO a
unblock :: forall a. IO a -> IO a
unblock = IO a -> IO a
forall a. IO a -> IO a
unsafeUnmask
initRemoteTable :: RemoteTable
initRemoteTable :: RemoteTable
initRemoteTable = RemoteTable -> RemoteTable
Trace.remoteTable (RemoteTable -> RemoteTable) -> RemoteTable -> RemoteTable
forall a b. (a -> b) -> a -> b
$ RemoteTable -> RemoteTable
BuiltIn.remoteTable RemoteTable
Static.initRemoteTable
newLocalNode :: NT.Transport -> RemoteTable -> IO LocalNode
newLocalNode :: Transport -> RemoteTable -> IO LocalNode
newLocalNode Transport
transport RemoteTable
rtable = do
Either (TransportError NewEndPointErrorCode) EndPoint
mEndPoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
NT.newEndPoint Transport
transport
case Either (TransportError NewEndPointErrorCode) EndPoint
mEndPoint of
Left TransportError NewEndPointErrorCode
ex -> TransportError NewEndPointErrorCode -> IO LocalNode
forall e a. Exception e => e -> IO a
throwIO TransportError NewEndPointErrorCode
ex
Right EndPoint
endPoint -> do
LocalNode
localNode <- EndPoint -> RemoteTable -> IO LocalNode
createBareLocalNode EndPoint
endPoint RemoteTable
rtable
LocalNode -> IO ()
startServiceProcesses LocalNode
localNode
LocalNode -> IO LocalNode
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return LocalNode
localNode
createBareLocalNode :: NT.EndPoint -> RemoteTable -> IO LocalNode
createBareLocalNode :: EndPoint -> RemoteTable -> IO LocalNode
createBareLocalNode EndPoint
endPoint RemoteTable
rtable = do
Int32
unq <- IO Int32
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
StrictMVar LocalNodeState
state <- LocalNodeState -> IO (StrictMVar LocalNodeState)
forall a. a -> IO (StrictMVar a)
newMVar (LocalNodeState -> IO (StrictMVar LocalNodeState))
-> LocalNodeState -> IO (StrictMVar LocalNodeState)
forall a b. (a -> b) -> a -> b
$ ValidLocalNodeState -> LocalNodeState
LocalNodeValid (ValidLocalNodeState -> LocalNodeState)
-> ValidLocalNodeState -> LocalNodeState
forall a b. (a -> b) -> a -> b
$ ValidLocalNodeState
{ _localProcesses :: Map LocalProcessId LocalProcess
_localProcesses = Map LocalProcessId LocalProcess
forall k a. Map k a
Map.empty
, _localPidCounter :: Int32
_localPidCounter = Int32
firstNonReservedProcessId
, _localPidUnique :: Int32
_localPidUnique = Int32
unq
, _localConnections :: Map (Identifier, Identifier) (Connection, ImplicitReconnect)
_localConnections = Map (Identifier, Identifier) (Connection, ImplicitReconnect)
forall k a. Map k a
Map.empty
}
Chan NCMsg
ctrlChan <- IO (Chan NCMsg)
forall a. IO (Chan a)
newChan
let node :: LocalNode
node = LocalNode { localNodeId :: NodeId
localNodeId = EndPointAddress -> NodeId
NodeId (EndPointAddress -> NodeId) -> EndPointAddress -> NodeId
forall a b. (a -> b) -> a -> b
$ EndPoint -> EndPointAddress
NT.address EndPoint
endPoint
, localEndPoint :: EndPoint
localEndPoint = EndPoint
endPoint
, localState :: StrictMVar LocalNodeState
localState = StrictMVar LocalNodeState
state
, localCtrlChan :: Chan NCMsg
localCtrlChan = Chan NCMsg
ctrlChan
, localEventBus :: MxEventBus
localEventBus = MxEventBus
MxEventBusInitialising
, remoteTable :: RemoteTable
remoteTable = RemoteTable
rtable
}
LocalNode
tracedNode <- LocalNode -> IO LocalNode
startMxAgent LocalNode
node
IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> IO ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
Exception.finally (LocalNode -> IO ()
runNodeController LocalNode
tracedNode)
(EndPoint -> IO ()
NT.closeEndPoint (LocalNode -> EndPoint
localEndPoint LocalNode
node))
IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> IO ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
Exception.finally (LocalNode -> IO ()
handleIncomingMessages LocalNode
tracedNode)
(LocalNode -> IO ()
stopNC LocalNode
node)
LocalNode -> IO LocalNode
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return LocalNode
tracedNode
where
stopNC :: LocalNode -> IO ()
stopNC LocalNode
node =
Chan NCMsg -> NCMsg -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (LocalNode -> Chan NCMsg
localCtrlChan LocalNode
node) NCMsg
{ ctrlMsgSender :: Identifier
ctrlMsgSender = NodeId -> Identifier
NodeIdentifier (LocalNode -> NodeId
localNodeId LocalNode
node)
, ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = ProcessSignal
SigShutdown
}
startMxAgent :: LocalNode -> IO LocalNode
startMxAgent :: LocalNode -> IO LocalNode
startMxAgent LocalNode
node = do
let fork :: Process () -> IO ProcessId
fork = LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node
MVar AgentConfig
mv <- IO (MVar AgentConfig)
forall a. IO (MVar a)
MVar.newEmptyMVar
ProcessId
pid <- Process () -> IO ProcessId
fork (Process () -> IO ProcessId) -> Process () -> IO ProcessId
forall a b. (a -> b) -> a -> b
$ (Process () -> IO ProcessId) -> MVar AgentConfig -> Process ()
mxAgentController Process () -> IO ProcessId
fork MVar AgentConfig
mv
(Tracer
tracer', Weak (CQueue Message)
wqRef, ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId
mxNew') <- MVar AgentConfig -> IO AgentConfig
forall a. MVar a -> IO a
MVar.takeMVar MVar AgentConfig
mv
LocalNode -> IO LocalNode
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return LocalNode
node { localEventBus = (MxEventBus pid tracer' wqRef mxNew') }
startDefaultTracer :: LocalNode -> IO ()
startDefaultTracer :: LocalNode -> IO ()
startDefaultTracer LocalNode
node' = do
let t :: MxEventBus
t = LocalNode -> MxEventBus
localEventBus LocalNode
node'
case MxEventBus
t of
MxEventBus ProcessId
_ (Tracer ProcessId
pid Weak (CQueue Message)
_) Weak (CQueue Message)
_ ((TChan Message, TChan Message) -> Process ()) -> IO ProcessId
_ -> do
LocalNode -> Process () -> IO ()
runProcess LocalNode
node' (Process () -> IO ()) -> Process () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ProcessId -> Process ()
register String
"trace.controller" ProcessId
pid
ProcessId
pid' <- LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node' Process ()
defaultTracer
MxEventBus -> ProcessId -> IO ()
enableTrace (LocalNode -> MxEventBus
localEventBus LocalNode
node') ProcessId
pid'
LocalNode -> Process () -> IO ()
runProcess LocalNode
node' (Process () -> IO ()) -> Process () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ProcessId -> Process ()
register String
"tracer.initial" ProcessId
pid'
MxEventBus
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
startServiceProcesses :: LocalNode -> IO ()
startServiceProcesses :: LocalNode -> IO ()
startServiceProcesses LocalNode
node = do
LocalNode -> IO ()
startDefaultTracer LocalNode
node
ProcessId
logger <- LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node Process ()
loop
LocalNode -> Process () -> IO ()
runProcess LocalNode
node (Process () -> IO ()) -> Process () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
String -> ProcessId -> Process ()
register String
"logger" ProcessId
logger
String -> ProcessId -> Process ()
register String
"trace.logger" ProcessId
logger
where
loop :: Process ()
loop = do
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
[ (SayMessage -> Process ()) -> Match ()
forall a b. Serializable a => (a -> Process b) -> Match b
match ((SayMessage -> Process ()) -> Match ())
-> (SayMessage -> Process ()) -> Match ()
forall a b. (a -> b) -> a -> b
$ \(SayMessage UTCTime
time ProcessId
pid String
string) -> do
let time' :: String
time' = TimeLocale -> String -> UTCTime -> String
forall t. FormatTime t => TimeLocale -> String -> t -> String
formatTime TimeLocale
defaultTimeLocale String
"%c" UTCTime
time
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> (String -> IO ()) -> String -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Handle -> String -> IO ()
hPutStrLn Handle
stderr (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$ String
time' String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" " String -> String -> String
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
": " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
string
Process ()
loop
, ((String, String) -> Process ()) -> Match ()
forall a b. Serializable a => (a -> Process b) -> Match b
match (((String, String) -> Process ()) -> Match ())
-> ((String, String) -> Process ()) -> Match ()
forall a b. (a -> b) -> a -> b
$ \((String
time, String
string) :: (String, String)) -> do
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> (String -> IO ()) -> String -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Handle -> String -> IO ()
hPutStrLn Handle
stderr (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$ String
time String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" [trace] " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
string
Process ()
loop
, (SendPort () -> Process ()) -> Match ()
forall a b. Serializable a => (a -> Process b) -> Match b
match ((SendPort () -> Process ()) -> Match ())
-> (SendPort () -> Process ()) -> Match ()
forall a b. (a -> b) -> a -> b
$ \(SendPort ()
ch :: SendPort ()) ->
SendPort () -> () -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan SendPort ()
ch ()
]
closeLocalNode :: LocalNode -> IO ()
closeLocalNode :: LocalNode -> IO ()
closeLocalNode LocalNode
node = do
IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ StrictMVar LocalNodeState
-> (LocalNodeState -> IO (LocalNodeState, IO ())) -> IO (IO ())
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (LocalNode -> StrictMVar LocalNodeState
localState LocalNode
node) ((LocalNodeState -> IO (LocalNodeState, IO ())) -> IO (IO ()))
-> (LocalNodeState -> IO (LocalNodeState, IO ())) -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ \LocalNodeState
st -> case LocalNodeState
st of
LocalNodeValid ValidLocalNodeState
vst -> do
(LocalNodeState, IO ()) -> IO (LocalNodeState, IO ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( LocalNodeState
LocalNodeClosed
, Map LocalProcessId LocalProcess -> (LocalProcess -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ValidLocalNodeState
vst ValidLocalNodeState
-> T ValidLocalNodeState (Map LocalProcessId LocalProcess)
-> Map LocalProcessId LocalProcess
forall r a. r -> T r a -> a
^. T ValidLocalNodeState (Map LocalProcessId LocalProcess)
localProcesses) ((LocalProcess -> IO ()) -> IO ())
-> (LocalProcess -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \LocalProcess
lproc ->
ThreadId -> IO ()
killThread (LocalProcess -> ThreadId
processThread LocalProcess
lproc)
)
LocalNodeState
LocalNodeClosed -> (LocalNodeState, IO ()) -> IO (LocalNodeState, IO ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (LocalNodeState
LocalNodeClosed, () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
EndPoint -> IO ()
NT.closeEndPoint (LocalNode -> EndPoint
localEndPoint LocalNode
node)
runProcess :: LocalNode -> Process () -> IO ()
runProcess :: LocalNode -> Process () -> IO ()
runProcess LocalNode
node Process ()
proc = do
StrictMVar (Either SomeException ())
done <- IO (StrictMVar (Either SomeException ()))
forall a. IO (StrictMVar a)
newEmptyMVar
IO ProcessId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ProcessId -> IO ()) -> IO ProcessId -> IO ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node (Process () -> IO ProcessId) -> Process () -> IO ProcessId
forall a b. (a -> b) -> a -> b
$ Process () -> Process (Either SomeException ())
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try Process ()
proc Process (Either SomeException ())
-> (Either SomeException () -> Process ()) -> Process ()
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ())
-> (Either SomeException () -> IO ())
-> Either SomeException ()
-> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StrictMVar (Either SomeException ())
-> Either SomeException () -> IO ()
forall a. StrictMVar a -> a -> IO ()
putMVar StrictMVar (Either SomeException ())
done
StrictMVar (Either SomeException ())
-> IO (Either SomeException ())
forall a. StrictMVar a -> IO a
takeMVar StrictMVar (Either SomeException ())
done IO (Either SomeException ())
-> (Either SomeException () -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (SomeException -> IO ())
-> (() -> IO ()) -> Either SomeException () -> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (SomeException -> IO a
forall {a}. SomeException -> IO a
forall e a. Exception e => e -> IO a
throwIO :: SomeException -> IO a) () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return
forkProcess :: LocalNode -> Process () -> IO ProcessId
forkProcess :: LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node Process ()
proc = do
MaskingState
ms <- IO MaskingState
getMaskingState
StrictMVar LocalNodeState
-> (LocalNodeState -> IO (LocalNodeState, ProcessId))
-> IO ProcessId
forall a b. StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVarMasked (LocalNode -> StrictMVar LocalNodeState
localState LocalNode
node) (MaskingState -> LocalNodeState -> IO (LocalNodeState, ProcessId)
startProcess MaskingState
ms)
where
startProcess :: MaskingState
-> LocalNodeState
-> IO (LocalNodeState, ProcessId)
startProcess :: MaskingState -> LocalNodeState -> IO (LocalNodeState, ProcessId)
startProcess MaskingState
ms (LocalNodeValid ValidLocalNodeState
vst) = do
let lpid :: LocalProcessId
lpid = LocalProcessId { lpidCounter :: Int32
lpidCounter = ValidLocalNodeState
vst ValidLocalNodeState -> T ValidLocalNodeState Int32 -> Int32
forall r a. r -> T r a -> a
^. T ValidLocalNodeState Int32
localPidCounter
, lpidUnique :: Int32
lpidUnique = ValidLocalNodeState
vst ValidLocalNodeState -> T ValidLocalNodeState Int32 -> Int32
forall r a. r -> T r a -> a
^. T ValidLocalNodeState Int32
localPidUnique
}
let pid :: ProcessId
pid = ProcessId { processNodeId :: NodeId
processNodeId = LocalNode -> NodeId
localNodeId LocalNode
node
, processLocalId :: LocalProcessId
processLocalId = LocalProcessId
lpid
}
StrictMVar LocalProcessState
pst <- LocalProcessState -> IO (StrictMVar LocalProcessState)
forall a. a -> IO (StrictMVar a)
newMVar LocalProcessState { _monitorCounter :: Int32
_monitorCounter = Int32
0
, _spawnCounter :: Int32
_spawnCounter = Int32
0
, _channelCounter :: Int32
_channelCounter = Int32
0
, _typedChannels :: Map Int32 TypedChannel
_typedChannels = Map Int32 TypedChannel
forall k a. Map k a
Map.empty
}
CQueue Message
queue <- IO (CQueue Message)
forall a. IO (CQueue a)
newCQueue
Weak (CQueue Message)
weakQueue <- CQueue Message -> IO () -> IO (Weak (CQueue Message))
forall a. CQueue a -> IO () -> IO (Weak (CQueue a))
mkWeakCQueue CQueue Message
queue (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
(ThreadId
_, LocalProcess
lproc) <- ((ThreadId, LocalProcess) -> IO (ThreadId, LocalProcess))
-> IO (ThreadId, LocalProcess)
forall a. (a -> IO a) -> IO a
fixIO (((ThreadId, LocalProcess) -> IO (ThreadId, LocalProcess))
-> IO (ThreadId, LocalProcess))
-> ((ThreadId, LocalProcess) -> IO (ThreadId, LocalProcess))
-> IO (ThreadId, LocalProcess)
forall a b. (a -> b) -> a -> b
$ \ ~(ThreadId
tid, LocalProcess
_) -> do
let lproc :: LocalProcess
lproc = LocalProcess { processQueue :: CQueue Message
processQueue = CQueue Message
queue
, processWeakQ :: Weak (CQueue Message)
processWeakQ = Weak (CQueue Message)
weakQueue
, processId :: ProcessId
processId = ProcessId
pid
, processState :: StrictMVar LocalProcessState
processState = StrictMVar LocalProcessState
pst
, processThread :: ThreadId
processThread = ThreadId
tid
, processNode :: LocalNode
processNode = LocalNode
node
}
let unmask :: IO a -> IO a
unmask = case MaskingState
ms of
MaskingState
Unmasked -> IO a -> IO a
forall a. IO a -> IO a
unblock
MaskingState
MaskedInterruptible -> IO a -> IO a
forall a. IO a -> IO a
block
MaskingState
MaskedUninterruptible -> IO a -> IO a
forall a. a -> a
id
ThreadId
tid' <- IO ThreadId -> IO ThreadId
forall a. IO a -> IO a
uninterruptibleMask_ (IO ThreadId -> IO ThreadId) -> IO ThreadId -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
DiedReason
reason <- IO DiedReason -> [Handler DiedReason] -> IO DiedReason
forall a. IO a -> [Handler a] -> IO a
Exception.catches
(IO DiedReason -> IO DiedReason
forall a. IO a -> IO a
unmask (IO DiedReason -> IO DiedReason) -> IO DiedReason -> IO DiedReason
forall a b. (a -> b) -> a -> b
$ LocalProcess -> Process () -> IO ()
forall a. LocalProcess -> Process a -> IO a
runLocalProcess LocalProcess
lproc Process ()
proc IO () -> IO DiedReason -> IO DiedReason
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> DiedReason -> IO DiedReason
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return DiedReason
DiedNormal)
[ ((ProcessExitException -> IO DiedReason) -> Handler DiedReason
forall a e. Exception e => (e -> IO a) -> Handler a
Exception.Handler (\ex :: ProcessExitException
ex@(ProcessExitException ProcessId
from Message
msg) -> do
Maybe String
mMsg <- Message -> IO (Maybe String)
forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> m (Maybe a)
unwrapMessage Message
msg :: IO (Maybe String)
case Maybe String
mMsg of
Maybe String
Nothing -> DiedReason -> IO DiedReason
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (DiedReason -> IO DiedReason) -> DiedReason -> IO DiedReason
forall a b. (a -> b) -> a -> b
$ String -> DiedReason
DiedException (String -> DiedReason) -> String -> DiedReason
forall a b. (a -> b) -> a -> b
$ ProcessExitException -> String
forall a. Show a => a -> String
show ProcessExitException
ex
Just String
m -> DiedReason -> IO DiedReason
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (DiedReason -> IO DiedReason) -> DiedReason -> IO DiedReason
forall a b. (a -> b) -> a -> b
$ String -> DiedReason
DiedException (String
"exit-from=" String -> String -> String
forall a. [a] -> [a] -> [a]
++ (ProcessId -> String
forall a. Show a => a -> String
show ProcessId
from) String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
",reason=" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
m)))
, ((SomeException -> IO DiedReason) -> Handler DiedReason
forall a e. Exception e => (e -> IO a) -> Handler a
Exception.Handler
(DiedReason -> IO DiedReason
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (DiedReason -> IO DiedReason)
-> (SomeException -> DiedReason) -> SomeException -> IO DiedReason
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> DiedReason
DiedException (String -> DiedReason)
-> (SomeException -> String) -> SomeException -> DiedReason
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (SomeException -> String
forall a. Show a => a -> String
show :: SomeException -> String)))]
Maybe [Connection]
mconns <- LocalNode
-> (ValidLocalNodeState -> IO (ValidLocalNodeState, [Connection]))
-> IO (Maybe [Connection])
forall a.
LocalNode
-> (ValidLocalNodeState -> IO (ValidLocalNodeState, a))
-> IO (Maybe a)
modifyValidLocalState LocalNode
node (ProcessId
-> ValidLocalNodeState -> IO (ValidLocalNodeState, [Connection])
cleanupProcess ProcessId
pid)
Maybe [Connection] -> ([Connection] -> IO ThreadId) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe [Connection]
mconns (([Connection] -> IO ThreadId) -> IO ())
-> ([Connection] -> IO ThreadId) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId)
-> ([Connection] -> IO ()) -> [Connection] -> IO ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Connection -> IO ()) -> [Connection] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Connection -> IO ()
NT.close
Chan NCMsg -> NCMsg -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (LocalNode -> Chan NCMsg
localCtrlChan LocalNode
node) NCMsg
{ ctrlMsgSender :: Identifier
ctrlMsgSender = ProcessId -> Identifier
ProcessIdentifier ProcessId
pid
, ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = Identifier -> DiedReason -> ProcessSignal
Died (ProcessId -> Identifier
ProcessIdentifier ProcessId
pid) DiedReason
reason
}
(ThreadId, LocalProcess) -> IO (ThreadId, LocalProcess)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId
tid', LocalProcess
lproc)
LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ProcessId -> MxEvent
MxSpawned ProcessId
pid)
if LocalProcessId -> Int32
lpidCounter LocalProcessId
lpid Int32 -> Int32 -> Bool
forall a. Eq a => a -> a -> Bool
== Int32
forall a. Bounded a => a
maxBound
then do
Int32
newUnique <- IO Int32
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
(LocalNodeState, ProcessId) -> IO (LocalNodeState, ProcessId)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( ValidLocalNodeState -> LocalNodeState
LocalNodeValid
(ValidLocalNodeState -> LocalNodeState)
-> ValidLocalNodeState -> LocalNodeState
forall a b. (a -> b) -> a -> b
$ (LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId LocalProcessId
lpid Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= LocalProcess -> Maybe LocalProcess
forall a. a -> Maybe a
Just LocalProcess
lproc)
(ValidLocalNodeState -> ValidLocalNodeState)
-> (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState
-> ValidLocalNodeState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (T ValidLocalNodeState Int32
localPidCounter T ValidLocalNodeState Int32
-> Int32 -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= Int32
firstNonReservedProcessId)
(ValidLocalNodeState -> ValidLocalNodeState)
-> (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState
-> ValidLocalNodeState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (T ValidLocalNodeState Int32
localPidUnique T ValidLocalNodeState Int32
-> Int32 -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= Int32
newUnique)
(ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState -> ValidLocalNodeState
forall a b. (a -> b) -> a -> b
$ ValidLocalNodeState
vst
, ProcessId
pid
)
else
(LocalNodeState, ProcessId) -> IO (LocalNodeState, ProcessId)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( ValidLocalNodeState -> LocalNodeState
LocalNodeValid
(ValidLocalNodeState -> LocalNodeState)
-> ValidLocalNodeState -> LocalNodeState
forall a b. (a -> b) -> a -> b
$ (LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId LocalProcessId
lpid Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= LocalProcess -> Maybe LocalProcess
forall a. a -> Maybe a
Just LocalProcess
lproc)
(ValidLocalNodeState -> ValidLocalNodeState)
-> (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState
-> ValidLocalNodeState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (T ValidLocalNodeState Int32
localPidCounter T ValidLocalNodeState Int32
-> (Int32 -> Int32) -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> (a -> a) -> r -> r
^: (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1))
(ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState -> ValidLocalNodeState
forall a b. (a -> b) -> a -> b
$ ValidLocalNodeState
vst
, ProcessId
pid
)
startProcess MaskingState
_ LocalNodeState
LocalNodeClosed =
NodeClosedException -> IO (LocalNodeState, ProcessId)
forall e a. Exception e => e -> IO a
throwIO (NodeClosedException -> IO (LocalNodeState, ProcessId))
-> NodeClosedException -> IO (LocalNodeState, ProcessId)
forall a b. (a -> b) -> a -> b
$ NodeId -> NodeClosedException
NodeClosedException (NodeId -> NodeClosedException) -> NodeId -> NodeClosedException
forall a b. (a -> b) -> a -> b
$ LocalNode -> NodeId
localNodeId LocalNode
node
cleanupProcess :: ProcessId
-> ValidLocalNodeState
-> IO (ValidLocalNodeState, [NT.Connection])
cleanupProcess :: ProcessId
-> ValidLocalNodeState -> IO (ValidLocalNodeState, [Connection])
cleanupProcess ProcessId
pid ValidLocalNodeState
vst = do
let pid' :: Identifier
pid' = ProcessId -> Identifier
ProcessIdentifier ProcessId
pid
let (Map (Identifier, Identifier) (Connection, ImplicitReconnect)
affected, Map (Identifier, Identifier) (Connection, ImplicitReconnect)
unaffected) = ((Identifier, Identifier)
-> (Connection, ImplicitReconnect) -> Bool)
-> Map (Identifier, Identifier) (Connection, ImplicitReconnect)
-> (Map (Identifier, Identifier) (Connection, ImplicitReconnect),
Map (Identifier, Identifier) (Connection, ImplicitReconnect))
forall k a. (k -> a -> Bool) -> Map k a -> (Map k a, Map k a)
Map.partitionWithKey (\(Identifier
fr, Identifier
_to) !(Connection, ImplicitReconnect)
_v -> Identifier -> Identifier -> Bool
impliesDeathOf Identifier
pid' Identifier
fr) (ValidLocalNodeState
vst ValidLocalNodeState
-> T ValidLocalNodeState
(Map (Identifier, Identifier) (Connection, ImplicitReconnect))
-> Map (Identifier, Identifier) (Connection, ImplicitReconnect)
forall r a. r -> T r a -> a
^. T ValidLocalNodeState
(Map (Identifier, Identifier) (Connection, ImplicitReconnect))
localConnections)
(ValidLocalNodeState, [Connection])
-> IO (ValidLocalNodeState, [Connection])
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ( (LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId (ProcessId -> LocalProcessId
processLocalId ProcessId
pid) Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess -> ValidLocalNodeState -> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= Maybe LocalProcess
forall a. Maybe a
Nothing)
(ValidLocalNodeState -> ValidLocalNodeState)
-> (ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState
-> ValidLocalNodeState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (T ValidLocalNodeState
(Map (Identifier, Identifier) (Connection, ImplicitReconnect))
localConnections T ValidLocalNodeState
(Map (Identifier, Identifier) (Connection, ImplicitReconnect))
-> Map (Identifier, Identifier) (Connection, ImplicitReconnect)
-> ValidLocalNodeState
-> ValidLocalNodeState
forall r a. T r a -> a -> r -> r
^= Map (Identifier, Identifier) (Connection, ImplicitReconnect)
unaffected)
(ValidLocalNodeState -> ValidLocalNodeState)
-> ValidLocalNodeState -> ValidLocalNodeState
forall a b. (a -> b) -> a -> b
$ ValidLocalNodeState
vst
, ((Connection, ImplicitReconnect) -> Connection)
-> [(Connection, ImplicitReconnect)] -> [Connection]
forall a b. (a -> b) -> [a] -> [b]
map (Connection, ImplicitReconnect) -> Connection
forall a b. (a, b) -> a
fst ([(Connection, ImplicitReconnect)] -> [Connection])
-> [(Connection, ImplicitReconnect)] -> [Connection]
forall a b. (a -> b) -> a -> b
$ Map (Identifier, Identifier) (Connection, ImplicitReconnect)
-> [(Connection, ImplicitReconnect)]
forall k a. Map k a -> [a]
Map.elems Map (Identifier, Identifier) (Connection, ImplicitReconnect)
affected
)
type IncomingConnection = (NT.EndPointAddress, IncomingTarget)
data IncomingTarget =
Uninit
| ToProc ProcessId (Weak (CQueue Message))
| ToChan SendPortId TypedChannel
| ToNode
data ConnectionState = ConnectionState {
ConnectionState -> Map ConnectionId IncomingConnection
_incoming :: !(Map NT.ConnectionId IncomingConnection)
, ConnectionState -> Map EndPointAddress (Set ConnectionId)
_incomingFrom :: !(Map NT.EndPointAddress (Set NT.ConnectionId))
}
initConnectionState :: ConnectionState
initConnectionState :: ConnectionState
initConnectionState = ConnectionState {
_incoming :: Map ConnectionId IncomingConnection
_incoming = Map ConnectionId IncomingConnection
forall k a. Map k a
Map.empty
, _incomingFrom :: Map EndPointAddress (Set ConnectionId)
_incomingFrom = Map EndPointAddress (Set ConnectionId)
forall k a. Map k a
Map.empty
}
incoming :: Accessor ConnectionState (Map NT.ConnectionId IncomingConnection)
incoming :: Accessor ConnectionState (Map ConnectionId IncomingConnection)
incoming = (ConnectionState -> Map ConnectionId IncomingConnection)
-> (Map ConnectionId IncomingConnection
-> ConnectionState -> ConnectionState)
-> Accessor ConnectionState (Map ConnectionId IncomingConnection)
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor ConnectionState -> Map ConnectionId IncomingConnection
_incoming (\Map ConnectionId IncomingConnection
conns ConnectionState
st -> ConnectionState
st { _incoming = conns })
incomingAt :: NT.ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt :: ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid = Accessor ConnectionState (Map ConnectionId IncomingConnection)
incoming Accessor ConnectionState (Map ConnectionId IncomingConnection)
-> T (Map ConnectionId IncomingConnection)
(Maybe IncomingConnection)
-> Accessor ConnectionState (Maybe IncomingConnection)
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> ConnectionId
-> T (Map ConnectionId IncomingConnection)
(Maybe IncomingConnection)
forall key elem.
Ord key =>
key -> Accessor (Map key elem) (Maybe elem)
DAC.mapMaybe ConnectionId
cid
incomingFrom :: NT.EndPointAddress -> Accessor ConnectionState (Set NT.ConnectionId)
incomingFrom :: EndPointAddress -> Accessor ConnectionState (Set ConnectionId)
incomingFrom EndPointAddress
addr = Accessor ConnectionState (Map EndPointAddress (Set ConnectionId))
aux Accessor ConnectionState (Map EndPointAddress (Set ConnectionId))
-> T (Map EndPointAddress (Set ConnectionId)) (Set ConnectionId)
-> Accessor ConnectionState (Set ConnectionId)
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> Set ConnectionId
-> EndPointAddress
-> T (Map EndPointAddress (Set ConnectionId)) (Set ConnectionId)
forall key elem.
Ord key =>
elem -> key -> Accessor (Map key elem) elem
DAC.mapDefault Set ConnectionId
forall a. Set a
Set.empty EndPointAddress
addr
where
aux :: Accessor ConnectionState (Map EndPointAddress (Set ConnectionId))
aux = (ConnectionState -> Map EndPointAddress (Set ConnectionId))
-> (Map EndPointAddress (Set ConnectionId)
-> ConnectionState -> ConnectionState)
-> Accessor
ConnectionState (Map EndPointAddress (Set ConnectionId))
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor ConnectionState -> Map EndPointAddress (Set ConnectionId)
_incomingFrom (\Map EndPointAddress (Set ConnectionId)
fr ConnectionState
st -> ConnectionState
st { _incomingFrom = fr })
handleIncomingMessages :: LocalNode -> IO ()
handleIncomingMessages :: LocalNode -> IO ()
handleIncomingMessages LocalNode
node = ConnectionState -> IO ()
go ConnectionState
initConnectionState
IO () -> (NodeClosedException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` \(NodeClosedException NodeId
_) -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
where
go :: ConnectionState -> IO ()
go :: ConnectionState -> IO ()
go !ConnectionState
st = do
Event
event <- EndPoint -> IO Event
NT.receive EndPoint
endpoint
case Event
event of
NT.ConnectionOpened ConnectionId
cid Reliability
rel EndPointAddress
theirAddr ->
if Reliability
rel Reliability -> Reliability -> Bool
forall a. Eq a => a -> a -> Bool
== Reliability
NT.ReliableOrdered
then
LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ConnectionId -> EndPointAddress -> MxEvent
MxConnected ConnectionId
cid EndPointAddress
theirAddr)
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ConnectionState -> IO ()
go (
(ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= IncomingConnection -> Maybe IncomingConnection
forall a. a -> Maybe a
Just (EndPointAddress
theirAddr, IncomingTarget
Uninit))
(ConnectionState -> ConnectionState)
-> (ConnectionState -> ConnectionState)
-> ConnectionState
-> ConnectionState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (EndPointAddress -> Accessor ConnectionState (Set ConnectionId)
incomingFrom EndPointAddress
theirAddr Accessor ConnectionState (Set ConnectionId)
-> (Set ConnectionId -> Set ConnectionId)
-> ConnectionState
-> ConnectionState
forall r a. T r a -> (a -> a) -> r -> r
^: ConnectionId -> Set ConnectionId -> Set ConnectionId
forall a. Ord a => a -> Set a -> Set a
Set.insert ConnectionId
cid)
(ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st
)
else ConnectionId -> ConnectionState -> String -> IO ()
invalidRequest ConnectionId
cid ConnectionState
st (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
String
"attempt to connect with unsupported reliability " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Reliability -> String
forall a. Show a => a -> String
show Reliability
rel
NT.Received ConnectionId
cid [ByteString]
payload ->
case ConnectionState
st ConnectionState
-> Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection
forall r a. r -> T r a -> a
^. ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid of
Just (EndPointAddress
_, ToProc ProcessId
pid Weak (CQueue Message)
weakQueue) -> do
Maybe (CQueue Message)
mQueue <- Weak (CQueue Message) -> IO (Maybe (CQueue Message))
forall v. Weak v -> IO (Maybe v)
deRefWeak Weak (CQueue Message)
weakQueue
Maybe (CQueue Message) -> (CQueue Message -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (CQueue Message)
mQueue ((CQueue Message -> IO ()) -> IO ())
-> (CQueue Message -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \CQueue Message
queue -> do
let msg :: Message
msg = [ByteString] -> Message
payloadToMessage [ByteString]
payload
CQueue Message -> Message -> IO ()
forall a. CQueue a -> a -> IO ()
enqueue CQueue Message
queue Message
msg
LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ProcessId -> Message -> MxEvent
MxReceived ProcessId
pid Message
msg)
ConnectionState -> IO ()
go ConnectionState
st
Just (EndPointAddress
_, ToChan SendPortId
chId (TypedChannel Weak (TQueue a)
chan')) -> do
Maybe (TQueue a)
mChan <- Weak (TQueue a) -> IO (Maybe (TQueue a))
forall v. Weak v -> IO (Maybe v)
deRefWeak Weak (TQueue a)
chan'
Maybe (TQueue a) -> (TQueue a -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (TQueue a)
mChan ((TQueue a -> IO ()) -> IO ()) -> (TQueue a -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \TQueue a
chan -> do
a
msg' <- STM a -> IO a
forall a. STM a -> IO a
atomically (STM a -> IO a) -> STM a -> IO a
forall a b. (a -> b) -> a -> b
$ do
a
msg <- a -> STM a
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> STM a) -> a -> STM a
forall a b. (a -> b) -> a -> b
$! ByteString -> a
forall a. Binary a => ByteString -> a
decode ([ByteString] -> ByteString
BSL.fromChunks [ByteString]
payload)
TQueue a -> a -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue a
chan a
msg
a -> STM a
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return a
msg
LocalNode -> MxEvent -> IO ()
trace LocalNode
node (MxEvent -> IO ()) -> MxEvent -> IO ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> Message -> MxEvent
MxReceivedPort SendPortId
chId (Message -> MxEvent) -> Message -> MxEvent
forall a b. (a -> b) -> a -> b
$ a -> Message
forall a. Serializable a => a -> Message
unsafeCreateUnencodedMessage a
msg'
ConnectionState -> IO ()
go ConnectionState
st
Just (EndPointAddress
_, IncomingTarget
ToNode) -> do
let ctrlMsg :: NCMsg
ctrlMsg = ByteString -> NCMsg
forall a. Binary a => ByteString -> a
decode (ByteString -> NCMsg)
-> ([ByteString] -> ByteString) -> [ByteString] -> NCMsg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ByteString] -> ByteString
BSL.fromChunks ([ByteString] -> NCMsg) -> [ByteString] -> NCMsg
forall a b. (a -> b) -> a -> b
$ [ByteString]
payload
Chan NCMsg -> NCMsg -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan NCMsg
ctrlChan (NCMsg -> IO ()) -> NCMsg -> IO ()
forall a b. (a -> b) -> a -> b
$! NCMsg
ctrlMsg
ConnectionState -> IO ()
go ConnectionState
st
Just (EndPointAddress
src, IncomingTarget
Uninit) ->
case ByteString -> Identifier
forall a. Binary a => ByteString -> a
decode ([ByteString] -> ByteString
BSL.fromChunks [ByteString]
payload) of
ProcessIdentifier ProcessId
pid -> do
let lpid :: LocalProcessId
lpid = ProcessId -> LocalProcessId
processLocalId ProcessId
pid
Maybe LocalProcess
mProc <- LocalNode
-> (ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess)
forall r. LocalNode -> (ValidLocalNodeState -> IO r) -> IO r
withValidLocalState LocalNode
node ((ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess))
-> (ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess)
forall a b. (a -> b) -> a -> b
$ Maybe LocalProcess -> IO (Maybe LocalProcess)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe LocalProcess -> IO (Maybe LocalProcess))
-> (ValidLocalNodeState -> Maybe LocalProcess)
-> ValidLocalNodeState
-> IO (Maybe LocalProcess)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ValidLocalNodeState
-> Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess
forall r a. r -> T r a -> a
^. LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId LocalProcessId
lpid)
case Maybe LocalProcess
mProc of
Just LocalProcess
proc ->
ConnectionState -> IO ()
go (ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= IncomingConnection -> Maybe IncomingConnection
forall a. a -> Maybe a
Just (EndPointAddress
src, ProcessId -> Weak (CQueue Message) -> IncomingTarget
ToProc ProcessId
pid (LocalProcess -> Weak (CQueue Message)
processWeakQ LocalProcess
proc)) (ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st)
Maybe LocalProcess
Nothing ->
ConnectionState -> IO ()
go (ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= Maybe IncomingConnection
forall a. Maybe a
Nothing (ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st)
SendPortIdentifier SendPortId
chId -> do
let lcid :: Int32
lcid = SendPortId -> Int32
sendPortLocalId SendPortId
chId
lpid :: LocalProcessId
lpid = ProcessId -> LocalProcessId
processLocalId (SendPortId -> ProcessId
sendPortProcessId SendPortId
chId)
Maybe LocalProcess
mProc <- LocalNode
-> (ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess)
forall r. LocalNode -> (ValidLocalNodeState -> IO r) -> IO r
withValidLocalState LocalNode
node ((ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess))
-> (ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess)
forall a b. (a -> b) -> a -> b
$ Maybe LocalProcess -> IO (Maybe LocalProcess)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe LocalProcess -> IO (Maybe LocalProcess))
-> (ValidLocalNodeState -> Maybe LocalProcess)
-> ValidLocalNodeState
-> IO (Maybe LocalProcess)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ValidLocalNodeState
-> Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess
forall r a. r -> T r a -> a
^. LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId LocalProcessId
lpid)
case Maybe LocalProcess
mProc of
Just LocalProcess
proc -> do
Maybe TypedChannel
mChannel <- StrictMVar LocalProcessState
-> (LocalProcessState -> IO (Maybe TypedChannel))
-> IO (Maybe TypedChannel)
forall a b. StrictMVar a -> (a -> IO b) -> IO b
withMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO (Maybe TypedChannel))
-> IO (Maybe TypedChannel))
-> (LocalProcessState -> IO (Maybe TypedChannel))
-> IO (Maybe TypedChannel)
forall a b. (a -> b) -> a -> b
$ Maybe TypedChannel -> IO (Maybe TypedChannel)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe TypedChannel -> IO (Maybe TypedChannel))
-> (LocalProcessState -> Maybe TypedChannel)
-> LocalProcessState
-> IO (Maybe TypedChannel)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (LocalProcessState
-> T LocalProcessState (Maybe TypedChannel) -> Maybe TypedChannel
forall r a. r -> T r a -> a
^. Int32 -> T LocalProcessState (Maybe TypedChannel)
typedChannelWithId Int32
lcid)
case Maybe TypedChannel
mChannel of
Just TypedChannel
channel ->
ConnectionState -> IO ()
go (ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= IncomingConnection -> Maybe IncomingConnection
forall a. a -> Maybe a
Just (EndPointAddress
src, SendPortId -> TypedChannel -> IncomingTarget
ToChan SendPortId
chId TypedChannel
channel) (ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st)
Maybe TypedChannel
Nothing ->
ConnectionId -> ConnectionState -> String -> IO ()
invalidRequest ConnectionId
cid ConnectionState
st (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
String
"incoming attempt to connect to unknown channel of"
String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" process " String -> String -> String
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show (SendPortId -> ProcessId
sendPortProcessId SendPortId
chId)
Maybe LocalProcess
Nothing ->
ConnectionState -> IO ()
go (ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= Maybe IncomingConnection
forall a. Maybe a
Nothing (ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st)
NodeIdentifier NodeId
nid ->
if NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== LocalNode -> NodeId
localNodeId LocalNode
node
then ConnectionState -> IO ()
go (ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= IncomingConnection -> Maybe IncomingConnection
forall a. a -> Maybe a
Just (EndPointAddress
src, IncomingTarget
ToNode) (ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st)
else ConnectionId -> ConnectionState -> String -> IO ()
invalidRequest ConnectionId
cid ConnectionState
st (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
String
"incoming attempt to connect to a different node -"
String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" I'm " String -> String -> String
forall a. [a] -> [a] -> [a]
++ NodeId -> String
forall a. Show a => a -> String
show (LocalNode -> NodeId
localNodeId LocalNode
node)
String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" but the remote peer wants to connect to "
String -> String -> String
forall a. [a] -> [a] -> [a]
++ NodeId -> String
forall a. Show a => a -> String
show NodeId
nid
Maybe IncomingConnection
Nothing ->
ConnectionId -> ConnectionState -> String -> IO ()
invalidRequest ConnectionId
cid ConnectionState
st
String
"message received from an unknown connection"
NT.ConnectionClosed ConnectionId
cid ->
case ConnectionState
st ConnectionState
-> Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection
forall r a. r -> T r a -> a
^. ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid of
Maybe IncomingConnection
Nothing ->
ConnectionId -> ConnectionState -> String -> IO ()
invalidRequest ConnectionId
cid ConnectionState
st String
"closed unknown connection"
Just (EndPointAddress
src, IncomingTarget
_) -> do
LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ConnectionId -> EndPointAddress -> MxEvent
MxDisconnected ConnectionId
cid EndPointAddress
src)
ConnectionState -> IO ()
go ( (ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= Maybe IncomingConnection
forall a. Maybe a
Nothing)
(ConnectionState -> ConnectionState)
-> (ConnectionState -> ConnectionState)
-> ConnectionState
-> ConnectionState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (EndPointAddress -> Accessor ConnectionState (Set ConnectionId)
incomingFrom EndPointAddress
src Accessor ConnectionState (Set ConnectionId)
-> (Set ConnectionId -> Set ConnectionId)
-> ConnectionState
-> ConnectionState
forall r a. T r a -> (a -> a) -> r -> r
^: ConnectionId -> Set ConnectionId -> Set ConnectionId
forall a. Ord a => a -> Set a -> Set a
Set.delete ConnectionId
cid)
(ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st
)
NT.ErrorEvent (NT.TransportError (NT.EventConnectionLost EndPointAddress
theirAddr) String
_) -> do
let nid :: Identifier
nid = NodeId -> Identifier
NodeIdentifier (NodeId -> Identifier) -> NodeId -> Identifier
forall a b. (a -> b) -> a -> b
$ EndPointAddress -> NodeId
NodeId EndPointAddress
theirAddr
Chan NCMsg -> NCMsg -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan NCMsg
ctrlChan NCMsg
{ ctrlMsgSender :: Identifier
ctrlMsgSender = Identifier
nid
, ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = Identifier -> DiedReason -> ProcessSignal
Died Identifier
nid DiedReason
DiedDisconnect
}
let notLost :: ConnectionId -> Bool
notLost ConnectionId
k = Bool -> Bool
not (ConnectionId
k ConnectionId -> Set ConnectionId -> Bool
forall a. Ord a => a -> Set a -> Bool
`Set.member` (ConnectionState
st ConnectionState
-> Accessor ConnectionState (Set ConnectionId) -> Set ConnectionId
forall r a. r -> T r a -> a
^. EndPointAddress -> Accessor ConnectionState (Set ConnectionId)
incomingFrom EndPointAddress
theirAddr))
LocalNode -> Identifier -> IO ()
closeImplicitReconnections LocalNode
node Identifier
nid
ConnectionState -> IO ()
go ( (EndPointAddress -> Accessor ConnectionState (Set ConnectionId)
incomingFrom EndPointAddress
theirAddr Accessor ConnectionState (Set ConnectionId)
-> Set ConnectionId -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= Set ConnectionId
forall a. Set a
Set.empty)
(ConnectionState -> ConnectionState)
-> (ConnectionState -> ConnectionState)
-> ConnectionState
-> ConnectionState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Accessor ConnectionState (Map ConnectionId IncomingConnection)
incoming Accessor ConnectionState (Map ConnectionId IncomingConnection)
-> (Map ConnectionId IncomingConnection
-> Map ConnectionId IncomingConnection)
-> ConnectionState
-> ConnectionState
forall r a. T r a -> (a -> a) -> r -> r
^: (ConnectionId -> IncomingConnection -> Bool)
-> Map ConnectionId IncomingConnection
-> Map ConnectionId IncomingConnection
forall k a. (k -> a -> Bool) -> Map k a -> Map k a
Map.filterWithKey (Bool -> IncomingConnection -> Bool
forall a b. a -> b -> a
const (Bool -> IncomingConnection -> Bool)
-> (ConnectionId -> Bool)
-> ConnectionId
-> IncomingConnection
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionId -> Bool
notLost))
(ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st
)
NT.ErrorEvent (NT.TransportError EventErrorCode
NT.EventEndPointFailed String
str) ->
String -> IO ()
forall a. String -> IO a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Cloud Haskell fatal error: end point failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
str
NT.ErrorEvent (NT.TransportError EventErrorCode
NT.EventTransportFailed String
str) ->
String -> IO ()
forall a. String -> IO a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Cloud Haskell fatal error: transport failed: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
str
Event
NT.EndPointClosed ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
NT.ReceivedMulticast MulticastAddress
_ [ByteString]
_ ->
String -> IO ()
forall a. String -> IO a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"Cloud Haskell fatal error: received unexpected multicast"
invalidRequest :: NT.ConnectionId -> ConnectionState -> String -> IO ()
invalidRequest :: ConnectionId -> ConnectionState -> String -> IO ()
invalidRequest ConnectionId
cid ConnectionState
st String
msg = do
LocalNode -> String -> [TraceArg] -> IO ()
traceEventFmtIO LocalNode
node String
"" [ String -> TraceArg
TraceStr (String -> TraceArg) -> String -> TraceArg
forall a b. (a -> b) -> a -> b
$ String
" [network] invalid request"
String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" (" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
msg String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"): "
, (ConnectionId -> TraceArg
forall a. Show a => a -> TraceArg
Trace ConnectionId
cid)
]
ConnectionState -> IO ()
go ( ConnectionId -> Accessor ConnectionState (Maybe IncomingConnection)
incomingAt ConnectionId
cid Accessor ConnectionState (Maybe IncomingConnection)
-> Maybe IncomingConnection -> ConnectionState -> ConnectionState
forall r a. T r a -> a -> r -> r
^= Maybe IncomingConnection
forall a. Maybe a
Nothing
(ConnectionState -> ConnectionState)
-> ConnectionState -> ConnectionState
forall a b. (a -> b) -> a -> b
$ ConnectionState
st
)
endpoint :: EndPoint
endpoint = LocalNode -> EndPoint
localEndPoint LocalNode
node
ctrlChan :: Chan NCMsg
ctrlChan = LocalNode -> Chan NCMsg
localCtrlChan LocalNode
node
runNodeController :: LocalNode -> IO ()
runNodeController :: LocalNode -> IO ()
runNodeController LocalNode
node =
ReaderT LocalNode IO () -> LocalNode -> IO ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (StateT NCState (ReaderT LocalNode IO) ()
-> NCState -> ReaderT LocalNode IO ()
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m a
evalStateT (NC () -> StateT NCState (ReaderT LocalNode IO) ()
forall a. NC a -> StateT NCState (ReaderT LocalNode IO) a
unNC NC ()
nodeController) NCState
initNCState) LocalNode
node
IO () -> (NodeClosedException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` \(NodeClosedException NodeId
_) -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
data NCState = NCState
{
NCState -> BiMultiMap Identifier ProcessId ()
_links :: !(BiMultiMap Identifier ProcessId ())
, NCState -> BiMultiMap Identifier ProcessId MonitorRef
_monitors :: !(BiMultiMap Identifier ProcessId MonitorRef)
, NCState -> Map String ProcessId
_registeredHere :: !(Map String ProcessId)
, NCState -> Map ProcessId [(NodeId, Int)]
_registeredOnNodes :: !(Map ProcessId [(NodeId,Int)])
}
newtype NC a = NC { forall a. NC a -> StateT NCState (ReaderT LocalNode IO) a
unNC :: StateT NCState (ReaderT LocalNode IO) a }
deriving ( Functor NC
Functor NC =>
(forall a. a -> NC a)
-> (forall a b. NC (a -> b) -> NC a -> NC b)
-> (forall a b c. (a -> b -> c) -> NC a -> NC b -> NC c)
-> (forall a b. NC a -> NC b -> NC b)
-> (forall a b. NC a -> NC b -> NC a)
-> Applicative NC
forall a. a -> NC a
forall a b. NC a -> NC b -> NC a
forall a b. NC a -> NC b -> NC b
forall a b. NC (a -> b) -> NC a -> NC b
forall a b c. (a -> b -> c) -> NC a -> NC b -> NC c
forall (f :: * -> *).
Functor f =>
(forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
$cpure :: forall a. a -> NC a
pure :: forall a. a -> NC a
$c<*> :: forall a b. NC (a -> b) -> NC a -> NC b
<*> :: forall a b. NC (a -> b) -> NC a -> NC b
$cliftA2 :: forall a b c. (a -> b -> c) -> NC a -> NC b -> NC c
liftA2 :: forall a b c. (a -> b -> c) -> NC a -> NC b -> NC c
$c*> :: forall a b. NC a -> NC b -> NC b
*> :: forall a b. NC a -> NC b -> NC b
$c<* :: forall a b. NC a -> NC b -> NC a
<* :: forall a b. NC a -> NC b -> NC a
Applicative
, (forall a b. (a -> b) -> NC a -> NC b)
-> (forall a b. a -> NC b -> NC a) -> Functor NC
forall a b. a -> NC b -> NC a
forall a b. (a -> b) -> NC a -> NC b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall a b. (a -> b) -> NC a -> NC b
fmap :: forall a b. (a -> b) -> NC a -> NC b
$c<$ :: forall a b. a -> NC b -> NC a
<$ :: forall a b. a -> NC b -> NC a
Functor
, Applicative NC
Applicative NC =>
(forall a b. NC a -> (a -> NC b) -> NC b)
-> (forall a b. NC a -> NC b -> NC b)
-> (forall a. a -> NC a)
-> Monad NC
forall a. a -> NC a
forall a b. NC a -> NC b -> NC b
forall a b. NC a -> (a -> NC b) -> NC b
forall (m :: * -> *).
Applicative m =>
(forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
$c>>= :: forall a b. NC a -> (a -> NC b) -> NC b
>>= :: forall a b. NC a -> (a -> NC b) -> NC b
$c>> :: forall a b. NC a -> NC b -> NC b
>> :: forall a b. NC a -> NC b -> NC b
$creturn :: forall a. a -> NC a
return :: forall a. a -> NC a
Monad
, Monad NC
Monad NC => (forall a. IO a -> NC a) -> MonadIO NC
forall a. IO a -> NC a
forall (m :: * -> *).
Monad m =>
(forall a. IO a -> m a) -> MonadIO m
$cliftIO :: forall a. IO a -> NC a
liftIO :: forall a. IO a -> NC a
MonadIO
, MonadState NCState
, MonadReader LocalNode
)
initNCState :: NCState
initNCState :: NCState
initNCState = NCState { _links :: BiMultiMap Identifier ProcessId ()
_links = BiMultiMap Identifier ProcessId ()
forall a b v. BiMultiMap a b v
BiMultiMap.empty
, _monitors :: BiMultiMap Identifier ProcessId MonitorRef
_monitors = BiMultiMap Identifier ProcessId MonitorRef
forall a b v. BiMultiMap a b v
BiMultiMap.empty
, _registeredHere :: Map String ProcessId
_registeredHere = Map String ProcessId
forall k a. Map k a
Map.empty
, _registeredOnNodes :: Map ProcessId [(NodeId, Int)]
_registeredOnNodes = Map ProcessId [(NodeId, Int)]
forall k a. Map k a
Map.empty
}
data ProcessKillException =
ProcessKillException !ProcessId !String
deriving (Typeable)
instance Exception ProcessKillException
instance Show ProcessKillException where
show :: ProcessKillException -> String
show (ProcessKillException ProcessId
pid String
reason) =
String
"killed-by=" String -> String -> String
forall a. [a] -> [a] -> [a]
++ ProcessId -> String
forall a. Show a => a -> String
show ProcessId
pid String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
",reason=" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
reason
ncSendToProcess :: ProcessId -> Message -> NC ()
ncSendToProcess :: ProcessId -> Message -> NC ()
ncSendToProcess = Bool -> ProcessId -> Message -> NC ()
ncSendToProcessAndTrace Bool
True
ncSendToProcessAndTrace :: Bool -> ProcessId -> Message -> NC ()
ncSendToProcessAndTrace :: Bool -> ProcessId -> Message -> NC ()
ncSendToProcessAndTrace Bool
shouldTrace ProcessId
pid Message
msg = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
if ProcessId -> NodeId
processNodeId ProcessId
pid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== LocalNode -> NodeId
localNodeId LocalNode
node
then Bool -> LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSendAndTrace Bool
shouldTrace LocalNode
node ProcessId
pid Message
msg
else IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> NCMsg -> IO ()
forall a.
Binary a =>
LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
sendBinary LocalNode
node
(NodeId -> Identifier
NodeIdentifier (NodeId -> Identifier) -> NodeId -> Identifier
forall a b. (a -> b) -> a -> b
$ LocalNode -> NodeId
localNodeId LocalNode
node)
(NodeId -> Identifier
NodeIdentifier (NodeId -> Identifier) -> NodeId -> Identifier
forall a b. (a -> b) -> a -> b
$ ProcessId -> NodeId
processNodeId ProcessId
pid)
ImplicitReconnect
WithImplicitReconnect
NCMsg { ctrlMsgSender :: Identifier
ctrlMsgSender = NodeId -> Identifier
NodeIdentifier (NodeId -> Identifier) -> NodeId -> Identifier
forall a b. (a -> b) -> a -> b
$ LocalNode -> NodeId
localNodeId LocalNode
node
, ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = LocalProcessId -> Message -> ProcessSignal
UnreliableSend (ProcessId -> LocalProcessId
processLocalId ProcessId
pid) Message
msg
}
ncSendToNode :: NodeId -> NCMsg -> NC ()
ncSendToNode :: NodeId -> NCMsg -> NC ()
ncSendToNode NodeId
to NCMsg
msg = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ if NodeId
to NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== LocalNode -> NodeId
localNodeId LocalNode
node
then Chan NCMsg -> NCMsg -> IO ()
forall a. Chan a -> a -> IO ()
writeChan (LocalNode -> Chan NCMsg
localCtrlChan LocalNode
node) (NCMsg -> IO ()) -> NCMsg -> IO ()
forall a b. (a -> b) -> a -> b
$! NCMsg
msg
else LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> NCMsg -> IO ()
forall a.
Binary a =>
LocalNode
-> Identifier -> Identifier -> ImplicitReconnect -> a -> IO ()
sendBinary LocalNode
node
(NodeId -> Identifier
NodeIdentifier (NodeId -> Identifier) -> NodeId -> Identifier
forall a b. (a -> b) -> a -> b
$ LocalNode -> NodeId
localNodeId LocalNode
node)
(NodeId -> Identifier
NodeIdentifier NodeId
to)
ImplicitReconnect
WithImplicitReconnect
NCMsg
msg
traceNotifyDied :: LocalNode -> Identifier -> DiedReason -> NC ()
traceNotifyDied :: LocalNode -> Identifier -> DiedReason -> NC ()
traceNotifyDied LocalNode
node Identifier
ident DiedReason
reason =
IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> (MxEventBus -> IO ()) -> IO ()
withLocalTracer LocalNode
node ((MxEventBus -> IO ()) -> IO ()) -> (MxEventBus -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \MxEventBus
t ->
case Identifier
ident of
(NodeIdentifier NodeId
nid) -> MxEventBus -> MxEvent -> IO ()
traceEvent MxEventBus
t (NodeId -> DiedReason -> MxEvent
MxNodeDied NodeId
nid DiedReason
reason)
(ProcessIdentifier ProcessId
pid) -> MxEventBus -> MxEvent -> IO ()
traceEvent MxEventBus
t (ProcessId -> DiedReason -> MxEvent
MxProcessDied ProcessId
pid DiedReason
reason)
Identifier
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
traceEventFmtIO :: LocalNode
-> String
-> [TraceArg]
-> IO ()
traceEventFmtIO :: LocalNode -> String -> [TraceArg] -> IO ()
traceEventFmtIO LocalNode
node String
fmt [TraceArg]
args =
LocalNode -> (MxEventBus -> IO ()) -> IO ()
withLocalTracer LocalNode
node ((MxEventBus -> IO ()) -> IO ()) -> (MxEventBus -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \MxEventBus
t -> MxEventBus -> String -> [TraceArg] -> IO ()
traceLogFmt MxEventBus
t String
fmt [TraceArg]
args
trace :: LocalNode -> MxEvent -> IO ()
trace :: LocalNode -> MxEvent -> IO ()
trace LocalNode
node MxEvent
ev = LocalNode -> (MxEventBus -> IO ()) -> IO ()
withLocalTracer LocalNode
node ((MxEventBus -> IO ()) -> IO ()) -> (MxEventBus -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \MxEventBus
t -> MxEventBus -> MxEvent -> IO ()
traceEvent MxEventBus
t MxEvent
ev
withLocalTracer :: LocalNode -> (MxEventBus -> IO ()) -> IO ()
withLocalTracer :: LocalNode -> (MxEventBus -> IO ()) -> IO ()
withLocalTracer LocalNode
node MxEventBus -> IO ()
act = MxEventBus -> IO ()
act (LocalNode -> MxEventBus
localEventBus LocalNode
node)
nodeController :: NC ()
nodeController :: NC ()
nodeController = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
NC () -> NC ()
forall (m :: * -> *) a b. Monad m => m a -> m b
forever' (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$ do
NCMsg
msg <- IO NCMsg -> NC NCMsg
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO NCMsg -> NC NCMsg) -> IO NCMsg -> NC NCMsg
forall a b. (a -> b) -> a -> b
$ Chan NCMsg -> IO NCMsg
forall a. Chan a -> IO a
readChan (LocalNode -> Chan NCMsg
localCtrlChan LocalNode
node)
case ProcessSignal -> Maybe NodeId
destNid (NCMsg -> ProcessSignal
ctrlMsgSignal NCMsg
msg) of
Just NodeId
nid' | NodeId
nid' NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
/= LocalNode -> NodeId
localNodeId LocalNode
node ->
NodeId -> NCMsg -> NC ()
ncSendToNode NodeId
nid' NCMsg
msg
Maybe NodeId
_ ->
() -> NC ()
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
case NCMsg
msg of
NCMsg (ProcessIdentifier ProcessId
from) (Link Identifier
them) ->
ProcessId -> Identifier -> Maybe MonitorRef -> NC ()
ncEffectMonitor ProcessId
from Identifier
them Maybe MonitorRef
forall a. Maybe a
Nothing
NCMsg (ProcessIdentifier ProcessId
from) (Monitor MonitorRef
ref) ->
ProcessId -> Identifier -> Maybe MonitorRef -> NC ()
ncEffectMonitor ProcessId
from (MonitorRef -> Identifier
monitorRefIdent MonitorRef
ref) (MonitorRef -> Maybe MonitorRef
forall a. a -> Maybe a
Just MonitorRef
ref)
NCMsg (ProcessIdentifier ProcessId
from) (Unlink Identifier
them) ->
ProcessId -> Identifier -> NC ()
ncEffectUnlink ProcessId
from Identifier
them
NCMsg (ProcessIdentifier ProcessId
from) (Unmonitor MonitorRef
ref) ->
ProcessId -> MonitorRef -> NC ()
ncEffectUnmonitor ProcessId
from MonitorRef
ref
NCMsg Identifier
_from (Died Identifier
ident DiedReason
reason) ->
Identifier -> DiedReason -> NC ()
ncEffectDied Identifier
ident DiedReason
reason
NCMsg (ProcessIdentifier ProcessId
from) (Spawn Closure (Process ())
proc SpawnRef
ref) ->
ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
ncEffectSpawn ProcessId
from Closure (Process ())
proc SpawnRef
ref
NCMsg (ProcessIdentifier ProcessId
from) (Register String
label NodeId
atnode Maybe ProcessId
pid Bool
force) ->
ProcessId -> String -> NodeId -> Maybe ProcessId -> Bool -> NC ()
ncEffectRegister ProcessId
from String
label NodeId
atnode Maybe ProcessId
pid Bool
force
NCMsg (ProcessIdentifier ProcessId
from) (WhereIs String
label) ->
ProcessId -> String -> NC ()
ncEffectWhereIs ProcessId
from String
label
NCMsg Identifier
_ (NamedSend String
label Message
msg') ->
String -> Message -> NC ()
ncEffectNamedSend String
label Message
msg'
NCMsg Identifier
_ (UnreliableSend LocalProcessId
lpid Message
msg') ->
LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSend LocalNode
node (NodeId -> LocalProcessId -> ProcessId
ProcessId (LocalNode -> NodeId
localNodeId LocalNode
node) LocalProcessId
lpid) Message
msg'
NCMsg Identifier
_ (LocalSend ProcessId
to Message
msg') ->
LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSend LocalNode
node ProcessId
to Message
msg'
NCMsg Identifier
_ (LocalPortSend SendPortId
to Message
msg') ->
SendPortId -> Message -> NC ()
ncEffectLocalPortSend SendPortId
to Message
msg'
NCMsg (ProcessIdentifier ProcessId
from) (Kill ProcessId
to String
reason) ->
ProcessId -> ProcessId -> String -> NC ()
ncEffectKill ProcessId
from ProcessId
to String
reason
NCMsg (ProcessIdentifier ProcessId
from) (Exit ProcessId
to Message
reason) ->
ProcessId -> ProcessId -> Message -> NC ()
ncEffectExit ProcessId
from ProcessId
to Message
reason
NCMsg (ProcessIdentifier ProcessId
from) (GetInfo ProcessId
pid) ->
ProcessId -> ProcessId -> NC ()
ncEffectGetInfo ProcessId
from ProcessId
pid
NCMsg Identifier
_ ProcessSignal
SigShutdown ->
IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ do
EndPoint -> IO ()
NT.closeEndPoint (LocalNode -> EndPoint
localEndPoint LocalNode
node)
IO () -> IO Any -> IO ()
forall a b. IO a -> IO b -> IO a
`Exception.finally` NodeClosedException -> IO Any
forall e a. Exception e => e -> IO a
throwIO (NodeId -> NodeClosedException
NodeClosedException (NodeId -> NodeClosedException) -> NodeId -> NodeClosedException
forall a b. (a -> b) -> a -> b
$ LocalNode -> NodeId
localNodeId LocalNode
node)
NCMsg (ProcessIdentifier ProcessId
from) (GetNodeStats NodeId
nid) ->
ProcessId -> NodeId -> NC ()
ncEffectGetNodeStats ProcessId
from NodeId
nid
NCMsg
unexpected ->
String -> NC ()
forall a. HasCallStack => String -> a
error (String -> NC ()) -> String -> NC ()
forall a b. (a -> b) -> a -> b
$ String
"nodeController: unexpected message " String -> String -> String
forall a. [a] -> [a] -> [a]
++ NCMsg -> String
forall a. Show a => a -> String
show NCMsg
unexpected
ncEffectMonitor :: ProcessId
-> Identifier
-> Maybe MonitorRef
-> NC ()
ncEffectMonitor :: ProcessId -> Identifier -> Maybe MonitorRef -> NC ()
ncEffectMonitor ProcessId
from Identifier
them Maybe MonitorRef
mRef = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
Bool
shouldLink <-
if Bool -> Bool
not (LocalNode -> Identifier -> Bool
isLocal LocalNode
node Identifier
them)
then Bool -> NC Bool
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
else Identifier -> NC Bool
isValidLocalIdentifier Identifier
them
case (Bool
shouldLink, LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
from)) of
(Bool
True, Bool
_) ->
case Maybe MonitorRef
mRef of
Just MonitorRef
ref -> (NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
-> (BiMultiMap Identifier ProcessId MonitorRef
-> BiMultiMap Identifier ProcessId MonitorRef)
-> NCState
-> NCState
forall r a. T r a -> (a -> a) -> r -> r
^: Identifier
-> ProcessId
-> MonitorRef
-> BiMultiMap Identifier ProcessId MonitorRef
-> BiMultiMap Identifier ProcessId MonitorRef
forall a b v.
(Ord a, Ord b, Ord v) =>
a -> b -> v -> BiMultiMap a b v -> BiMultiMap a b v
BiMultiMap.insert Identifier
them ProcessId
from MonitorRef
ref
Maybe MonitorRef
Nothing -> (NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ Accessor NCState (BiMultiMap Identifier ProcessId ())
links Accessor NCState (BiMultiMap Identifier ProcessId ())
-> (BiMultiMap Identifier ProcessId ()
-> BiMultiMap Identifier ProcessId ())
-> NCState
-> NCState
forall r a. T r a -> (a -> a) -> r -> r
^: Identifier
-> ProcessId
-> ()
-> BiMultiMap Identifier ProcessId ()
-> BiMultiMap Identifier ProcessId ()
forall a b v.
(Ord a, Ord b, Ord v) =>
a -> b -> v -> BiMultiMap a b v -> BiMultiMap a b v
BiMultiMap.insert Identifier
them ProcessId
from ()
(Bool
False, Bool
True) ->
ProcessId -> Identifier -> DiedReason -> Maybe MonitorRef -> NC ()
notifyDied ProcessId
from Identifier
them DiedReason
DiedUnknownId Maybe MonitorRef
mRef
(Bool
False, Bool
False) ->
NodeId -> NCMsg -> NC ()
ncSendToNode (ProcessId -> NodeId
processNodeId ProcessId
from) (NCMsg -> NC ()) -> NCMsg -> NC ()
forall a b. (a -> b) -> a -> b
$ NCMsg
{ ctrlMsgSender :: Identifier
ctrlMsgSender = NodeId -> Identifier
NodeIdentifier (LocalNode -> NodeId
localNodeId LocalNode
node)
, ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = Identifier -> DiedReason -> ProcessSignal
Died Identifier
them DiedReason
DiedUnknownId
}
ncEffectUnlink :: ProcessId -> Identifier -> NC ()
ncEffectUnlink :: ProcessId -> Identifier -> NC ()
ncEffectUnlink ProcessId
from Identifier
them = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
from)) (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$
case Identifier
them of
ProcessIdentifier ProcessId
pid ->
ProcessId -> DidUnlinkProcess -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
from (DidUnlinkProcess -> NC ()) -> DidUnlinkProcess -> NC ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> DidUnlinkProcess
DidUnlinkProcess ProcessId
pid
NodeIdentifier NodeId
nid ->
ProcessId -> DidUnlinkNode -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
from (DidUnlinkNode -> NC ()) -> DidUnlinkNode -> NC ()
forall a b. (a -> b) -> a -> b
$ NodeId -> DidUnlinkNode
DidUnlinkNode NodeId
nid
SendPortIdentifier SendPortId
cid ->
ProcessId -> DidUnlinkPort -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
from (DidUnlinkPort -> NC ()) -> DidUnlinkPort -> NC ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> DidUnlinkPort
DidUnlinkPort SendPortId
cid
(NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ Accessor NCState (BiMultiMap Identifier ProcessId ())
links Accessor NCState (BiMultiMap Identifier ProcessId ())
-> (BiMultiMap Identifier ProcessId ()
-> BiMultiMap Identifier ProcessId ())
-> NCState
-> NCState
forall r a. T r a -> (a -> a) -> r -> r
^: Identifier
-> ProcessId
-> ()
-> BiMultiMap Identifier ProcessId ()
-> BiMultiMap Identifier ProcessId ()
forall a b v.
(Ord a, Ord b, Ord v) =>
a -> b -> v -> BiMultiMap a b v -> BiMultiMap a b v
BiMultiMap.delete Identifier
them ProcessId
from ()
ncEffectUnmonitor :: ProcessId -> MonitorRef -> NC ()
ncEffectUnmonitor :: ProcessId -> MonitorRef -> NC ()
ncEffectUnmonitor ProcessId
from MonitorRef
ref = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
from)) (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> DidUnmonitor -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
from (DidUnmonitor -> NC ()) -> DidUnmonitor -> NC ()
forall a b. (a -> b) -> a -> b
$ MonitorRef -> DidUnmonitor
DidUnmonitor MonitorRef
ref
(NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
-> (BiMultiMap Identifier ProcessId MonitorRef
-> BiMultiMap Identifier ProcessId MonitorRef)
-> NCState
-> NCState
forall r a. T r a -> (a -> a) -> r -> r
^: Identifier
-> ProcessId
-> MonitorRef
-> BiMultiMap Identifier ProcessId MonitorRef
-> BiMultiMap Identifier ProcessId MonitorRef
forall a b v.
(Ord a, Ord b, Ord v) =>
a -> b -> v -> BiMultiMap a b v -> BiMultiMap a b v
BiMultiMap.delete (MonitorRef -> Identifier
monitorRefIdent MonitorRef
ref) ProcessId
from MonitorRef
ref
ncEffectDied :: Identifier -> DiedReason -> NC ()
ncEffectDied :: Identifier -> DiedReason -> NC ()
ncEffectDied Identifier
ident DiedReason
reason = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
LocalNode -> Identifier -> DiedReason -> NC ()
traceNotifyDied LocalNode
node Identifier
ident DiedReason
reason
(Map Identifier (Set (ProcessId, ()))
affectedLinks, BiMultiMap Identifier ProcessId ()
unaffectedLinks) <- (NCState
-> (Map Identifier (Set (ProcessId, ())),
BiMultiMap Identifier ProcessId ()))
-> NC
(Map Identifier (Set (ProcessId, ())),
BiMultiMap Identifier ProcessId ())
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (Identifier
-> BiMultiMap Identifier ProcessId ()
-> (Map Identifier (Set (ProcessId, ())),
BiMultiMap Identifier ProcessId ())
forall a v.
(Ord a, Ord v) =>
Identifier
-> BiMultiMap Identifier a v
-> (Map Identifier (Set (a, v)), BiMultiMap Identifier a v)
splitNotif Identifier
ident (BiMultiMap Identifier ProcessId ()
-> (Map Identifier (Set (ProcessId, ())),
BiMultiMap Identifier ProcessId ()))
-> (NCState -> BiMultiMap Identifier ProcessId ())
-> NCState
-> (Map Identifier (Set (ProcessId, ())),
BiMultiMap Identifier ProcessId ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (NCState
-> Accessor NCState (BiMultiMap Identifier ProcessId ())
-> BiMultiMap Identifier ProcessId ()
forall r a. r -> T r a -> a
^. Accessor NCState (BiMultiMap Identifier ProcessId ())
links))
(Map Identifier (Set (ProcessId, MonitorRef))
affectedMons, BiMultiMap Identifier ProcessId MonitorRef
unaffectedMons) <- (NCState
-> (Map Identifier (Set (ProcessId, MonitorRef)),
BiMultiMap Identifier ProcessId MonitorRef))
-> NC
(Map Identifier (Set (ProcessId, MonitorRef)),
BiMultiMap Identifier ProcessId MonitorRef)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (Identifier
-> BiMultiMap Identifier ProcessId MonitorRef
-> (Map Identifier (Set (ProcessId, MonitorRef)),
BiMultiMap Identifier ProcessId MonitorRef)
forall a v.
(Ord a, Ord v) =>
Identifier
-> BiMultiMap Identifier a v
-> (Map Identifier (Set (a, v)), BiMultiMap Identifier a v)
splitNotif Identifier
ident (BiMultiMap Identifier ProcessId MonitorRef
-> (Map Identifier (Set (ProcessId, MonitorRef)),
BiMultiMap Identifier ProcessId MonitorRef))
-> (NCState -> BiMultiMap Identifier ProcessId MonitorRef)
-> NCState
-> (Map Identifier (Set (ProcessId, MonitorRef)),
BiMultiMap Identifier ProcessId MonitorRef)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (NCState
-> Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
-> BiMultiMap Identifier ProcessId MonitorRef
forall r a. r -> T r a -> a
^. Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors))
let localOnly :: Bool
localOnly = case Identifier
ident of NodeIdentifier NodeId
_ -> Bool
True ; Identifier
_ -> Bool
False
[(Identifier, Set (ProcessId, ()))]
-> ((Identifier, Set (ProcessId, ())) -> NC ()) -> NC ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Map Identifier (Set (ProcessId, ()))
-> [(Identifier, Set (ProcessId, ()))]
forall k a. Map k a -> [(k, a)]
Map.toList Map Identifier (Set (ProcessId, ()))
affectedLinks) (((Identifier, Set (ProcessId, ())) -> NC ()) -> NC ())
-> ((Identifier, Set (ProcessId, ())) -> NC ()) -> NC ()
forall a b. (a -> b) -> a -> b
$ \(Identifier
them, Set (ProcessId, ())
uss) ->
Set (ProcessId, ()) -> ((ProcessId, ()) -> NC ()) -> NC ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Set (ProcessId, ())
uss (((ProcessId, ()) -> NC ()) -> NC ())
-> ((ProcessId, ()) -> NC ()) -> NC ()
forall a b. (a -> b) -> a -> b
$ \(ProcessId
us, ()
_) ->
Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
localOnly Bool -> Bool -> Bool
forall a. Ord a => a -> a -> Bool
<= LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
us)) (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> Identifier -> DiedReason -> Maybe MonitorRef -> NC ()
notifyDied ProcessId
us Identifier
them DiedReason
reason Maybe MonitorRef
forall a. Maybe a
Nothing
[(Identifier, Set (ProcessId, MonitorRef))]
-> ((Identifier, Set (ProcessId, MonitorRef)) -> NC ()) -> NC ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Map Identifier (Set (ProcessId, MonitorRef))
-> [(Identifier, Set (ProcessId, MonitorRef))]
forall k a. Map k a -> [(k, a)]
Map.toList Map Identifier (Set (ProcessId, MonitorRef))
affectedMons) (((Identifier, Set (ProcessId, MonitorRef)) -> NC ()) -> NC ())
-> ((Identifier, Set (ProcessId, MonitorRef)) -> NC ()) -> NC ()
forall a b. (a -> b) -> a -> b
$ \(Identifier
them, Set (ProcessId, MonitorRef)
refs) ->
Set (ProcessId, MonitorRef)
-> ((ProcessId, MonitorRef) -> NC ()) -> NC ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Set (ProcessId, MonitorRef)
refs (((ProcessId, MonitorRef) -> NC ()) -> NC ())
-> ((ProcessId, MonitorRef) -> NC ()) -> NC ()
forall a b. (a -> b) -> a -> b
$ \(ProcessId
us, MonitorRef
ref) ->
Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
localOnly Bool -> Bool -> Bool
forall a. Ord a => a -> a -> Bool
<= LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
us)) (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> Identifier -> DiedReason -> Maybe MonitorRef -> NC ()
notifyDied ProcessId
us Identifier
them DiedReason
reason (MonitorRef -> Maybe MonitorRef
forall a. a -> Maybe a
Just MonitorRef
ref)
(NodeId -> NC ()) -> [NodeId] -> NC ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (LocalNode -> NodeId -> NC ()
forwardDeath LocalNode
node) ([NodeId] -> NC ()) -> [NodeId] -> NC ()
forall a b. (a -> b) -> a -> b
$
[ NodeId
nid | ProcessIdentifier ProcessId
pid <- [Identifier
ident]
, Identifier
i <- Set Identifier -> [Identifier]
forall a. Set a -> [a]
Set.toList (Set Identifier -> [Identifier]) -> Set Identifier -> [Identifier]
forall a b. (a -> b) -> a -> b
$ Set Identifier -> Set Identifier -> Set Identifier
forall a. Ord a => Set a -> Set a -> Set a
Set.union
(((Identifier, ()) -> Identifier)
-> Set (Identifier, ()) -> Set Identifier
forall b a. Ord b => (a -> b) -> Set a -> Set b
Set.map (Identifier, ()) -> Identifier
forall a b. (a, b) -> a
fst (Set (Identifier, ()) -> Set Identifier)
-> Set (Identifier, ()) -> Set Identifier
forall a b. (a -> b) -> a -> b
$ ProcessId
-> BiMultiMap Identifier ProcessId () -> Set (Identifier, ())
forall b a v. Ord b => b -> BiMultiMap a b v -> Set (a, v)
BiMultiMap.lookupBy2nd ProcessId
pid BiMultiMap Identifier ProcessId ()
unaffectedLinks)
(((Identifier, MonitorRef) -> Identifier)
-> Set (Identifier, MonitorRef) -> Set Identifier
forall b a. Ord b => (a -> b) -> Set a -> Set b
Set.map (Identifier, MonitorRef) -> Identifier
forall a b. (a, b) -> a
fst (Set (Identifier, MonitorRef) -> Set Identifier)
-> Set (Identifier, MonitorRef) -> Set Identifier
forall a b. (a -> b) -> a -> b
$ ProcessId
-> BiMultiMap Identifier ProcessId MonitorRef
-> Set (Identifier, MonitorRef)
forall b a v. Ord b => b -> BiMultiMap a b v -> Set (a, v)
BiMultiMap.lookupBy2nd ProcessId
pid BiMultiMap Identifier ProcessId MonitorRef
unaffectedMons)
, let nid :: NodeId
nid = Identifier -> NodeId
nodeOf Identifier
i
, NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
/= LocalNode -> NodeId
localNodeId LocalNode
node
]
let deleteDeads :: (Ord a, Ord v)
=> BiMultiMap a ProcessId v -> BiMultiMap a ProcessId v
deleteDeads :: forall a v.
(Ord a, Ord v) =>
BiMultiMap a ProcessId v -> BiMultiMap a ProcessId v
deleteDeads = case Identifier
ident of
ProcessIdentifier ProcessId
pid -> ProcessId -> BiMultiMap a ProcessId v -> BiMultiMap a ProcessId v
forall a b v.
(Ord a, Ord b, Ord v) =>
b -> BiMultiMap a b v -> BiMultiMap a b v
BiMultiMap.deleteAllBy2nd ProcessId
pid
Identifier
_ -> (Map ProcessId (Set (a, v)), BiMultiMap a ProcessId v)
-> BiMultiMap a ProcessId v
forall a b. (a, b) -> b
snd ((Map ProcessId (Set (a, v)), BiMultiMap a ProcessId v)
-> BiMultiMap a ProcessId v)
-> (BiMultiMap a ProcessId v
-> (Map ProcessId (Set (a, v)), BiMultiMap a ProcessId v))
-> BiMultiMap a ProcessId v
-> BiMultiMap a ProcessId v
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ProcessId -> Set (a, v) -> Bool)
-> BiMultiMap a ProcessId v
-> (Map ProcessId (Set (a, v)), BiMultiMap a ProcessId v)
forall a b v.
(Ord a, Ord b, Ord v) =>
(b -> Set (a, v) -> Bool)
-> BiMultiMap a b v -> (Map b (Set (a, v)), BiMultiMap a b v)
BiMultiMap.partitionWithKeyBy2nd
(\ProcessId
pid Set (a, v)
_ -> Identifier
ident Identifier -> Identifier -> Bool
`impliesDeathOf` ProcessId -> Identifier
ProcessIdentifier ProcessId
pid)
unaffectedLinks' :: BiMultiMap Identifier ProcessId ()
unaffectedLinks' = BiMultiMap Identifier ProcessId ()
-> BiMultiMap Identifier ProcessId ()
forall a v.
(Ord a, Ord v) =>
BiMultiMap a ProcessId v -> BiMultiMap a ProcessId v
deleteDeads BiMultiMap Identifier ProcessId ()
unaffectedLinks
unaffectedMons' :: BiMultiMap Identifier ProcessId MonitorRef
unaffectedMons' = BiMultiMap Identifier ProcessId MonitorRef
-> BiMultiMap Identifier ProcessId MonitorRef
forall a v.
(Ord a, Ord v) =>
BiMultiMap a ProcessId v -> BiMultiMap a ProcessId v
deleteDeads BiMultiMap Identifier ProcessId MonitorRef
unaffectedMons
(NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ (Accessor NCState (BiMultiMap Identifier ProcessId ())
links Accessor NCState (BiMultiMap Identifier ProcessId ())
-> BiMultiMap Identifier ProcessId () -> NCState -> NCState
forall r a. T r a -> a -> r -> r
^= BiMultiMap Identifier ProcessId ()
unaffectedLinks') (NCState -> NCState) -> (NCState -> NCState) -> NCState -> NCState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
-> BiMultiMap Identifier ProcessId MonitorRef -> NCState -> NCState
forall r a. T r a -> a -> r -> r
^= BiMultiMap Identifier ProcessId MonitorRef
unaffectedMons')
let toDrop :: ProcessId -> Bool
toDrop ProcessId
pid = Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ Identifier
ident Identifier -> Identifier -> Bool
`impliesDeathOf` ProcessId -> Identifier
ProcessIdentifier ProcessId
pid
(Map String ProcessId
keepNames, Map String ProcessId
dropNames) <- (ProcessId -> Bool)
-> Map String ProcessId
-> (Map String ProcessId, Map String ProcessId)
forall a k. (a -> Bool) -> Map k a -> (Map k a, Map k a)
Map.partition ProcessId -> Bool
toDrop (Map String ProcessId
-> (Map String ProcessId, Map String ProcessId))
-> NC (Map String ProcessId)
-> NC (Map String ProcessId, Map String ProcessId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (NCState -> Map String ProcessId) -> NC (Map String ProcessId)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState -> T NCState (Map String ProcessId) -> Map String ProcessId
forall r a. r -> T r a -> a
^. T NCState (Map String ProcessId)
registeredHere)
((String, ProcessId) -> NC ()) -> [(String, ProcessId)] -> NC ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(String
p, ProcessId
l) -> IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ProcessId -> String -> MxEvent
MxUnRegistered ProcessId
l String
p)) (Map String ProcessId -> [(String, ProcessId)]
forall k a. Map k a -> [(k, a)]
Map.toList Map String ProcessId
dropNames)
(NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ T NCState (Map String ProcessId)
registeredHere T NCState (Map String ProcessId)
-> Map String ProcessId -> NCState -> NCState
forall r a. T r a -> a -> r -> r
^= Map String ProcessId
keepNames
[Maybe (ProcessId, [(NodeId, Int)])]
remaining <- (Map ProcessId [(NodeId, Int)] -> [(ProcessId, [(NodeId, Int)])])
-> NC (Map ProcessId [(NodeId, Int)])
-> NC [(ProcessId, [(NodeId, Int)])]
forall a b. (a -> b) -> NC a -> NC b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Map ProcessId [(NodeId, Int)] -> [(ProcessId, [(NodeId, Int)])]
forall k a. Map k a -> [(k, a)]
Map.toList ((NCState -> Map ProcessId [(NodeId, Int)])
-> NC (Map ProcessId [(NodeId, Int)])
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState
-> T NCState (Map ProcessId [(NodeId, Int)])
-> Map ProcessId [(NodeId, Int)]
forall r a. r -> T r a -> a
^. T NCState (Map ProcessId [(NodeId, Int)])
registeredOnNodes)) NC [(ProcessId, [(NodeId, Int)])]
-> ([(ProcessId, [(NodeId, Int)])]
-> NC [Maybe (ProcessId, [(NodeId, Int)])])
-> NC [Maybe (ProcessId, [(NodeId, Int)])]
forall a b. NC a -> (a -> NC b) -> NC b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
((ProcessId, [(NodeId, Int)])
-> NC (Maybe (ProcessId, [(NodeId, Int)])))
-> [(ProcessId, [(NodeId, Int)])]
-> NC [Maybe (ProcessId, [(NodeId, Int)])]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (\(ProcessId
pid,[(NodeId, Int)]
nidlist) ->
case Identifier
ident Identifier -> Identifier -> Bool
`impliesDeathOf` ProcessId -> Identifier
ProcessIdentifier ProcessId
pid of
Bool
True ->
do [(NodeId, Int)] -> ((NodeId, Int) -> NC ()) -> NC ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(NodeId, Int)]
nidlist (((NodeId, Int) -> NC ()) -> NC ())
-> ((NodeId, Int) -> NC ()) -> NC ()
forall a b. (a -> b) -> a -> b
$ \(NodeId
nid,Int
_) ->
Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ LocalNode -> Identifier -> Bool
isLocal LocalNode
node (NodeId -> Identifier
NodeIdentifier NodeId
nid))
(LocalNode -> NodeId -> NC ()
forwardDeath LocalNode
node NodeId
nid)
Maybe (ProcessId, [(NodeId, Int)])
-> NC (Maybe (ProcessId, [(NodeId, Int)]))
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (ProcessId, [(NodeId, Int)])
forall a. Maybe a
Nothing
Bool
False -> Maybe (ProcessId, [(NodeId, Int)])
-> NC (Maybe (ProcessId, [(NodeId, Int)]))
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (ProcessId, [(NodeId, Int)])
-> NC (Maybe (ProcessId, [(NodeId, Int)])))
-> Maybe (ProcessId, [(NodeId, Int)])
-> NC (Maybe (ProcessId, [(NodeId, Int)]))
forall a b. (a -> b) -> a -> b
$ (ProcessId, [(NodeId, Int)]) -> Maybe (ProcessId, [(NodeId, Int)])
forall a. a -> Maybe a
Just (ProcessId
pid,[(NodeId, Int)]
nidlist) )
(NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ T NCState (Map ProcessId [(NodeId, Int)])
registeredOnNodes T NCState (Map ProcessId [(NodeId, Int)])
-> Map ProcessId [(NodeId, Int)] -> NCState -> NCState
forall r a. T r a -> a -> r -> r
^= ([(ProcessId, [(NodeId, Int)])] -> Map ProcessId [(NodeId, Int)]
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([Maybe (ProcessId, [(NodeId, Int)])]
-> [(ProcessId, [(NodeId, Int)])]
forall a. [Maybe a] -> [a]
catMaybes [Maybe (ProcessId, [(NodeId, Int)])]
remaining))
where
forwardDeath :: LocalNode -> NodeId -> NC ()
forwardDeath LocalNode
node NodeId
nid = NodeId -> NCMsg -> NC ()
ncSendToNode NodeId
nid
NCMsg { ctrlMsgSender :: Identifier
ctrlMsgSender = NodeId -> Identifier
NodeIdentifier (LocalNode -> NodeId
localNodeId LocalNode
node)
, ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = Identifier -> DiedReason -> ProcessSignal
Died Identifier
ident DiedReason
reason
}
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
ncEffectSpawn ProcessId
pid Closure (Process ())
cProc SpawnRef
ref = do
Either String (Process ())
mProc <- Closure (Process ()) -> NC (Either String (Process ()))
forall a. Typeable a => Closure a -> NC (Either String a)
unClosure Closure (Process ())
cProc
let proc :: Process ()
proc = case Either String (Process ())
mProc of
Left String
err -> String -> Process ()
forall a. String -> Process a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Process ()) -> String -> Process ()
forall a b. (a -> b) -> a -> b
$ String
"Error: Could not resolve closure: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
Right Process ()
p -> Process ()
p
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
ProcessId
pid' <- IO ProcessId -> NC ProcessId
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ProcessId -> NC ProcessId) -> IO ProcessId -> NC ProcessId
forall a b. (a -> b) -> a -> b
$ LocalNode -> Process () -> IO ProcessId
forkProcess LocalNode
node Process ()
proc
ProcessId -> Message -> NC ()
ncSendToProcess ProcessId
pid (Message -> NC ()) -> Message -> NC ()
forall a b. (a -> b) -> a -> b
$ DidSpawn -> Message
forall a. Serializable a => a -> Message
unsafeCreateUnencodedMessage (DidSpawn -> Message) -> DidSpawn -> Message
forall a b. (a -> b) -> a -> b
$ SpawnRef -> ProcessId -> DidSpawn
DidSpawn SpawnRef
ref ProcessId
pid'
ncEffectRegister :: ProcessId -> String -> NodeId -> Maybe ProcessId -> Bool -> NC ()
ncEffectRegister :: ProcessId -> String -> NodeId -> Maybe ProcessId -> Bool -> NC ()
ncEffectRegister ProcessId
from String
label NodeId
atnode Maybe ProcessId
mPid Bool
reregistration = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
Maybe ProcessId
currentVal <- (NCState -> Maybe ProcessId) -> NC (Maybe ProcessId)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState -> T NCState (Maybe ProcessId) -> Maybe ProcessId
forall r a. r -> T r a -> a
^. String -> T NCState (Maybe ProcessId)
registeredHereFor String
label)
Bool
isOk <-
case Maybe ProcessId
mPid of
Maybe ProcessId
Nothing ->
Bool -> NC Bool
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> NC Bool) -> Bool -> NC Bool
forall a b. (a -> b) -> a -> b
$ Maybe ProcessId -> Bool
forall a. Maybe a -> Bool
isJust Maybe ProcessId
currentVal
Just ProcessId
thepid ->
do Bool
isvalidlocal <- Identifier -> NC Bool
isValidLocalIdentifier (ProcessId -> Identifier
ProcessIdentifier ProcessId
thepid)
Bool -> NC Bool
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> NC Bool) -> Bool -> NC Bool
forall a b. (a -> b) -> a -> b
$ (Maybe ProcessId -> Bool
forall a. Maybe a -> Bool
isNothing Maybe ProcessId
currentVal Bool -> Bool -> Bool
forall a. Eq a => a -> a -> Bool
/= Bool
reregistration) Bool -> Bool -> Bool
&&
(Bool -> Bool
not (LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
thepid) ) Bool -> Bool -> Bool
|| Bool
isvalidlocal )
if LocalNode -> Identifier -> Bool
isLocal LocalNode
node (NodeId -> Identifier
NodeIdentifier NodeId
atnode)
then do Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
isOk (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$
do (NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ String -> T NCState (Maybe ProcessId)
registeredHereFor String
label T NCState (Maybe ProcessId)
-> Maybe ProcessId -> NCState -> NCState
forall r a. T r a -> a -> r -> r
^= Maybe ProcessId
mPid
LocalNode -> Maybe ProcessId -> Maybe ProcessId -> NC ()
updateRemote LocalNode
node Maybe ProcessId
currentVal Maybe ProcessId
mPid
case Maybe ProcessId
mPid of
(Just ProcessId
p) -> do
if Bool
reregistration
then IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ProcessId -> String -> MxEvent
MxUnRegistered (Maybe ProcessId -> ProcessId
forall a. HasCallStack => Maybe a -> a
fromJust Maybe ProcessId
currentVal) String
label)
else () -> NC ()
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ProcessId -> String -> MxEvent
MxRegistered ProcessId
p String
label)
Maybe ProcessId
Nothing -> IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ProcessId -> String -> MxEvent
MxUnRegistered (Maybe ProcessId -> ProcessId
forall a. HasCallStack => Maybe a -> a
fromJust Maybe ProcessId
currentVal) String
label)
Maybe ProcessId
newVal <- (NCState -> Maybe ProcessId) -> NC (Maybe ProcessId)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState -> T NCState (Maybe ProcessId) -> Maybe ProcessId
forall r a. r -> T r a -> a
^. String -> T NCState (Maybe ProcessId)
registeredHereFor String
label)
ProcessId -> Message -> NC ()
ncSendToProcess ProcessId
from (Message -> NC ()) -> Message -> NC ()
forall a b. (a -> b) -> a -> b
$ RegisterReply -> Message
forall a. Serializable a => a -> Message
unsafeCreateUnencodedMessage (RegisterReply -> Message) -> RegisterReply -> Message
forall a b. (a -> b) -> a -> b
$
String -> Bool -> Maybe ProcessId -> RegisterReply
RegisterReply String
label Bool
isOk Maybe ProcessId
newVal
else let operation :: NodeId -> [(NodeId, Int)] -> [(NodeId, Int)]
operation =
case Bool
reregistration of
Bool
True -> ([(NodeId, Int)] -> NodeId -> [(NodeId, Int)])
-> NodeId -> [(NodeId, Int)] -> [(NodeId, Int)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip [(NodeId, Int)] -> NodeId -> [(NodeId, Int)]
forall {a} {t}. (Num a, Eq a, Eq t) => [(t, a)] -> t -> [(t, a)]
decList
Bool
False -> ([(NodeId, Int)] -> NodeId -> [(NodeId, Int)])
-> NodeId -> [(NodeId, Int)] -> [(NodeId, Int)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip [(NodeId, Int)] -> NodeId -> [(NodeId, Int)]
forall {b} {t}. (Num b, Eq t) => [(t, b)] -> t -> [(t, b)]
incList
in case Maybe ProcessId
mPid of
Maybe ProcessId
Nothing -> () -> NC ()
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just ProcessId
pid -> (NCState -> NCState) -> NC ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' ((NCState -> NCState) -> NC ()) -> (NCState -> NCState) -> NC ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Accessor NCState (Maybe [(NodeId, Int)])
registeredOnNodesFor ProcessId
pid Accessor NCState (Maybe [(NodeId, Int)])
-> (Maybe [(NodeId, Int)] -> Maybe [(NodeId, Int)])
-> NCState
-> NCState
forall r a. T r a -> (a -> a) -> r -> r
^: (([(NodeId, Int)] -> [(NodeId, Int)])
-> Maybe [(NodeId, Int)] -> Maybe [(NodeId, Int)]
forall {a} {a}. ([a] -> [a]) -> Maybe [a] -> Maybe [a]
maybeify (([(NodeId, Int)] -> [(NodeId, Int)])
-> Maybe [(NodeId, Int)] -> Maybe [(NodeId, Int)])
-> ([(NodeId, Int)] -> [(NodeId, Int)])
-> Maybe [(NodeId, Int)]
-> Maybe [(NodeId, Int)]
forall a b. (a -> b) -> a -> b
$ NodeId -> [(NodeId, Int)] -> [(NodeId, Int)]
operation NodeId
atnode)
where updateRemote :: LocalNode -> Maybe ProcessId -> Maybe ProcessId -> NC ()
updateRemote LocalNode
node (Just ProcessId
oldval) (Just ProcessId
newval) | ProcessId -> NodeId
processNodeId ProcessId
oldval NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
/= ProcessId -> NodeId
processNodeId ProcessId
newval =
do LocalNode -> NodeId -> ProcessSignal -> NC ()
forward LocalNode
node (ProcessId -> NodeId
processNodeId ProcessId
oldval) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
atnode (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
oldval) Bool
True)
LocalNode -> NodeId -> ProcessSignal -> NC ()
forward LocalNode
node (ProcessId -> NodeId
processNodeId ProcessId
newval) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
atnode (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
newval) Bool
False)
updateRemote LocalNode
node Maybe ProcessId
Nothing (Just ProcessId
newval) =
LocalNode -> NodeId -> ProcessSignal -> NC ()
forward LocalNode
node (ProcessId -> NodeId
processNodeId ProcessId
newval) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
atnode (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
newval) Bool
False)
updateRemote LocalNode
node (Just ProcessId
oldval) Maybe ProcessId
Nothing =
LocalNode -> NodeId -> ProcessSignal -> NC ()
forward LocalNode
node (ProcessId -> NodeId
processNodeId ProcessId
oldval) (String -> NodeId -> Maybe ProcessId -> Bool -> ProcessSignal
Register String
label NodeId
atnode (ProcessId -> Maybe ProcessId
forall a. a -> Maybe a
Just ProcessId
oldval) Bool
True)
updateRemote LocalNode
_ Maybe ProcessId
_ Maybe ProcessId
_ = () -> NC ()
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
maybeify :: ([a] -> [a]) -> Maybe [a] -> Maybe [a]
maybeify [a] -> [a]
f Maybe [a]
Nothing = [a] -> Maybe [a]
forall {a}. [a] -> Maybe [a]
unmaybeify ([a] -> Maybe [a]) -> [a] -> Maybe [a]
forall a b. (a -> b) -> a -> b
$ [a] -> [a]
f []
maybeify [a] -> [a]
f (Just [a]
x) = [a] -> Maybe [a]
forall {a}. [a] -> Maybe [a]
unmaybeify ([a] -> Maybe [a]) -> [a] -> Maybe [a]
forall a b. (a -> b) -> a -> b
$ [a] -> [a]
f [a]
x
unmaybeify :: [a] -> Maybe [a]
unmaybeify [] = Maybe [a]
forall a. Maybe a
Nothing
unmaybeify [a]
x = [a] -> Maybe [a]
forall a. a -> Maybe a
Just [a]
x
incList :: [(t, b)] -> t -> [(t, b)]
incList [] t
tag = [(t
tag,b
1)]
incList ((t
atag,b
acount):[(t, b)]
xs) t
tag | t
tagt -> t -> Bool
forall a. Eq a => a -> a -> Bool
==t
atag = (t
atag,b
acountb -> b -> b
forall a. Num a => a -> a -> a
+b
1) (t, b) -> [(t, b)] -> [(t, b)]
forall a. a -> [a] -> [a]
: [(t, b)]
xs
incList ((t, b)
x:[(t, b)]
xs) t
tag = (t, b)
x (t, b) -> [(t, b)] -> [(t, b)]
forall a. a -> [a] -> [a]
: [(t, b)] -> t -> [(t, b)]
incList [(t, b)]
xs t
tag
decList :: [(t, a)] -> t -> [(t, a)]
decList [] t
_ = []
decList ((t
atag,a
1):[(t, a)]
xs) t
tag | t
atag t -> t -> Bool
forall a. Eq a => a -> a -> Bool
== t
tag = [(t, a)]
xs
decList ((t
atag,a
n):[(t, a)]
xs) t
tag | t
atag t -> t -> Bool
forall a. Eq a => a -> a -> Bool
== t
tag = (t
atag,a
na -> a -> a
forall a. Num a => a -> a -> a
-a
1)(t, a) -> [(t, a)] -> [(t, a)]
forall a. a -> [a] -> [a]
:[(t, a)]
xs
decList ((t, a)
x:[(t, a)]
xs) t
tag = (t, a)
x(t, a) -> [(t, a)] -> [(t, a)]
forall a. a -> [a] -> [a]
:[(t, a)] -> t -> [(t, a)]
decList [(t, a)]
xs t
tag
forward :: LocalNode -> NodeId -> ProcessSignal -> NC ()
forward LocalNode
node NodeId
to ProcessSignal
reg =
Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ LocalNode -> Identifier -> Bool
isLocal LocalNode
node (NodeId -> Identifier
NodeIdentifier NodeId
to)) (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$
NodeId -> NCMsg -> NC ()
ncSendToNode NodeId
to (NCMsg -> NC ()) -> NCMsg -> NC ()
forall a b. (a -> b) -> a -> b
$ NCMsg { ctrlMsgSender :: Identifier
ctrlMsgSender = ProcessId -> Identifier
ProcessIdentifier ProcessId
from
, ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = ProcessSignal
reg
}
ncEffectWhereIs :: ProcessId -> String -> NC ()
ncEffectWhereIs :: ProcessId -> String -> NC ()
ncEffectWhereIs ProcessId
from String
label = do
Maybe ProcessId
mPid <- (NCState -> Maybe ProcessId) -> NC (Maybe ProcessId)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState -> T NCState (Maybe ProcessId) -> Maybe ProcessId
forall r a. r -> T r a -> a
^. String -> T NCState (Maybe ProcessId)
registeredHereFor String
label)
ProcessId -> Message -> NC ()
ncSendToProcess ProcessId
from (Message -> NC ()) -> Message -> NC ()
forall a b. (a -> b) -> a -> b
$ WhereIsReply -> Message
forall a. Serializable a => a -> Message
unsafeCreateUnencodedMessage (WhereIsReply -> Message) -> WhereIsReply -> Message
forall a b. (a -> b) -> a -> b
$ String -> Maybe ProcessId -> WhereIsReply
WhereIsReply String
label Maybe ProcessId
mPid
ncEffectNamedSend :: String -> Message -> NC ()
ncEffectNamedSend :: String -> Message -> NC ()
ncEffectNamedSend String
label Message
msg = do
Maybe ProcessId
mPid <- (NCState -> Maybe ProcessId) -> NC (Maybe ProcessId)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState -> T NCState (Maybe ProcessId) -> Maybe ProcessId
forall r a. r -> T r a -> a
^. String -> T NCState (Maybe ProcessId)
registeredHereFor String
label)
Maybe ProcessId -> (ProcessId -> NC ()) -> NC ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe ProcessId
mPid ((ProcessId -> NC ()) -> NC ()) -> (ProcessId -> NC ()) -> NC ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
to ->
Bool -> ProcessId -> Message -> NC ()
ncSendToProcessAndTrace (String
label String -> String -> Bool
forall a. Eq a => a -> a -> Bool
/= String
"trace.logger") ProcessId
to Message
msg
ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSend = Bool -> LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSendAndTrace Bool
True
ncEffectLocalSendAndTrace :: Bool -> LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSendAndTrace :: Bool -> LocalNode -> ProcessId -> Message -> NC ()
ncEffectLocalSendAndTrace Bool
shouldTrace LocalNode
node ProcessId
to Message
msg =
IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> ProcessId -> (LocalProcess -> IO ()) -> IO ()
withLocalProc LocalNode
node ProcessId
to ((LocalProcess -> IO ()) -> IO ())
-> (LocalProcess -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \LocalProcess
p -> do
CQueue Message -> Message -> IO ()
forall a. CQueue a -> a -> IO ()
enqueue (LocalProcess -> CQueue Message
processQueue LocalProcess
p) Message
msg
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
shouldTrace (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> MxEvent -> IO ()
trace LocalNode
node (ProcessId -> Message -> MxEvent
MxReceived ProcessId
to Message
msg)
ncEffectLocalPortSend :: SendPortId -> Message -> NC ()
ncEffectLocalPortSend :: SendPortId -> Message -> NC ()
ncEffectLocalPortSend SendPortId
from Message
msg = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
let pid :: ProcessId
pid = SendPortId -> ProcessId
sendPortProcessId SendPortId
from
cid :: Int32
cid = SendPortId -> Int32
sendPortLocalId SendPortId
from
IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> ProcessId -> (LocalProcess -> IO ()) -> IO ()
withLocalProc LocalNode
node ProcessId
pid ((LocalProcess -> IO ()) -> IO ())
-> (LocalProcess -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \LocalProcess
proc -> do
Maybe TypedChannel
mChan <- StrictMVar LocalProcessState
-> (LocalProcessState -> IO (Maybe TypedChannel))
-> IO (Maybe TypedChannel)
forall a b. StrictMVar a -> (a -> IO b) -> IO b
withMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO (Maybe TypedChannel))
-> IO (Maybe TypedChannel))
-> (LocalProcessState -> IO (Maybe TypedChannel))
-> IO (Maybe TypedChannel)
forall a b. (a -> b) -> a -> b
$ Maybe TypedChannel -> IO (Maybe TypedChannel)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe TypedChannel -> IO (Maybe TypedChannel))
-> (LocalProcessState -> Maybe TypedChannel)
-> LocalProcessState
-> IO (Maybe TypedChannel)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (LocalProcessState
-> T LocalProcessState (Maybe TypedChannel) -> Maybe TypedChannel
forall r a. r -> T r a -> a
^. Int32 -> T LocalProcessState (Maybe TypedChannel)
typedChannelWithId Int32
cid)
case Maybe TypedChannel
mChan of
Maybe TypedChannel
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just (TypedChannel Weak (TQueue a)
chan') -> do
Maybe (TQueue a)
ch <- Weak (TQueue a) -> IO (Maybe (TQueue a))
forall v. Weak v -> IO (Maybe v)
deRefWeak Weak (TQueue a)
chan'
Maybe (TQueue a) -> (TQueue a -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe (TQueue a)
ch ((TQueue a -> IO ()) -> IO ()) -> (TQueue a -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \TQueue a
chan -> LocalNode -> SendPortId -> Message -> TQueue a -> IO ()
forall a. LocalNode -> SendPortId -> Message -> TQueue a -> IO ()
deliverChan LocalNode
node SendPortId
from Message
msg TQueue a
chan
where deliverChan :: forall a . LocalNode -> SendPortId -> Message -> TQueue a -> IO ()
deliverChan :: forall a. LocalNode -> SendPortId -> Message -> TQueue a -> IO ()
deliverChan LocalNode
n SendPortId
p (UnencodedMessage Fingerprint
_ a
raw) TQueue a
chan' = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue a -> a -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue a
chan' ((a -> a
forall a b. a -> b
unsafeCoerce a
raw) :: a)
LocalNode -> MxEvent -> IO ()
trace LocalNode
n (SendPortId -> Message -> MxEvent
MxReceivedPort SendPortId
p (Message -> MxEvent) -> Message -> MxEvent
forall a b. (a -> b) -> a -> b
$ a -> Message
forall a. Serializable a => a -> Message
unsafeCreateUnencodedMessage a
raw)
deliverChan LocalNode
_ SendPortId
_ (EncodedMessage Fingerprint
_ ByteString
_) TQueue a
_ =
String -> IO ()
forall a. HasCallStack => String -> a
error String
"invalid local channel delivery"
ncEffectKill :: ProcessId -> ProcessId -> String -> NC ()
ncEffectKill :: ProcessId -> ProcessId -> String -> NC ()
ncEffectKill ProcessId
from ProcessId
to String
reason = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
to)) (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> ProcessKillException -> NC ()
forall e. Exception e => ProcessId -> e -> NC ()
throwException ProcessId
to (ProcessKillException -> NC ()) -> ProcessKillException -> NC ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> String -> ProcessKillException
ProcessKillException ProcessId
from String
reason
ncEffectExit :: ProcessId -> ProcessId -> Message -> NC ()
ncEffectExit :: ProcessId -> ProcessId -> Message -> NC ()
ncEffectExit ProcessId
from ProcessId
to Message
reason = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
Bool -> NC () -> NC ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
to)) (NC () -> NC ()) -> NC () -> NC ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> ProcessExitException -> NC ()
forall e. Exception e => ProcessId -> e -> NC ()
throwException ProcessId
to (ProcessExitException -> NC ()) -> ProcessExitException -> NC ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> Message -> ProcessExitException
ProcessExitException ProcessId
from Message
reason
ncEffectGetInfo :: ProcessId -> ProcessId -> NC ()
ncEffectGetInfo :: ProcessId -> ProcessId -> NC ()
ncEffectGetInfo ProcessId
from ProcessId
pid =
let lpid :: LocalProcessId
lpid = ProcessId -> LocalProcessId
processLocalId ProcessId
pid
them :: Identifier
them = (ProcessId -> Identifier
ProcessIdentifier ProcessId
pid)
in do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
Maybe LocalProcess
mProc <- IO (Maybe LocalProcess) -> NC (Maybe LocalProcess)
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe LocalProcess) -> NC (Maybe LocalProcess))
-> IO (Maybe LocalProcess) -> NC (Maybe LocalProcess)
forall a b. (a -> b) -> a -> b
$ LocalNode
-> (ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess)
forall r. LocalNode -> (ValidLocalNodeState -> IO r) -> IO r
withValidLocalState LocalNode
node
((ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess))
-> (ValidLocalNodeState -> IO (Maybe LocalProcess))
-> IO (Maybe LocalProcess)
forall a b. (a -> b) -> a -> b
$ Maybe LocalProcess -> IO (Maybe LocalProcess)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe LocalProcess -> IO (Maybe LocalProcess))
-> (ValidLocalNodeState -> Maybe LocalProcess)
-> ValidLocalNodeState
-> IO (Maybe LocalProcess)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ValidLocalNodeState
-> Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess
forall r a. r -> T r a -> a
^. LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId LocalProcessId
lpid)
case Maybe LocalProcess
mProc of
Maybe LocalProcess
Nothing -> Bool -> ProcessId -> ProcessInfoNone -> NC ()
forall a. Serializable a => Bool -> ProcessId -> a -> NC ()
dispatch (LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
from))
ProcessId
from (DiedReason -> ProcessInfoNone
ProcessInfoNone DiedReason
DiedUnknownId)
Just LocalProcess
proc -> do
Set ProcessId
itsLinks <- ((ProcessId, ()) -> ProcessId)
-> Set (ProcessId, ()) -> Set ProcessId
forall b a. Ord b => (a -> b) -> Set a -> Set b
Set.map (ProcessId, ()) -> ProcessId
forall a b. (a, b) -> a
fst (Set (ProcessId, ()) -> Set ProcessId)
-> (BiMultiMap Identifier ProcessId () -> Set (ProcessId, ()))
-> BiMultiMap Identifier ProcessId ()
-> Set ProcessId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identifier
-> BiMultiMap Identifier ProcessId () -> Set (ProcessId, ())
forall a b v. Ord a => a -> BiMultiMap a b v -> Set (b, v)
BiMultiMap.lookupBy1st Identifier
them (BiMultiMap Identifier ProcessId () -> Set ProcessId)
-> NC (BiMultiMap Identifier ProcessId ()) -> NC (Set ProcessId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
(NCState -> BiMultiMap Identifier ProcessId ())
-> NC (BiMultiMap Identifier ProcessId ())
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState
-> Accessor NCState (BiMultiMap Identifier ProcessId ())
-> BiMultiMap Identifier ProcessId ()
forall r a. r -> T r a -> a
^. Accessor NCState (BiMultiMap Identifier ProcessId ())
links)
Set (ProcessId, MonitorRef)
itsMons <- Identifier
-> BiMultiMap Identifier ProcessId MonitorRef
-> Set (ProcessId, MonitorRef)
forall a b v. Ord a => a -> BiMultiMap a b v -> Set (b, v)
BiMultiMap.lookupBy1st Identifier
them (BiMultiMap Identifier ProcessId MonitorRef
-> Set (ProcessId, MonitorRef))
-> NC (BiMultiMap Identifier ProcessId MonitorRef)
-> NC (Set (ProcessId, MonitorRef))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (NCState -> BiMultiMap Identifier ProcessId MonitorRef)
-> NC (BiMultiMap Identifier ProcessId MonitorRef)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState
-> Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
-> BiMultiMap Identifier ProcessId MonitorRef
forall r a. r -> T r a -> a
^. Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors)
Map String ProcessId
registered <- (NCState -> Map String ProcessId) -> NC (Map String ProcessId)
forall s (m :: * -> *) a. MonadState s m => (s -> a) -> m a
gets (NCState -> T NCState (Map String ProcessId) -> Map String ProcessId
forall r a. r -> T r a -> a
^. T NCState (Map String ProcessId)
registeredHere)
Int
size <- IO Int -> NC Int
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> NC Int) -> IO Int -> NC Int
forall a b. (a -> b) -> a -> b
$ CQueue Message -> IO Int
forall a. CQueue a -> IO Int
queueSize (CQueue Message -> IO Int) -> CQueue Message -> IO Int
forall a b. (a -> b) -> a -> b
$ LocalProcess -> CQueue Message
processQueue (LocalProcess -> CQueue Message) -> LocalProcess -> CQueue Message
forall a b. (a -> b) -> a -> b
$ LocalProcess
proc
let reg :: [String]
reg = Map String ProcessId -> [String]
forall {a}. Map a ProcessId -> [a]
registeredNames Map String ProcessId
registered
Bool -> ProcessId -> ProcessInfo -> NC ()
forall a. Serializable a => Bool -> ProcessId -> a -> NC ()
dispatch (LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
from))
ProcessId
from
ProcessInfo {
infoNode :: NodeId
infoNode = (ProcessId -> NodeId
processNodeId ProcessId
pid)
, infoRegisteredNames :: [String]
infoRegisteredNames = [String]
reg
, infoMessageQueueLength :: Int
infoMessageQueueLength = Int
size
, infoMonitors :: [(ProcessId, MonitorRef)]
infoMonitors = Set (ProcessId, MonitorRef) -> [(ProcessId, MonitorRef)]
forall a. Set a -> [a]
Set.toList Set (ProcessId, MonitorRef)
itsMons
, infoLinks :: [ProcessId]
infoLinks = Set ProcessId -> [ProcessId]
forall a. Set a -> [a]
Set.toList Set ProcessId
itsLinks
}
where dispatch :: (Serializable a)
=> Bool
-> ProcessId
-> a
-> NC ()
dispatch :: forall a. Serializable a => Bool -> ProcessId -> a -> NC ()
dispatch Bool
True ProcessId
dest a
pInfo = ProcessId -> a -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
dest (a -> NC ()) -> a -> NC ()
forall a b. (a -> b) -> a -> b
$ a
pInfo
dispatch Bool
False ProcessId
dest a
pInfo =
ProcessId -> Message -> NC ()
ncSendToProcess ProcessId
dest (Message -> NC ()) -> Message -> NC ()
forall a b. (a -> b) -> a -> b
$ a -> Message
forall a. Serializable a => a -> Message
unsafeCreateUnencodedMessage a
pInfo
registeredNames :: Map a ProcessId -> [a]
registeredNames = ([a] -> a -> ProcessId -> [a]) -> [a] -> Map a ProcessId -> [a]
forall a k b. (a -> k -> b -> a) -> a -> Map k b -> a
Map.foldlWithKey (\[a]
ks a
k ProcessId
v -> if ProcessId
v ProcessId -> ProcessId -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessId
pid
then (a
ka -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
ks)
else [a]
ks) []
ncEffectGetNodeStats :: ProcessId -> NodeId -> NC ()
ncEffectGetNodeStats :: ProcessId -> NodeId -> NC ()
ncEffectGetNodeStats ProcessId
from NodeId
_nid = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
NCState
ncState <- NC NCState
forall s (m :: * -> *). MonadState s m => m s
StateT.get
ValidLocalNodeState
nodeState <- IO ValidLocalNodeState -> NC ValidLocalNodeState
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ValidLocalNodeState -> NC ValidLocalNodeState)
-> IO ValidLocalNodeState -> NC ValidLocalNodeState
forall a b. (a -> b) -> a -> b
$ LocalNode
-> (ValidLocalNodeState -> IO ValidLocalNodeState)
-> IO ValidLocalNodeState
forall r. LocalNode -> (ValidLocalNodeState -> IO r) -> IO r
withValidLocalState LocalNode
node ValidLocalNodeState -> IO ValidLocalNodeState
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return
let stats :: NodeStats
stats =
NodeStats {
nodeStatsNode :: NodeId
nodeStatsNode = LocalNode -> NodeId
localNodeId LocalNode
node
, nodeStatsRegisteredNames :: Int
nodeStatsRegisteredNames = Map String ProcessId -> Int
forall k a. Map k a -> Int
Map.size (Map String ProcessId -> Int) -> Map String ProcessId -> Int
forall a b. (a -> b) -> a -> b
$ NCState
ncState NCState -> T NCState (Map String ProcessId) -> Map String ProcessId
forall r a. r -> T r a -> a
^. T NCState (Map String ProcessId)
registeredHere
, nodeStatsMonitors :: Int
nodeStatsMonitors = BiMultiMap Identifier ProcessId MonitorRef -> Int
forall a b v. BiMultiMap a b v -> Int
BiMultiMap.size (BiMultiMap Identifier ProcessId MonitorRef -> Int)
-> BiMultiMap Identifier ProcessId MonitorRef -> Int
forall a b. (a -> b) -> a -> b
$ NCState
ncState NCState
-> Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
-> BiMultiMap Identifier ProcessId MonitorRef
forall r a. r -> T r a -> a
^. Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors
, nodeStatsLinks :: Int
nodeStatsLinks = BiMultiMap Identifier ProcessId () -> Int
forall a b v. BiMultiMap a b v -> Int
BiMultiMap.size (BiMultiMap Identifier ProcessId () -> Int)
-> BiMultiMap Identifier ProcessId () -> Int
forall a b. (a -> b) -> a -> b
$ NCState
ncState NCState
-> Accessor NCState (BiMultiMap Identifier ProcessId ())
-> BiMultiMap Identifier ProcessId ()
forall r a. r -> T r a -> a
^. Accessor NCState (BiMultiMap Identifier ProcessId ())
links
, nodeStatsProcesses :: Int
nodeStatsProcesses = Map LocalProcessId LocalProcess -> Int
forall k a. Map k a -> Int
Map.size (ValidLocalNodeState
nodeState ValidLocalNodeState
-> T ValidLocalNodeState (Map LocalProcessId LocalProcess)
-> Map LocalProcessId LocalProcess
forall r a. r -> T r a -> a
^. T ValidLocalNodeState (Map LocalProcessId LocalProcess)
localProcesses)
}
ProcessId -> NodeStats -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
from NodeStats
stats
notifyDied :: ProcessId
-> Identifier
-> DiedReason
-> Maybe MonitorRef
-> NC ()
notifyDied :: ProcessId -> Identifier -> DiedReason -> Maybe MonitorRef -> NC ()
notifyDied ProcessId
dest Identifier
src DiedReason
reason Maybe MonitorRef
mRef = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
case (LocalNode -> Identifier -> Bool
isLocal LocalNode
node (ProcessId -> Identifier
ProcessIdentifier ProcessId
dest), Maybe MonitorRef
mRef, Identifier
src) of
(Bool
True, Just MonitorRef
ref, ProcessIdentifier ProcessId
pid) ->
ProcessId -> ProcessMonitorNotification -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
dest (ProcessMonitorNotification -> NC ())
-> ProcessMonitorNotification -> NC ()
forall a b. (a -> b) -> a -> b
$ MonitorRef -> ProcessId -> DiedReason -> ProcessMonitorNotification
ProcessMonitorNotification MonitorRef
ref ProcessId
pid DiedReason
reason
(Bool
True, Just MonitorRef
ref, NodeIdentifier NodeId
nid) ->
ProcessId -> NodeMonitorNotification -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
dest (NodeMonitorNotification -> NC ())
-> NodeMonitorNotification -> NC ()
forall a b. (a -> b) -> a -> b
$ MonitorRef -> NodeId -> DiedReason -> NodeMonitorNotification
NodeMonitorNotification MonitorRef
ref NodeId
nid DiedReason
reason
(Bool
True, Just MonitorRef
ref, SendPortIdentifier SendPortId
cid) ->
ProcessId -> PortMonitorNotification -> NC ()
forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
dest (PortMonitorNotification -> NC ())
-> PortMonitorNotification -> NC ()
forall a b. (a -> b) -> a -> b
$ MonitorRef -> SendPortId -> DiedReason -> PortMonitorNotification
PortMonitorNotification MonitorRef
ref SendPortId
cid DiedReason
reason
(Bool
True, Maybe MonitorRef
Nothing, ProcessIdentifier ProcessId
pid) ->
ProcessId -> ProcessLinkException -> NC ()
forall e. Exception e => ProcessId -> e -> NC ()
throwException ProcessId
dest (ProcessLinkException -> NC ()) -> ProcessLinkException -> NC ()
forall a b. (a -> b) -> a -> b
$ ProcessId -> DiedReason -> ProcessLinkException
ProcessLinkException ProcessId
pid DiedReason
reason
(Bool
True, Maybe MonitorRef
Nothing, NodeIdentifier NodeId
pid) ->
ProcessId -> NodeLinkException -> NC ()
forall e. Exception e => ProcessId -> e -> NC ()
throwException ProcessId
dest (NodeLinkException -> NC ()) -> NodeLinkException -> NC ()
forall a b. (a -> b) -> a -> b
$ NodeId -> DiedReason -> NodeLinkException
NodeLinkException NodeId
pid DiedReason
reason
(Bool
True, Maybe MonitorRef
Nothing, SendPortIdentifier SendPortId
pid) ->
ProcessId -> PortLinkException -> NC ()
forall e. Exception e => ProcessId -> e -> NC ()
throwException ProcessId
dest (PortLinkException -> NC ()) -> PortLinkException -> NC ()
forall a b. (a -> b) -> a -> b
$ SendPortId -> DiedReason -> PortLinkException
PortLinkException SendPortId
pid DiedReason
reason
(Bool
False, Maybe MonitorRef
_, Identifier
_) ->
NodeId -> NCMsg -> NC ()
ncSendToNode (ProcessId -> NodeId
processNodeId ProcessId
dest) (NCMsg -> NC ()) -> NCMsg -> NC ()
forall a b. (a -> b) -> a -> b
$ NCMsg
{ ctrlMsgSender :: Identifier
ctrlMsgSender = NodeId -> Identifier
NodeIdentifier (LocalNode -> NodeId
localNodeId LocalNode
node)
, ctrlMsgSignal :: ProcessSignal
ctrlMsgSignal = Identifier -> DiedReason -> ProcessSignal
Died Identifier
src DiedReason
reason
}
destNid :: ProcessSignal -> Maybe NodeId
destNid :: ProcessSignal -> Maybe NodeId
destNid (Link Identifier
ident) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ Identifier -> NodeId
nodeOf Identifier
ident
destNid (Unlink Identifier
ident) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ Identifier -> NodeId
nodeOf Identifier
ident
destNid (Monitor MonitorRef
ref) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ Identifier -> NodeId
nodeOf (MonitorRef -> Identifier
monitorRefIdent MonitorRef
ref)
destNid (Unmonitor MonitorRef
ref) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ Identifier -> NodeId
nodeOf (MonitorRef -> Identifier
monitorRefIdent MonitorRef
ref)
destNid (Spawn Closure (Process ())
_ SpawnRef
_) = Maybe NodeId
forall a. Maybe a
Nothing
destNid (Register String
_ NodeId
_ Maybe ProcessId
_ Bool
_) = Maybe NodeId
forall a. Maybe a
Nothing
destNid (WhereIs String
_) = Maybe NodeId
forall a. Maybe a
Nothing
destNid (NamedSend String
_ Message
_) = Maybe NodeId
forall a. Maybe a
Nothing
destNid (UnreliableSend LocalProcessId
_ Message
_) = Maybe NodeId
forall a. Maybe a
Nothing
destNid (Died Identifier
_ DiedReason
_) = Maybe NodeId
forall a. Maybe a
Nothing
destNid (Kill ProcessId
pid String
_) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ ProcessId -> NodeId
processNodeId ProcessId
pid
destNid (Exit ProcessId
pid Message
_) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ ProcessId -> NodeId
processNodeId ProcessId
pid
destNid (GetInfo ProcessId
pid) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ ProcessId -> NodeId
processNodeId ProcessId
pid
destNid (GetNodeStats NodeId
nid) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just NodeId
nid
destNid (LocalSend ProcessId
pid Message
_) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ ProcessId -> NodeId
processNodeId ProcessId
pid
destNid (LocalPortSend SendPortId
cid Message
_) = NodeId -> Maybe NodeId
forall a. a -> Maybe a
Just (NodeId -> Maybe NodeId) -> NodeId -> Maybe NodeId
forall a b. (a -> b) -> a -> b
$ ProcessId -> NodeId
processNodeId (SendPortId -> ProcessId
sendPortProcessId SendPortId
cid)
destNid (ProcessSignal
SigShutdown) = Maybe NodeId
forall a. Maybe a
Nothing
isLocal :: LocalNode -> Identifier -> Bool
isLocal :: LocalNode -> Identifier -> Bool
isLocal LocalNode
nid Identifier
ident = Identifier -> NodeId
nodeOf Identifier
ident NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== LocalNode -> NodeId
localNodeId LocalNode
nid
unClosure :: Typeable a => Closure a -> NC (Either String a)
unClosure :: forall a. Typeable a => Closure a -> NC (Either String a)
unClosure Closure a
closure = do
RemoteTable
rtable <- LocalNode -> RemoteTable
remoteTable (LocalNode -> RemoteTable) -> NC LocalNode -> NC RemoteTable
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
Either String a -> NC (Either String a)
forall a. a -> NC a
forall (m :: * -> *) a. Monad m => a -> m a
return (RemoteTable -> Closure a -> Either String a
forall a. Typeable a => RemoteTable -> Closure a -> Either String a
Static.unclosure RemoteTable
rtable Closure a
closure)
isValidLocalIdentifier :: Identifier -> NC Bool
isValidLocalIdentifier :: Identifier -> NC Bool
isValidLocalIdentifier Identifier
ident = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
IO Bool -> NC Bool
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> NC Bool)
-> ((ValidLocalNodeState -> IO Bool) -> IO Bool)
-> (ValidLocalNodeState -> IO Bool)
-> NC Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalNode -> (ValidLocalNodeState -> IO Bool) -> IO Bool
forall r. LocalNode -> (ValidLocalNodeState -> IO r) -> IO r
withValidLocalState LocalNode
node ((ValidLocalNodeState -> IO Bool) -> NC Bool)
-> (ValidLocalNodeState -> IO Bool) -> NC Bool
forall a b. (a -> b) -> a -> b
$ \ValidLocalNodeState
nSt ->
case Identifier
ident of
NodeIdentifier NodeId
nid ->
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ NodeId
nid NodeId -> NodeId -> Bool
forall a. Eq a => a -> a -> Bool
== LocalNode -> NodeId
localNodeId LocalNode
node
ProcessIdentifier ProcessId
pid -> do
let mProc :: Maybe LocalProcess
mProc = ValidLocalNodeState
nSt ValidLocalNodeState
-> Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess
forall r a. r -> T r a -> a
^. LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId (ProcessId -> LocalProcessId
processLocalId ProcessId
pid)
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Maybe LocalProcess -> Bool
forall a. Maybe a -> Bool
isJust Maybe LocalProcess
mProc
SendPortIdentifier SendPortId
cid -> do
let pid :: ProcessId
pid = SendPortId -> ProcessId
sendPortProcessId SendPortId
cid
mProc :: Maybe LocalProcess
mProc = ValidLocalNodeState
nSt ValidLocalNodeState
-> Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess
forall r a. r -> T r a -> a
^. LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId (ProcessId -> LocalProcessId
processLocalId ProcessId
pid)
case Maybe LocalProcess
mProc of
Maybe LocalProcess
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Just LocalProcess
proc -> StrictMVar LocalProcessState
-> (LocalProcessState -> IO Bool) -> IO Bool
forall a b. StrictMVar a -> (a -> IO b) -> IO b
withMVar (LocalProcess -> StrictMVar LocalProcessState
processState LocalProcess
proc) ((LocalProcessState -> IO Bool) -> IO Bool)
-> (LocalProcessState -> IO Bool) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \LocalProcessState
pSt -> do
let mCh :: Maybe TypedChannel
mCh = LocalProcessState
pSt LocalProcessState
-> T LocalProcessState (Maybe TypedChannel) -> Maybe TypedChannel
forall r a. r -> T r a -> a
^. Int32 -> T LocalProcessState (Maybe TypedChannel)
typedChannelWithId (SendPortId -> Int32
sendPortLocalId SendPortId
cid)
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Maybe TypedChannel -> Bool
forall a. Maybe a -> Bool
isJust Maybe TypedChannel
mCh
postAsMessage :: Serializable a => ProcessId -> a -> NC ()
postAsMessage :: forall a. Serializable a => ProcessId -> a -> NC ()
postAsMessage ProcessId
pid = ProcessId -> Message -> NC ()
postMessage ProcessId
pid (Message -> NC ()) -> (a -> Message) -> a -> NC ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Message
forall a. Serializable a => a -> Message
createUnencodedMessage
postMessage :: ProcessId -> Message -> NC ()
postMessage :: ProcessId -> Message -> NC ()
postMessage ProcessId
pid Message
msg = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> ProcessId -> (LocalProcess -> IO ()) -> IO ()
withLocalProc LocalNode
node ProcessId
pid ((LocalProcess -> IO ()) -> IO ())
-> (LocalProcess -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \LocalProcess
p -> CQueue Message -> Message -> IO ()
forall a. CQueue a -> a -> IO ()
enqueue (LocalProcess -> CQueue Message
processQueue LocalProcess
p) Message
msg
throwException :: Exception e => ProcessId -> e -> NC ()
throwException :: forall e. Exception e => ProcessId -> e -> NC ()
throwException ProcessId
pid e
e = do
LocalNode
node <- NC LocalNode
forall r (m :: * -> *). MonadReader r m => m r
ask
IO () -> NC ()
forall a. IO a -> NC a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> NC ()) -> IO () -> NC ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> ProcessId -> (LocalProcess -> IO ()) -> IO ()
withLocalProc LocalNode
node ProcessId
pid ((LocalProcess -> IO ()) -> IO ())
-> (LocalProcess -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \LocalProcess
p -> ThreadId -> e -> IO ()
forall e. Exception e => ThreadId -> e -> IO ()
throwTo (LocalProcess -> ThreadId
processThread LocalProcess
p) e
e
withLocalProc :: LocalNode -> ProcessId -> (LocalProcess -> IO ()) -> IO ()
withLocalProc :: LocalNode -> ProcessId -> (LocalProcess -> IO ()) -> IO ()
withLocalProc LocalNode
node ProcessId
pid LocalProcess -> IO ()
p =
let lpid :: LocalProcessId
lpid = ProcessId -> LocalProcessId
processLocalId ProcessId
pid in do
IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ LocalNode -> (ValidLocalNodeState -> IO (IO ())) -> IO (IO ())
forall r. LocalNode -> (ValidLocalNodeState -> IO r) -> IO r
withValidLocalState LocalNode
node ((ValidLocalNodeState -> IO (IO ())) -> IO (IO ()))
-> (ValidLocalNodeState -> IO (IO ())) -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ \ValidLocalNodeState
vst ->
IO () -> IO (IO ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> IO (IO ())) -> IO () -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ Maybe LocalProcess -> (LocalProcess -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (ValidLocalNodeState
vst ValidLocalNodeState
-> Accessor ValidLocalNodeState (Maybe LocalProcess)
-> Maybe LocalProcess
forall r a. r -> T r a -> a
^. LocalProcessId -> Accessor ValidLocalNodeState (Maybe LocalProcess)
localProcessWithId LocalProcessId
lpid) LocalProcess -> IO ()
p
links :: Accessor NCState (BiMultiMap Identifier ProcessId ())
links :: Accessor NCState (BiMultiMap Identifier ProcessId ())
links = (NCState -> BiMultiMap Identifier ProcessId ())
-> (BiMultiMap Identifier ProcessId () -> NCState -> NCState)
-> Accessor NCState (BiMultiMap Identifier ProcessId ())
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor NCState -> BiMultiMap Identifier ProcessId ()
_links (\BiMultiMap Identifier ProcessId ()
ls NCState
st -> NCState
st { _links = ls })
monitors :: Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors :: Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
monitors = (NCState -> BiMultiMap Identifier ProcessId MonitorRef)
-> (BiMultiMap Identifier ProcessId MonitorRef
-> NCState -> NCState)
-> Accessor NCState (BiMultiMap Identifier ProcessId MonitorRef)
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor NCState -> BiMultiMap Identifier ProcessId MonitorRef
_monitors (\BiMultiMap Identifier ProcessId MonitorRef
ms NCState
st -> NCState
st { _monitors = ms })
registeredHere :: Accessor NCState (Map String ProcessId)
registeredHere :: T NCState (Map String ProcessId)
registeredHere = (NCState -> Map String ProcessId)
-> (Map String ProcessId -> NCState -> NCState)
-> T NCState (Map String ProcessId)
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor NCState -> Map String ProcessId
_registeredHere (\Map String ProcessId
ry NCState
st -> NCState
st { _registeredHere = ry })
registeredOnNodes :: Accessor NCState (Map ProcessId [(NodeId, Int)])
registeredOnNodes :: T NCState (Map ProcessId [(NodeId, Int)])
registeredOnNodes = (NCState -> Map ProcessId [(NodeId, Int)])
-> (Map ProcessId [(NodeId, Int)] -> NCState -> NCState)
-> T NCState (Map ProcessId [(NodeId, Int)])
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor NCState -> Map ProcessId [(NodeId, Int)]
_registeredOnNodes (\Map ProcessId [(NodeId, Int)]
ry NCState
st -> NCState
st { _registeredOnNodes = ry })
registeredHereFor :: String -> Accessor NCState (Maybe ProcessId)
registeredHereFor :: String -> T NCState (Maybe ProcessId)
registeredHereFor String
ident = T NCState (Map String ProcessId)
registeredHere T NCState (Map String ProcessId)
-> T (Map String ProcessId) (Maybe ProcessId)
-> T NCState (Maybe ProcessId)
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> String -> T (Map String ProcessId) (Maybe ProcessId)
forall key elem.
Ord key =>
key -> Accessor (Map key elem) (Maybe elem)
DAC.mapMaybe String
ident
registeredOnNodesFor :: ProcessId -> Accessor NCState (Maybe [(NodeId,Int)])
registeredOnNodesFor :: ProcessId -> Accessor NCState (Maybe [(NodeId, Int)])
registeredOnNodesFor ProcessId
ident = T NCState (Map ProcessId [(NodeId, Int)])
registeredOnNodes T NCState (Map ProcessId [(NodeId, Int)])
-> T (Map ProcessId [(NodeId, Int)]) (Maybe [(NodeId, Int)])
-> Accessor NCState (Maybe [(NodeId, Int)])
forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> ProcessId
-> T (Map ProcessId [(NodeId, Int)]) (Maybe [(NodeId, Int)])
forall key elem.
Ord key =>
key -> Accessor (Map key elem) (Maybe elem)
DAC.mapMaybe ProcessId
ident
splitNotif :: (Ord a, Ord v)
=> Identifier
-> BiMultiMap Identifier a v
-> (Map Identifier (Set (a,v)), BiMultiMap Identifier a v)
splitNotif :: forall a v.
(Ord a, Ord v) =>
Identifier
-> BiMultiMap Identifier a v
-> (Map Identifier (Set (a, v)), BiMultiMap Identifier a v)
splitNotif Identifier
ident =
(Identifier -> Set (a, v) -> Bool)
-> BiMultiMap Identifier a v
-> (Map Identifier (Set (a, v)), BiMultiMap Identifier a v)
forall a b v.
(Ord a, Ord b, Ord v) =>
(a -> Set (b, v) -> Bool)
-> BiMultiMap a b v -> (Map a (Set (b, v)), BiMultiMap a b v)
BiMultiMap.partitionWithKeyBy1st (\Identifier
k !Set (a, v)
_v -> Identifier
ident Identifier -> Identifier -> Bool
`impliesDeathOf` Identifier
k)
modify' :: MonadState s m => (s -> s) -> m ()
modify' :: forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' s -> s
f = m s
forall s (m :: * -> *). MonadState s m => m s
StateT.get m s -> (s -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \s
s -> s -> m ()
forall s (m :: * -> *). MonadState s m => s -> m ()
StateT.put (s -> m ()) -> s -> m ()
forall a b. (a -> b) -> a -> b
$! s -> s
f s
s