module Control.Distributed.Process.Internal.Types
(
NodeId(..)
, LocalProcessId(..)
, ProcessId(..)
, Identifier(..)
, nodeOf
, firstNonReservedProcessId
, nullProcessId
, LocalNode(..)
, Tracer(..)
, MxEventBus(..)
, LocalNodeState(..)
, LocalProcess(..)
, LocalProcessState(..)
, Process(..)
, runLocalProcess
, ImplicitReconnect(..)
, LocalSendPortId
, SendPortId(..)
, TypedChannel(..)
, SendPort(..)
, ReceivePort(..)
, Message(..)
, isEncoded
, createMessage
, createUnencodedMessage
, unsafeCreateUnencodedMessage
, messageToPayload
, payloadToMessage
, MonitorRef(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
, PortMonitorNotification(..)
, ProcessExitException(..)
, ProcessLinkException(..)
, NodeLinkException(..)
, PortLinkException(..)
, ProcessRegistrationException(..)
, DiedReason(..)
, DidUnmonitor(..)
, DidUnlinkProcess(..)
, DidUnlinkNode(..)
, DidUnlinkPort(..)
, SpawnRef(..)
, DidSpawn(..)
, WhereIsReply(..)
, RegisterReply(..)
, ProcessInfo(..)
, ProcessInfoNone(..)
, NodeStats(..)
, NCMsg(..)
, ProcessSignal(..)
, localProcesses
, localPidCounter
, localPidUnique
, localConnections
, localProcessWithId
, localConnectionBetween
, monitorCounter
, spawnCounter
, channelCounter
, typedChannels
, typedChannelWithId
, forever'
) where
import System.Mem.Weak (Weak)
import Data.Map (Map)
import Data.Int (Int32)
import Data.Data (Data)
import Data.Typeable (Typeable, typeOf)
import Data.Binary (Binary(put, get), putWord8, getWord8, encode)
import qualified Data.ByteString as BSS (ByteString, concat, copy)
import qualified Data.ByteString.Lazy as BSL
( ByteString
, toChunks
, splitAt
, fromChunks
, length
)
import qualified Data.ByteString.Lazy.Internal as BSL (ByteString(..))
import Data.Accessor (Accessor, accessor)
import Control.Category ((>>>))
import Control.DeepSeq (NFData(..))
import Control.Exception (Exception)
import Control.Concurrent (ThreadId)
import Control.Concurrent.Chan (Chan)
import Control.Concurrent.STM (STM)
import Control.Concurrent.STM.TChan (TChan)
import qualified Network.Transport as NT (EndPoint, EndPointAddress, Connection)
import Control.Applicative (Applicative, Alternative, (<$>), (<*>))
import Control.Monad.Reader (MonadReader(..), ReaderT, runReaderT)
import Control.Monad.IO.Class (MonadIO)
import Control.Distributed.Process.Serializable
( Fingerprint
, Serializable
, fingerprint
, encodeFingerprint
, sizeOfFingerprint
, decodeFingerprint
, showFingerprint
)
import Control.Distributed.Process.Internal.CQueue (CQueue)
import Control.Distributed.Process.Internal.StrictMVar (StrictMVar)
import Control.Distributed.Process.Internal.WeakTQueue (TQueue)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC (mapMaybe)
import Data.Hashable
import GHC.Generics
newtype NodeId = NodeId { nodeAddress :: NT.EndPointAddress }
deriving (Eq, Ord, Typeable, Data, Generic)
instance Binary NodeId where
instance NFData NodeId
instance Hashable NodeId where
instance Show NodeId where
show (NodeId addr) = "nid://" ++ show addr
data LocalProcessId = LocalProcessId
{ lpidUnique :: !Int32
, lpidCounter :: !Int32
}
deriving (Eq, Ord, Typeable, Data, Generic, Show)
instance Hashable LocalProcessId where
data ProcessId = ProcessId
{
processNodeId :: !NodeId
, processLocalId :: !LocalProcessId
}
deriving (Eq, Ord, Typeable, Data, Generic)
instance Binary ProcessId where
instance NFData ProcessId where
instance Hashable ProcessId where
instance Show ProcessId where
show (ProcessId (NodeId addr) (LocalProcessId _ lid))
= "pid://" ++ show addr ++ ":" ++ show lid
data Identifier =
NodeIdentifier !NodeId
| ProcessIdentifier !ProcessId
| SendPortIdentifier !SendPortId
deriving (Eq, Ord, Generic)
instance Hashable Identifier where
instance Show Identifier where
show (NodeIdentifier nid) = show nid
show (ProcessIdentifier pid) = show pid
show (SendPortIdentifier cid) = show cid
nodeOf :: Identifier -> NodeId
nodeOf (NodeIdentifier nid) = nid
nodeOf (ProcessIdentifier pid) = processNodeId pid
nodeOf (SendPortIdentifier cid) = processNodeId (sendPortProcessId cid)
firstNonReservedProcessId :: Int32
firstNonReservedProcessId = 1
nullProcessId :: NodeId -> ProcessId
nullProcessId nid =
ProcessId { processNodeId = nid
, processLocalId = LocalProcessId { lpidUnique = 0
, lpidCounter = 0
}
}
data Tracer = Tracer
{
tracerPid :: !ProcessId
, weakQ :: !(Weak (CQueue Message))
}
data MxEventBus =
MxEventBusInitialising
| MxEventBus
{
agent :: !ProcessId
, tracer :: !Tracer
, evbuss :: !(Weak (CQueue Message))
, mxNew :: !(((TChan Message, TChan Message) -> Process ()) -> IO ProcessId)
}
data LocalNode = LocalNode
{
localNodeId :: !NodeId
, localEndPoint :: !NT.EndPoint
, localState :: !(StrictMVar LocalNodeState)
, localCtrlChan :: !(Chan NCMsg)
, localEventBus :: !MxEventBus
, remoteTable :: !RemoteTable
}
data ImplicitReconnect = WithImplicitReconnect | NoImplicitReconnect
deriving (Eq, Show)
data LocalNodeState = LocalNodeState
{
_localProcesses :: !(Map LocalProcessId LocalProcess)
, _localPidCounter :: !Int32
, _localPidUnique :: !Int32
, _localConnections :: !(Map (Identifier, Identifier)
(NT.Connection, ImplicitReconnect))
}
data LocalProcess = LocalProcess
{ processQueue :: !(CQueue Message)
, processWeakQ :: !(Weak (CQueue Message))
, processId :: !ProcessId
, processState :: !(StrictMVar LocalProcessState)
, processThread :: !ThreadId
, processNode :: !LocalNode
}
runLocalProcess :: LocalProcess -> Process a -> IO a
runLocalProcess lproc proc = runReaderT (unProcess proc) lproc
data LocalProcessState = LocalProcessState
{ _monitorCounter :: !Int32
, _spawnCounter :: !Int32
, _channelCounter :: !Int32
, _typedChannels :: !(Map LocalSendPortId TypedChannel)
}
newtype Process a = Process {
unProcess :: ReaderT LocalProcess IO a
}
deriving (Functor, Monad, MonadIO, MonadReader LocalProcess, Typeable, Applicative)
type LocalSendPortId = Int32
data SendPortId = SendPortId {
sendPortProcessId :: !ProcessId
, sendPortLocalId :: !LocalSendPortId
}
deriving (Eq, Ord, Typeable, Generic)
instance Hashable SendPortId where
instance Show SendPortId where
show (SendPortId (ProcessId (NodeId addr) (LocalProcessId _ plid)) clid)
= "cid://" ++ show addr ++ ":" ++ show plid ++ ":" ++ show clid
data TypedChannel = forall a. Serializable a => TypedChannel (Weak (TQueue a))
newtype SendPort a = SendPort {
sendPortId :: SendPortId
}
deriving (Typeable, Generic, Show, Eq, Ord)
instance (Serializable a) => Binary (SendPort a) where
instance (Hashable a) => Hashable (SendPort a) where
instance (NFData a) => NFData (SendPort a) where
newtype ReceivePort a = ReceivePort { receiveSTM :: STM a }
deriving (Typeable, Functor, Applicative, Alternative, Monad)
data Message =
EncodedMessage
{ messageFingerprint :: !Fingerprint
, messageEncoding :: !BSL.ByteString
} |
forall a . Serializable a =>
UnencodedMessage
{
messageFingerprint :: !Fingerprint
, messagePayload :: !a
}
deriving (Typeable)
instance Show Message where
show (EncodedMessage fp enc) = show enc ++ " :: " ++ showFingerprint fp []
show (UnencodedMessage _ uenc) = "[unencoded message] :: " ++ (show $ typeOf uenc)
isEncoded :: Message -> Bool
isEncoded (EncodedMessage _ _) = True
isEncoded _ = False
createMessage :: Serializable a => a -> Message
createMessage a = EncodedMessage (fingerprint a) (encode a)
createUnencodedMessage :: Serializable a => a -> Message
createUnencodedMessage a =
let encoded = encode a in BSL.length encoded `seq` UnencodedMessage (fingerprint a) a
unsafeCreateUnencodedMessage :: Serializable a => a -> Message
unsafeCreateUnencodedMessage a = UnencodedMessage (fingerprint a) a
messageToPayload :: Message -> [BSS.ByteString]
messageToPayload (EncodedMessage fp enc) = encodeFingerprint fp : BSL.toChunks enc
messageToPayload (UnencodedMessage fp m) = messageToPayload ((EncodedMessage fp (encode m)))
payloadToMessage :: [BSS.ByteString] -> Message
payloadToMessage payload = EncodedMessage fp (copy msg)
where
encFp :: BSL.ByteString
msg :: BSL.ByteString
(encFp, msg) = BSL.splitAt (fromIntegral sizeOfFingerprint)
$ BSL.fromChunks payload
fp :: Fingerprint
fp = decodeFingerprint . BSS.concat . BSL.toChunks $ encFp
copy :: BSL.ByteString -> BSL.ByteString
copy (BSL.Chunk bs BSL.Empty) = BSL.Chunk (BSS.copy bs) BSL.Empty
copy bsl = BSL.fromChunks . return . BSS.concat . BSL.toChunks $ bsl
data MonitorRef = MonitorRef
{
monitorRefIdent :: !Identifier
, monitorRefCounter :: !Int32
}
deriving (Eq, Ord, Show, Typeable, Generic)
instance Hashable MonitorRef where
data ProcessMonitorNotification =
ProcessMonitorNotification !MonitorRef !ProcessId !DiedReason
deriving (Typeable, Show)
data NodeMonitorNotification =
NodeMonitorNotification !MonitorRef !NodeId !DiedReason
deriving (Typeable, Show)
data PortMonitorNotification =
PortMonitorNotification !MonitorRef !SendPortId !DiedReason
deriving (Typeable, Show)
data ProcessLinkException =
ProcessLinkException !ProcessId !DiedReason
deriving (Typeable, Show)
data NodeLinkException =
NodeLinkException !NodeId !DiedReason
deriving (Typeable, Show)
data PortLinkException =
PortLinkException !SendPortId !DiedReason
deriving (Typeable, Show)
data ProcessRegistrationException =
ProcessRegistrationException !String
deriving (Typeable, Show)
data ProcessExitException =
ProcessExitException !ProcessId !Message
deriving Typeable
instance Exception ProcessExitException
instance Show ProcessExitException where
show (ProcessExitException pid _) = "exit-from=" ++ (show pid)
instance Exception ProcessLinkException
instance Exception NodeLinkException
instance Exception PortLinkException
instance Exception ProcessRegistrationException
data DiedReason =
DiedNormal
| DiedException !String
| DiedDisconnect
| DiedNodeDown
| DiedUnknownId
deriving (Show, Eq)
newtype DidUnmonitor = DidUnmonitor MonitorRef
deriving (Typeable, Binary)
newtype DidUnlinkProcess = DidUnlinkProcess ProcessId
deriving (Typeable, Binary)
newtype DidUnlinkNode = DidUnlinkNode NodeId
deriving (Typeable, Binary)
newtype DidUnlinkPort = DidUnlinkPort SendPortId
deriving (Typeable, Binary)
newtype SpawnRef = SpawnRef Int32
deriving (Show, Binary, Typeable, Eq)
data DidSpawn = DidSpawn SpawnRef ProcessId
deriving (Show, Typeable)
data WhereIsReply = WhereIsReply String (Maybe ProcessId)
deriving (Show, Typeable)
data RegisterReply = RegisterReply String Bool
deriving (Show, Typeable)
data NodeStats = NodeStats {
nodeStatsNode :: NodeId
, nodeStatsRegisteredNames :: Int
, nodeStatsMonitors :: Int
, nodeStatsLinks :: Int
, nodeStatsProcesses :: Int
}
deriving (Show, Eq, Typeable)
data ProcessInfo = ProcessInfo {
infoNode :: NodeId
, infoRegisteredNames :: [String]
, infoMessageQueueLength :: Maybe Int
, infoMonitors :: [(ProcessId, MonitorRef)]
, infoLinks :: [ProcessId]
} deriving (Show, Eq, Typeable)
data ProcessInfoNone = ProcessInfoNone DiedReason
deriving (Show, Typeable)
data NCMsg = NCMsg
{ ctrlMsgSender :: !Identifier
, ctrlMsgSignal :: !ProcessSignal
}
deriving Show
data ProcessSignal =
Link !Identifier
| Unlink !Identifier
| Monitor !MonitorRef
| Unmonitor !MonitorRef
| Died Identifier !DiedReason
| Spawn !(Closure (Process ())) !SpawnRef
| WhereIs !String
| Register !String !NodeId !(Maybe ProcessId) !Bool
| NamedSend !String !Message
| LocalSend !ProcessId !Message
| LocalPortSend !SendPortId !Message
| Kill !ProcessId !String
| Exit !ProcessId !Message
| GetInfo !ProcessId
| SigShutdown
| GetNodeStats !NodeId
deriving Show
instance Binary Message where
put msg = put $ messageToPayload msg
get = payloadToMessage <$> get
instance Binary LocalProcessId where
put lpid = put (lpidUnique lpid) >> put (lpidCounter lpid)
get = LocalProcessId <$> get <*> get
instance Binary ProcessMonitorNotification where
put (ProcessMonitorNotification ref pid reason) = put ref >> put pid >> put reason
get = ProcessMonitorNotification <$> get <*> get <*> get
instance Binary NodeMonitorNotification where
put (NodeMonitorNotification ref pid reason) = put ref >> put pid >> put reason
get = NodeMonitorNotification <$> get <*> get <*> get
instance Binary PortMonitorNotification where
put (PortMonitorNotification ref pid reason) = put ref >> put pid >> put reason
get = PortMonitorNotification <$> get <*> get <*> get
instance Binary NCMsg where
put msg = put (ctrlMsgSender msg) >> put (ctrlMsgSignal msg)
get = NCMsg <$> get <*> get
instance Binary MonitorRef where
put ref = put (monitorRefIdent ref) >> put (monitorRefCounter ref)
get = MonitorRef <$> get <*> get
instance Binary ProcessSignal where
put (Link pid) = putWord8 0 >> put pid
put (Unlink pid) = putWord8 1 >> put pid
put (Monitor ref) = putWord8 2 >> put ref
put (Unmonitor ref) = putWord8 3 >> put ref
put (Died who reason) = putWord8 4 >> put who >> put reason
put (Spawn proc ref) = putWord8 5 >> put proc >> put ref
put (WhereIs label) = putWord8 6 >> put label
put (Register label nid pid force) = putWord8 7 >> put label >> put nid >> put pid >> put force
put (NamedSend label msg) = putWord8 8 >> put label >> put (messageToPayload msg)
put (Kill pid reason) = putWord8 9 >> put pid >> put reason
put (Exit pid reason) = putWord8 10 >> put pid >> put (messageToPayload reason)
put (LocalSend to' msg) = putWord8 11 >> put to' >> put (messageToPayload msg)
put (LocalPortSend sid msg) = putWord8 12 >> put sid >> put (messageToPayload msg)
put (GetInfo about) = putWord8 30 >> put about
put (SigShutdown) = putWord8 31
put (GetNodeStats nid) = putWord8 32 >> put nid
get = do
header <- getWord8
case header of
0 -> Link <$> get
1 -> Unlink <$> get
2 -> Monitor <$> get
3 -> Unmonitor <$> get
4 -> Died <$> get <*> get
5 -> Spawn <$> get <*> get
6 -> WhereIs <$> get
7 -> Register <$> get <*> get <*> get <*> get
8 -> NamedSend <$> get <*> (payloadToMessage <$> get)
9 -> Kill <$> get <*> get
10 -> Exit <$> get <*> (payloadToMessage <$> get)
11 -> LocalSend <$> get <*> (payloadToMessage <$> get)
12 -> LocalPortSend <$> get <*> (payloadToMessage <$> get)
30 -> GetInfo <$> get
31 -> return SigShutdown
32 -> GetNodeStats <$> get
_ -> fail "ProcessSignal.get: invalid"
instance Binary DiedReason where
put DiedNormal = putWord8 0
put (DiedException e) = putWord8 1 >> put e
put DiedDisconnect = putWord8 2
put DiedNodeDown = putWord8 3
put DiedUnknownId = putWord8 4
get = do
header <- getWord8
case header of
0 -> return DiedNormal
1 -> DiedException <$> get
2 -> return DiedDisconnect
3 -> return DiedNodeDown
4 -> return DiedUnknownId
_ -> fail "DiedReason.get: invalid"
instance Binary DidSpawn where
put (DidSpawn ref pid) = put ref >> put pid
get = DidSpawn <$> get <*> get
instance Binary SendPortId where
put cid = put (sendPortProcessId cid) >> put (sendPortLocalId cid)
get = SendPortId <$> get <*> get
instance NFData SendPortId where
rnf cid = (sendPortProcessId cid) `seq` (sendPortLocalId cid) `seq` ()
instance Binary Identifier where
put (ProcessIdentifier pid) = putWord8 0 >> put pid
put (NodeIdentifier nid) = putWord8 1 >> put nid
put (SendPortIdentifier cid) = putWord8 2 >> put cid
get = do
header <- getWord8
case header of
0 -> ProcessIdentifier <$> get
1 -> NodeIdentifier <$> get
2 -> SendPortIdentifier <$> get
_ -> fail "Identifier.get: invalid"
instance Binary WhereIsReply where
put (WhereIsReply label mPid) = put label >> put mPid
get = WhereIsReply <$> get <*> get
instance Binary RegisterReply where
put (RegisterReply label ok) = put label >> put ok
get = RegisterReply <$> get <*> get
instance Binary ProcessInfo where
get = ProcessInfo <$> get <*> get <*> get <*> get <*> get
put pInfo = put (infoNode pInfo)
>> put (infoRegisteredNames pInfo)
>> put (infoMessageQueueLength pInfo)
>> put (infoMonitors pInfo)
>> put (infoLinks pInfo)
instance Binary NodeStats where
get = NodeStats <$> get <*> get <*> get <*> get <*> get
put nStats = put (nodeStatsNode nStats)
>> put (nodeStatsRegisteredNames nStats)
>> put (nodeStatsMonitors nStats)
>> put (nodeStatsLinks nStats)
>> put (nodeStatsProcesses nStats)
instance Binary ProcessInfoNone where
get = ProcessInfoNone <$> get
put (ProcessInfoNone r) = put r
localProcesses :: Accessor LocalNodeState (Map LocalProcessId LocalProcess)
localProcesses = accessor _localProcesses (\procs st -> st { _localProcesses = procs })
localPidCounter :: Accessor LocalNodeState Int32
localPidCounter = accessor _localPidCounter (\ctr st -> st { _localPidCounter = ctr })
localPidUnique :: Accessor LocalNodeState Int32
localPidUnique = accessor _localPidUnique (\unq st -> st { _localPidUnique = unq })
localConnections :: Accessor LocalNodeState (Map (Identifier, Identifier) (NT.Connection, ImplicitReconnect))
localConnections = accessor _localConnections (\conns st -> st { _localConnections = conns })
localProcessWithId :: LocalProcessId -> Accessor LocalNodeState (Maybe LocalProcess)
localProcessWithId lpid = localProcesses >>> DAC.mapMaybe lpid
localConnectionBetween :: Identifier -> Identifier -> Accessor LocalNodeState (Maybe (NT.Connection, ImplicitReconnect))
localConnectionBetween from' to' = localConnections >>> DAC.mapMaybe (from', to')
monitorCounter :: Accessor LocalProcessState Int32
monitorCounter = accessor _monitorCounter (\cnt st -> st { _monitorCounter = cnt })
spawnCounter :: Accessor LocalProcessState Int32
spawnCounter = accessor _spawnCounter (\cnt st -> st { _spawnCounter = cnt })
channelCounter :: Accessor LocalProcessState LocalSendPortId
channelCounter = accessor _channelCounter (\cnt st -> st { _channelCounter = cnt })
typedChannels :: Accessor LocalProcessState (Map LocalSendPortId TypedChannel)
typedChannels = accessor _typedChannels (\cs st -> st { _typedChannels = cs })
typedChannelWithId :: LocalSendPortId -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId cid = typedChannels >>> DAC.mapMaybe cid
forever' :: Monad m => m a -> m b
forever' a = let a' = a >> a' in a'