-- The connection manager: it keeps a registry of message receiver processes,
-- monitors those processes and their nodes, can re-establish the connections
-- to them, and periodically sends keep-alive messages.
module Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
(ConnectionManager,
ConnectionParams(..),
newConnectionManager,
tryAddMessageReceiver,
addMessageReceiver,
removeMessageReceiver,
clearMessageReceivers,
reconnectMessageReceivers,
filterMessageReceivers,
existsMessageReceiver,
trySendKeepAlive,
trySendKeepAliveUTC) where
import qualified Data.Map as M
import qualified Data.Set as S
import Data.Maybe
import Data.Either
import Data.IORef
import Data.Time.Clock
import Data.Word
import Control.Monad
import Control.Monad.Trans
import Control.Concurrent
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
-- | The connection parameters.
data ConnectionParams =
  ConnectionParams { -- | The logging priority (consumed by the logging helper,
                     -- which is defined outside this part of the file).
                     connLoggingPriority :: Priority,
                     -- | The minimum interval, in microseconds, between two
                     -- rounds of keep-alive messages.
                     connKeepAliveInterval :: Int,
                     -- | The delay, in microseconds, between unmonitoring the
                     -- receivers and reconnecting to them.
                     connReconnectingDelay :: Int,
                     -- | The delay, in microseconds, between reconnecting to
                     -- the receivers and monitoring them again.
                     connMonitoringDelay :: Int
                   }
-- | The connection manager.
data ConnectionManager =
  ConnectionManager { -- | The parameters the manager was created with.
                      connParams :: ConnectionParams,
                      -- | The time of the last keep-alive round; initialised
                      -- to the creation time of the manager.
                      connKeepAliveTimestamp :: IORef UTCTime,
                      -- | The registered message receivers keyed by their
                      -- process identifier.
                      connReceivers :: IORef (M.Map DP.ProcessId ConnectionMessageReceiver)
                    }
-- | The connection message receiver.
data ConnectionMessageReceiver =
  ConnectionMessageReceiver { -- | The process identifier of the receiver.
                              connReceiverProcess :: DP.ProcessId,
                              -- | The monitor reference for the receiver's
                              -- node; 'Nothing' while the node is not monitored.
                              connReceiverNodeMonitor :: IORef (Maybe DP.MonitorRef),
                              -- | The monitor reference for the receiver
                              -- process; 'Nothing' while it is not monitored.
                              connReceiverMonitor :: IORef (Maybe DP.MonitorRef)
                            }
-- | Create a new connection manager with the specified parameters.
--
-- The keep-alive timestamp is initialised to the current time and the
-- registry of message receivers starts out empty.
newConnectionManager :: ConnectionParams -> IO ConnectionManager
newConnectionManager ps =
  do timestamp <- getCurrentTime >>= newIORef
     receivers <- newIORef M.empty
     return ConnectionManager { connParams = ps,
                                connKeepAliveTimestamp = timestamp,
                                connReceivers = receivers }
-- | Try to add the message receiver.
--
-- Returns 'False' and changes nothing if the process is already registered;
-- otherwise registers it with 'addMessageReceiver' and returns 'True'.
tryAddMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process Bool
tryAddMessageReceiver manager pid =
  do f <- liftIO $ existsMessageReceiver manager pid
     if f
       then return False
       else do addMessageReceiver manager pid
               return True
-- | Add the message receiver: start monitoring its node and the process
-- itself, then store it in the registry.
--
-- No check is made for an existing entry: a receiver already registered under
-- the same process identifier is replaced in the map without being
-- unmonitored. Use 'tryAddMessageReceiver' to avoid that.
addMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process ()
addMessageReceiver manager pid =
  do r1 <- liftIO $ newIORef Nothing
     r2 <- liftIO $ newIORef Nothing
     let x = ConnectionMessageReceiver { connReceiverProcess = pid,
                                         connReceiverNodeMonitor = r1,
                                         connReceiverMonitor = r2 }
     monitorMessageReceiver manager x
     -- the strict variant keeps unevaluated insertions from piling up in the reference
     liftIO $
       modifyIORef' (connReceivers manager) $
       M.insert pid x
