module Control.Distributed.Process.Internal.Messaging
( sendPayload
, sendBinary
, sendMessage
, disconnect
, closeImplicitReconnections
, impliesDeathOf
, sendCtrlMsg
) where
import Data.Accessor ((^.), (^=))
import Data.Binary (Binary, encode)
import qualified Data.Map as Map (partitionWithKey, elems)
import qualified Data.ByteString.Lazy as BSL (toChunks)
import qualified Data.ByteString as BSS (ByteString)
import Control.Distributed.Process.Serializable ()
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (writeChan)
import Control.Exception (mask_)
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import qualified Network.Transport as NT
( Connection
, send
, defaultConnectHints
, connect
, Reliability(ReliableOrdered)
, close
)
import Control.Distributed.Process.Internal.Types
( LocalNode(localState, localEndPoint, localCtrlChan)
, withValidLocalState
, modifyValidLocalState
, modifyValidLocalState_
, Identifier
, localConnections
, localConnectionBetween
, nodeAddress
, nodeOf
, messageToPayload
, createMessage
, NCMsg(..)
, ProcessSignal(Died)
, DiedReason(DiedDisconnect)
, ImplicitReconnect(WithImplicitReconnect)
, NodeId(..)
, ProcessId(..)
, LocalNode(..)
, LocalProcess(..)
, Process(..)
, SendPortId(sendPortProcessId)
, Identifier(NodeIdentifier, ProcessIdentifier, SendPortIdentifier)
)
import Control.Distributed.Process.Serializable (Serializable)
import Data.Foldable (forM_)
sendPayload :: LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> [BSS.ByteString]
-> IO ()
sendPayload node from to implicitReconnect payload = do
mConn <- connBetween node from to implicitReconnect
didSend <- case mConn of
Just conn -> do
didSend <- NT.send conn payload
case didSend of
Left _err -> return False
Right () -> return True
Nothing -> return False
unless didSend $ do
writeChan (localCtrlChan node) NCMsg
{ ctrlMsgSender = to
, ctrlMsgSignal = Died (NodeIdentifier $ nodeOf to) DiedDisconnect
}
sendBinary :: Binary a
=> LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> a
-> IO ()
sendBinary node from to implicitReconnect
= sendPayload node from to implicitReconnect . BSL.toChunks . encode
sendMessage :: Serializable a
=> LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> a
-> IO ()
sendMessage node from to implicitReconnect =
sendPayload node from to implicitReconnect . messageToPayload . createMessage
setupConnBetween :: LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> IO (Maybe NT.Connection)
setupConnBetween node from to implicitReconnect = do
mConn <- NT.connect (localEndPoint node)
(nodeAddress . nodeOf $ to)
NT.ReliableOrdered
NT.defaultConnectHints
case mConn of
Right conn -> do
didSend <- NT.send conn (BSL.toChunks . encode $ to)
case didSend of
Left _ ->
return Nothing
Right () -> do
modifyValidLocalState_ node $
return . (localConnectionBetween from to ^= Just (conn, implicitReconnect))
return $ Just conn
Left _ ->
return Nothing
connBetween :: LocalNode
-> Identifier
-> Identifier
-> ImplicitReconnect
-> IO (Maybe NT.Connection)
connBetween node from to implicitReconnect = do
mConn <- withValidLocalState node $
return . (^. localConnectionBetween from to)
case mConn of
Just (conn, _) ->
return $ Just conn
Nothing ->
setupConnBetween node from to implicitReconnect
disconnect :: LocalNode -> Identifier -> Identifier -> IO ()
disconnect node from to = mask_ $ do
mio <- modifyValidLocalState node $ \vst ->
case vst ^. localConnectionBetween from to of
Nothing ->
return (vst, return ())
Just (conn, _) -> do
return ( localConnectionBetween from to ^= Nothing $ vst
, NT.close conn
)
forM_ mio forkIO
closeImplicitReconnections :: LocalNode -> Identifier -> IO ()
closeImplicitReconnections node to = mask_ $ do
mconns <- modifyValidLocalState node $ \vst -> do
let shouldClose (_, to') (_, WithImplicitReconnect) = to `impliesDeathOf` to'
shouldClose _ _ = False
let (affected, unaffected) =
Map.partitionWithKey shouldClose (vst ^. localConnections)
return ( localConnections ^= unaffected $ vst
, map fst $ Map.elems affected
)
forM_ mconns $ forkIO . mapM_ NT.close
impliesDeathOf :: Identifier
-> Identifier
-> Bool
NodeIdentifier nid `impliesDeathOf` NodeIdentifier nid' =
nid' == nid
NodeIdentifier nid `impliesDeathOf` ProcessIdentifier pid =
processNodeId pid == nid
NodeIdentifier nid `impliesDeathOf` SendPortIdentifier cid =
processNodeId (sendPortProcessId cid) == nid
ProcessIdentifier pid `impliesDeathOf` ProcessIdentifier pid' =
pid' == pid
ProcessIdentifier pid `impliesDeathOf` SendPortIdentifier cid =
sendPortProcessId cid == pid
SendPortIdentifier cid `impliesDeathOf` SendPortIdentifier cid' =
cid' == cid
_ `impliesDeathOf` _ =
False
sendCtrlMsg :: Maybe NodeId
-> ProcessSignal
-> Process ()
sendCtrlMsg mNid signal = do
proc <- ask
let msg = NCMsg { ctrlMsgSender = ProcessIdentifier (processId proc)
, ctrlMsgSignal = signal
}
case mNid of
Nothing -> do
liftIO $ writeChan (localCtrlChan (processNode proc)) $! msg
Just nid ->
liftIO $ sendBinary (processNode proc)
(ProcessIdentifier (processId proc))
(NodeIdentifier nid)
WithImplicitReconnect
msg