-- | Remove the message receiver.
--
-- If the process is not registered, a warning is logged and nothing else
-- happens. Otherwise its process and node monitors are cancelled and the
-- entry is deleted from the registry.
removeMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process ()
removeMessageReceiver manager pid =
  do rs <- liftIO $ readIORef (connReceivers manager)
     case M.lookup pid rs of
       Nothing ->
         logConnectionManager manager WARNING $
         "Could not find the monitored process " ++ show pid
       Just r ->
         do unmonitorMessageReceiver manager r
            -- the strict variant keeps unevaluated deletions from piling up in the reference
            liftIO $
              modifyIORef' (connReceivers manager) $
              M.delete pid
-- | Clear the message receivers: every receiver found in the registry at the
-- time of the call is removed with 'removeMessageReceiver'.
clearMessageReceivers :: ConnectionManager -> DP.Process ()
clearMessageReceivers manager =
  do rs <- liftIO $ readIORef (connReceivers manager)
     forM_ (M.elems rs) $ \r ->
       removeMessageReceiver manager (connReceiverProcess r)
-- | Reconnect to the message receivers selected by the given process
-- identifiers.
--
-- The receivers are obtained from @messageReceivers@ (its body is not fully
-- visible here; presumably it returns the registered receivers among the
-- given identifiers). Nothing is done when that list is empty. Otherwise the
-- steps are, in order:
--
-- 1. cancel the monitors of every receiver;
--
-- 2. wait for 'connReconnectingDelay' microseconds;
--
-- 3. reconnect to every receiver;
--
-- 4. wait for 'connMonitoringDelay' microseconds;
--
-- 5. monitor every receiver again.
reconnectMessageReceivers :: ConnectionManager -> [DP.ProcessId] -> DP.Process ()
reconnectMessageReceivers manager pids =
  do rs <- messageReceivers manager pids
     unless (null rs) $
       do forM_ rs $
            unmonitorMessageReceiver manager
          liftIO $
            threadDelay (connReconnectingDelay $ connParams manager)
          logConnectionManager manager NOTICE "Begin reconnecting..."
          forM_ rs $
            reconnectToMessageReceiver manager
          liftIO $
            threadDelay (connMonitoringDelay $ connParams manager)
          logConnectionManager manager NOTICE "Begin remonitoring..."
          forM_ rs $
            monitorMessageReceiver manager
-- | Unmonitor the message receiver: first the process monitor is cancelled,
-- then the node monitor.
unmonitorMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiver manager r =
  do unmonitorMessageReceiverProcess manager r
     unmonitorMessageReceiverNode manager r
-- | Monitor the message receiver: first its node is monitored, then the
-- process itself.
monitorMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiver manager r =
  do monitorMessageReceiverNode manager r
     monitorMessageReceiverProcess manager r
-- | Unmonitor the message receiver process.
--
-- If a monitor reference is stored for the process, it is cancelled and the
-- stored reference is reset to 'Nothing'; otherwise only a warning is logged.
unmonitorMessageReceiverProcess :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiverProcess manager r =
  do let pid = connReceiverProcess r
     ref <- liftIO $ readIORef (connReceiverMonitor r)
     case ref of
       Just m ->
         do logConnectionManager manager NOTICE $
              "Unmonitoring process " ++ show pid
            DP.unmonitor m
            liftIO $ writeIORef (connReceiverMonitor r) Nothing
       Nothing ->
         logConnectionManager manager WARNING $
         "Could not find the monitor reference for process " ++ show pid
-- | Monitor the message receiver process and remember the monitor reference.
--
-- If a reference is already stored, a warning is logged, a new monitor is
-- created first, and only then is the old one cancelled, so the process is
-- never left without a monitor in between.
monitorMessageReceiverProcess :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiverProcess manager r =
  do let pid = connReceiverProcess r
     ref <- liftIO $ readIORef (connReceiverMonitor r)
     case ref of
       Nothing ->
         do logConnectionManager manager NOTICE $
              "Monitoring process " ++ show pid
            x <- DP.monitor pid
            liftIO $ writeIORef (connReceiverMonitor r) (Just x)
       Just x0 ->
         do logConnectionManager manager WARNING $
              "Re-monitoring process " ++ show pid
            x <- DP.monitor pid
            DP.unmonitor x0
            liftIO $ writeIORef (connReceiverMonitor r) (Just x)
-- | Unmonitor the node of the message receiver.
--
-- If a node monitor reference is stored, it is cancelled and the stored
-- reference is reset to 'Nothing'; otherwise only a warning is logged.
unmonitorMessageReceiverNode :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiverNode manager r =
  do let nid = DP.processNodeId $ connReceiverProcess r
     ref <- liftIO $ readIORef (connReceiverNodeMonitor r)
     case ref of
       Just m ->
         do logConnectionManager manager NOTICE $
              "Unmonitoring node " ++ show nid
            DP.unmonitor m
            liftIO $ writeIORef (connReceiverNodeMonitor r) Nothing
       Nothing ->
         logConnectionManager manager WARNING $
         "Could not find the monitor reference for node " ++ show nid
-- | Monitor the node of the message receiver and remember the monitor
-- reference.
--
-- If a reference is already stored, a warning is logged, a new node monitor
-- is created first, and only then is the old one cancelled.
monitorMessageReceiverNode :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiverNode manager r =
  do let nid = DP.processNodeId $ connReceiverProcess r
     ref <- liftIO $ readIORef (connReceiverNodeMonitor r)
     case ref of
       Nothing ->
         do logConnectionManager manager NOTICE $
              "Monitoring node " ++ show nid
            x <- DP.monitorNode nid
            liftIO $ writeIORef (connReceiverNodeMonitor r) (Just x)
       Just x0 ->
         do logConnectionManager manager WARNING $
              "Re-monitoring node " ++ show nid
            x <- DP.monitorNode nid
            DP.unmonitor x0
            liftIO $ writeIORef (connReceiverNodeMonitor r) (Just x)
-- | Reconnect to the message receiver: log a notice and call 'DP.reconnect'
-- for the receiver process.
reconnectToMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
reconnectToMessageReceiver manager r =
  do let pid = connReceiverProcess r
     logConnectionManager manager NOTICE $
       "Direct reconnecting to " ++ show pid
     DP.reconnect pid
-- | Test whether the process with the specified identifier is currently
-- registered as a message receiver.
existsMessageReceiver :: ConnectionManager -> DP.ProcessId -> IO Bool
existsMessageReceiver manager pid =
  fmap (M.member pid) $ readIORef (connReceivers manager)
-- | Try to send keep-alive messages, using the current time.
--
-- Nothing is done (and the clock is not read) when no receiver is
-- registered; otherwise the decision is delegated to 'trySendKeepAliveUTC'.
trySendKeepAlive :: ConnectionManager -> DP.Process ()
trySendKeepAlive manager =
  do noReceivers <- liftIO $ fmap M.null $ readIORef (connReceivers manager)
     unless noReceivers $
       do utc <- liftIO getCurrentTime
          trySendKeepAliveUTC manager utc
-- | Try to send keep-alive messages, treating the given time as the current
-- one.
--
-- Nothing is done when no receiver is registered or when
-- 'shouldSendKeepAlive' reports that the keep-alive interval has not elapsed
-- yet. Otherwise the keep-alive timestamp is set to the given time and a
-- 'KeepAliveMessage' is sent with 'DP.usend' to every registered receiver.
trySendKeepAliveUTC :: ConnectionManager -> UTCTime -> DP.Process ()
trySendKeepAliveUTC manager utc =
  do -- the registry is read once and reused for both the emptiness test and the sending
     rs <- liftIO $ readIORef (connReceivers manager)
     unless (M.null rs) $
       do f <- liftIO $ shouldSendKeepAlive manager utc
          when f $
            do logConnectionManager manager INFO "Sending keep-alive messages"
               liftIO $ writeIORef (connKeepAliveTimestamp manager) utc
               forM_ rs $ \r ->
                 DP.usend (connReceiverProcess r) KeepAliveMessage
-- | Decide whether a keep-alive message should be sent at the given time.
--
-- Returns 'True' when the time elapsed since the stored keep-alive timestamp,
-- converted to microseconds, is strictly greater than
-- 'connKeepAliveInterval'. The timestamp itself is not modified here.
shouldSendKeepAlive :: ConnectionManager -> UTCTime -> IO Bool
shouldSendKeepAlive manager utc =
  do utc0 <- readIORef (connKeepAliveTimestamp manager)
     let dt :: Double
         dt = fromRational $ toRational (diffUTCTime utc utc0)
     return $
       secondsToMicroseconds dt > connKeepAliveInterval (connParams manager)
-- | Convert a duration in seconds to a whole number of microseconds.
--
-- The argument is scaled by one million and rounded with 'round' to an
-- 'Integer', which is then narrowed to 'Int'.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds x = fromInteger scaled
  where
    scaled :: Integer
    scaled = round (x * 1000000)
-- | Look up the receivers registered for the given process identifiers.
--
-- Receivers are returned in the order of @pids@. An identifier that is not
-- present in 'connReceivers' contributes nothing to the result and is
-- reported through 'logConnectionManager' with 'WARNING' priority.
messageReceivers :: ConnectionManager -> [DP.ProcessId] -> DP.Process [ConnectionMessageReceiver]
messageReceivers manager pids =
  do known <- liftIO $ readIORef (connReceivers manager)
     let -- Report an unknown identifier and contribute no receiver for it.
         missing pid =
           do logConnectionManager manager WARNING $
                "Could not find the monitored process " ++ show pid
              return []
         -- Zero or one receiver for a single identifier.
         resolve pid =
           maybe (missing pid) (\receiver -> return [receiver]) (M.lookup pid known)
     concat <$> mapM resolve pids
-- | Select the processes whose monitor notifications are still current.
--
-- A notification keeps its process identifier only when that process is
-- registered in 'connReceivers' and the notification's monitor reference
-- equals the one currently stored in 'connReceiverMonitor'. Unknown
-- processes are logged with 'WARNING' priority and stale references with
-- 'NOTICE' priority; both are dropped.
--
-- The result passes through a 'S.Set', so it contains no duplicates and is
-- in ascending order.
filterMessageReceivers :: ConnectionManager -> [DP.ProcessMonitorNotification] -> DP.Process [DP.ProcessId]
filterMessageReceivers manager ms =
  do known <- liftIO $ readIORef (connReceivers manager)
     let -- Zero or one process identifier for a single notification.
         check (DP.ProcessMonitorNotification ref pid _) =
           case M.lookup pid known of
             Just receiver ->
               do current <- liftIO $ readIORef (connReceiverMonitor receiver)
                  if current /= Just ref
                    then do logConnectionManager manager NOTICE $
                              "Received the old monitor reference for process " ++ show pid
                            return []
                    else return [pid]
             Nothing ->
               do logConnectionManager manager WARNING $
                    "Could not find the monitored process " ++ show pid
                  return []
     S.toList . S.fromList . concat <$> mapM check ms
-- | Log a message at the given priority.
--
-- The message is passed to 'DP.say', prefixed with @embracePriority p@ and a
-- single space, only when the configured 'connLoggingPriority' is less than
-- or equal to @p@; otherwise nothing happens.
logConnectionManager :: ConnectionManager -> Priority -> String -> DP.Process ()
{-# INLINE logConnectionManager #-}
logConnectionManager manager p message
  | threshold <= p = DP.say (concat [embracePriority p, " ", message])
  | otherwise = return ()
  where
    threshold = connLoggingPriority (connParams manager)