module Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
(ConnectionManager,
ConnectionParams(..),
newConnectionManager,
tryAddMessageReceiver,
addMessageReceiver,
removeMessageReceiver,
clearMessageReceivers,
reconnectMessageReceivers,
filterMessageReceivers,
existsMessageReceiver,
trySendKeepAlive,
trySendKeepAliveUTC) where
import qualified Data.Map as M
import qualified Data.Set as S
import Data.Maybe
import Data.Either
import Data.IORef
import Data.Time.Clock
import Data.Word
import Control.Monad
import Control.Monad.Trans
import Control.Concurrent
import qualified Control.Distributed.Process as DP
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
-- | The parameters that configure a connection manager.
data ConnectionParams = ConnectionParams
  { connLoggingPriority :: Priority
    -- ^ the logging threshold: a message with priority @p@ is logged
    -- only when @connLoggingPriority <= p@
  , connKeepAliveInterval :: Int
    -- ^ the keep-alive interval; it is compared against the elapsed
    -- time expressed in microseconds
  , connReconnectingDelay :: Int
    -- ^ the delay passed to 'threadDelay' before reconnecting
  , connMonitoringDelay :: Int
    -- ^ the delay passed to 'threadDelay' before remonitoring
  }
-- | The connection manager: its parameters together with its mutable state.
data ConnectionManager = ConnectionManager
  { connParams :: ConnectionParams
    -- ^ the parameters the manager was created with
  , connKeepAliveTimestamp :: IORef UTCTime
    -- ^ the time recorded when keep-alive messages were last sent
    -- (initially the creation time of the manager)
  , connReceivers :: IORef (M.Map DP.ProcessId ConnectionMessageReceiver)
    -- ^ the registered message receivers keyed by their process identifier
  }
-- | A registered message receiver together with its monitor references.
data ConnectionMessageReceiver = ConnectionMessageReceiver
  { connReceiverProcess :: DP.ProcessId
    -- ^ the receiver process
  , connReceiverNodeMonitor :: IORef (Maybe DP.MonitorRef)
    -- ^ the current monitor reference for the receiver's node, if any
  , connReceiverMonitor :: IORef (Maybe DP.MonitorRef)
    -- ^ the current monitor reference for the receiver process, if any
  }
-- | Create a new connection manager with the given parameters.
--
-- The keep-alive timestamp starts at the current time and the set of
-- receivers starts empty.
newConnectionManager :: ConnectionParams -> IO ConnectionManager
newConnectionManager ps = do
  now <- getCurrentTime
  timestampRef <- newIORef now
  receiversRef <- newIORef M.empty
  pure ConnectionManager
    { connParams = ps
    , connKeepAliveTimestamp = timestampRef
    , connReceivers = receiversRef
    }
-- | Add the message receiver unless it is already registered.
--
-- Returns 'True' when the receiver was added and 'False' when it was
-- already known to the manager.
tryAddMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process Bool
tryAddMessageReceiver manager pid = do
  known <- liftIO $ existsMessageReceiver manager pid
  unless known $
    addMessageReceiver manager pid
  return (not known)
-- | Register the message receiver and start monitoring it.
--
-- Fresh monitor references are created, the receiver's node and process
-- are monitored, and the receiver is stored under its process identifier.
-- An existing entry for the same process identifier is replaced in the map;
-- this function does not unmonitor the replaced entry, so callers that may
-- register a process twice should prefer 'tryAddMessageReceiver'.
addMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process ()
addMessageReceiver manager pid = do
  nodeMonitorRef <- liftIO $ newIORef Nothing
  processMonitorRef <- liftIO $ newIORef Nothing
  let receiver =
        ConnectionMessageReceiver
          { connReceiverProcess = pid
          , connReceiverNodeMonitor = nodeMonitorRef
          , connReceiverMonitor = processMonitorRef
          }
  monitorMessageReceiver manager receiver
  -- The strict modify keeps the map evaluated instead of stacking
  -- unevaluated 'M.insert' thunks inside the long-lived reference.
  liftIO $
    modifyIORef' (connReceivers manager) (M.insert pid receiver)
-- | Stop monitoring the message receiver and forget it.
--
-- When the process identifier is not registered, only a warning is logged.
removeMessageReceiver :: ConnectionManager -> DP.ProcessId -> DP.Process ()
removeMessageReceiver manager pid = do
  receivers <- liftIO $ readIORef (connReceivers manager)
  case M.lookup pid receivers of
    Nothing ->
      logConnectionManager manager WARNING $ "Could not find the monitored process " ++ show pid
    Just receiver -> do
      unmonitorMessageReceiver manager receiver
      -- The strict modify keeps the map evaluated instead of stacking
      -- unevaluated 'M.delete' thunks inside the long-lived reference.
      liftIO $
        modifyIORef' (connReceivers manager) (M.delete pid)
-- | Remove every registered message receiver.
--
-- The receivers are taken from a snapshot of the map read at the start
-- and removed one by one through 'removeMessageReceiver'.
clearMessageReceivers :: ConnectionManager -> DP.Process ()
clearMessageReceivers manager = do
  snapshot <- liftIO $ readIORef (connReceivers manager)
  mapM_ (removeMessageReceiver manager . connReceiverProcess) (M.elems snapshot)
-- | Reconnect to the specified message receivers.
--
-- Process identifiers that are not registered are skipped (a warning is
-- logged by the lookup). For the remaining receivers the steps are, in order:
-- unmonitor all of them, wait for the reconnecting delay, reconnect to each,
-- wait for the monitoring delay, and monitor all of them again. Nothing is
-- done when no registered receiver was found.
reconnectMessageReceivers :: ConnectionManager -> [DP.ProcessId] -> DP.Process ()
reconnectMessageReceivers manager pids = do
  receivers <- messageReceivers manager pids
  unless (null receivers) $ do
    mapM_ (unmonitorMessageReceiver manager) receivers
    pause connReconnectingDelay
    logConnectionManager manager NOTICE "Begin reconnecting..."
    mapM_ (reconnectToMessageReceiver manager) receivers
    pause connMonitoringDelay
    logConnectionManager manager NOTICE "Begin remonitoring..."
    mapM_ (monitorMessageReceiver manager) receivers
  where
    -- Suspend the current thread for the delay selected from the parameters.
    pause delayOf = liftIO $ threadDelay (delayOf (connParams manager))
-- | Stop monitoring the receiver: first its process, then its node.
unmonitorMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiver manager r =
  unmonitorMessageReceiverProcess manager r
    >> unmonitorMessageReceiverNode manager r
-- | Start monitoring the receiver: first its node, then its process.
monitorMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiver manager r =
  monitorMessageReceiverNode manager r
    >> monitorMessageReceiverProcess manager r
-- | Stop monitoring the receiver process.
--
-- When a monitor reference is stored, it is unmonitored and the stored
-- reference is cleared; otherwise only a warning is logged.
unmonitorMessageReceiverProcess :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiverProcess manager r =
  liftIO (readIORef monitorRef) >>= maybe missing release
  where
    pid = connReceiverProcess r
    monitorRef = connReceiverMonitor r
    -- No monitor reference is stored for the process.
    missing =
      logConnectionManager manager WARNING $ "Could not find the monitor reference for process " ++ show pid
    -- Drop the stored monitor reference.
    release m = do
      logConnectionManager manager NOTICE $ "Unmonitoring process " ++ show pid
      DP.unmonitor m
      liftIO $ writeIORef monitorRef Nothing
-- | Start monitoring the receiver process.
--
-- A new monitor is always created and stored. When a previous monitor
-- reference exists, a warning is logged and the previous reference is
-- unmonitored after the new monitor has been created.
monitorMessageReceiverProcess :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiverProcess manager r = do
  previous <- liftIO $ readIORef monitorRef
  case previous of
    Nothing ->
      logConnectionManager manager NOTICE $ "Monitoring process " ++ show pid
    Just _ ->
      logConnectionManager manager WARNING $ "Re-monitoring process " ++ show pid
  fresh <- DP.monitor pid
  -- The old reference (if any) is released only after the new one exists.
  mapM_ DP.unmonitor previous
  liftIO $ writeIORef monitorRef (Just fresh)
  where
    pid = connReceiverProcess r
    monitorRef = connReceiverMonitor r
-- | Stop monitoring the node of the receiver process.
--
-- When a node monitor reference is stored, it is unmonitored and the stored
-- reference is cleared; otherwise only a warning is logged.
unmonitorMessageReceiverNode :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
unmonitorMessageReceiverNode manager r = do
  stored <- liftIO $ readIORef nodeMonitorRef
  maybe missing release stored
  where
    nid = DP.processNodeId (connReceiverProcess r)
    nodeMonitorRef = connReceiverNodeMonitor r
    -- No monitor reference is stored for the node.
    missing =
      logConnectionManager manager WARNING $ "Could not find the monitor reference for node " ++ show nid
    -- Drop the stored node monitor reference.
    release m = do
      logConnectionManager manager NOTICE $ "Unmonitoring node " ++ show nid
      DP.unmonitor m
      liftIO $ writeIORef nodeMonitorRef Nothing
-- | Start monitoring the node of the receiver process.
--
-- A new node monitor is always created and stored. When a previous monitor
-- reference exists, a warning is logged and the previous reference is
-- unmonitored after the new monitor has been created.
monitorMessageReceiverNode :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
monitorMessageReceiverNode manager r = do
  previous <- liftIO $ readIORef nodeMonitorRef
  case previous of
    Nothing ->
      logConnectionManager manager NOTICE $ "Monitoring node " ++ show nid
    Just _ ->
      logConnectionManager manager WARNING $ "Re-monitoring node " ++ show nid
  fresh <- DP.monitorNode nid
  -- The old reference (if any) is released only after the new one exists.
  mapM_ DP.unmonitor previous
  liftIO $ writeIORef nodeMonitorRef (Just fresh)
  where
    nid = DP.processNodeId (connReceiverProcess r)
    nodeMonitorRef = connReceiverNodeMonitor r
-- | Log the attempt and reconnect directly to the receiver process.
reconnectToMessageReceiver :: ConnectionManager -> ConnectionMessageReceiver -> DP.Process ()
reconnectToMessageReceiver manager r =
  logConnectionManager manager NOTICE ("Direct reconnecting to " ++ show pid)
    >> DP.reconnect pid
  where
    pid = connReceiverProcess r
-- | Test whether the process identifier is registered as a message receiver.
existsMessageReceiver :: ConnectionManager -> DP.ProcessId -> IO Bool
existsMessageReceiver manager pid =
  M.member pid <$> readIORef (connReceivers manager)
-- | Try to send keep-alive messages using the current time.
--
-- The current time is requested only when at least one receiver is registered.
trySendKeepAlive :: ConnectionManager -> DP.Process ()
trySendKeepAlive manager = do
  receivers <- liftIO $ readIORef (connReceivers manager)
  unless (M.null receivers) $
    liftIO getCurrentTime >>= trySendKeepAliveUTC manager
-- | Try to send keep-alive messages, treating the given time as the current one.
--
-- Nothing happens when there are no receivers or when the keep-alive
-- interval has not yet elapsed. Otherwise the keep-alive timestamp is
-- updated to the given time and a 'KeepAliveMessage' is sent to every
-- registered receiver.
trySendKeepAliveUTC :: ConnectionManager -> UTCTime -> DP.Process ()
trySendKeepAliveUTC manager utc = do
  noReceivers <- liftIO $ M.null <$> readIORef receiversRef
  unless noReceivers $ do
    due <- liftIO $ shouldSendKeepAlive manager utc
    when due $ do
      logConnectionManager manager INFO "Sending keep-alive messages"
      -- The timestamp is recorded before the messages are sent.
      liftIO $ writeIORef (connKeepAliveTimestamp manager) utc
      receivers <- liftIO $ readIORef receiversRef
      mapM_ (\r -> DP.usend (connReceiverProcess r) KeepAliveMessage) receivers
  where
    receiversRef = connReceivers manager
-- | Decide whether keep-alive messages are due at the given time.
--
-- The answer is 'True' when the time elapsed since the stored keep-alive
-- timestamp, converted to microseconds, strictly exceeds the keep-alive
-- interval from the parameters.
shouldSendKeepAlive :: ConnectionManager -> UTCTime -> IO Bool
shouldSendKeepAlive manager utc = do
  lastSent <- readIORef (connKeepAliveTimestamp manager)
  let elapsed = realToFrac (diffUTCTime utc lastSent)
      threshold = connKeepAliveInterval (connParams manager)
  return (secondsToMicroseconds elapsed > threshold)
-- | Convert seconds to microseconds, rounding to the nearest integer.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds = fromInteger . round . (1000000 *)
-- | Look up the registered receivers for the given process identifiers.
--
-- The result keeps the order of the input list. A process identifier that
-- is not registered is skipped after a warning is logged.
messageReceivers :: ConnectionManager -> [DP.ProcessId] -> DP.Process [ConnectionMessageReceiver]
messageReceivers manager pids = do
  known <- liftIO $ readIORef (connReceivers manager)
  let resolve pid =
        case M.lookup pid known of
          Just receiver -> return (Just receiver)
          Nothing -> do
            logConnectionManager manager WARNING $ "Could not find the monitored process " ++ show pid
            return Nothing
  catMaybes <$> mapM resolve pids
-- | Select the processes whose monitor notifications are still relevant.
--
-- A notification is accepted only when its process is registered and its
-- monitor reference equals the one currently stored for that process.
-- Unknown processes produce a warning, and stale references produce a
-- notice. The result contains each accepted process identifier once, in
-- ascending order.
filterMessageReceivers :: ConnectionManager -> [DP.ProcessMonitorNotification] -> DP.Process [DP.ProcessId]
filterMessageReceivers manager ms = do
  known <- liftIO $ readIORef (connReceivers manager)
  accepted <- mapM (check known) ms
  -- Going through a set removes duplicates and sorts the identifiers.
  return $ S.toList (S.fromList (concat accepted))
  where
    check known (DP.ProcessMonitorNotification ref pid _) =
      case M.lookup pid known of
        Nothing -> do
          logConnectionManager manager WARNING $ "Could not find the monitored process " ++ show pid
          return []
        Just receiver -> do
          current <- liftIO $ readIORef (connReceiverMonitor receiver)
          if current == Just ref
            then return [pid]
            else do
              logConnectionManager manager NOTICE $ "Received the old monitor reference for process " ++ show pid
              return []
-- | Log the message with the given priority.
--
-- The message is emitted through 'DP.say', prefixed with the rendered
-- priority, only when the manager's logging priority is less than or
-- equal to the message priority.
logConnectionManager :: ConnectionManager -> Priority -> String -> DP.Process ()
{-# INLINE logConnectionManager #-}
logConnectionManager manager p message
  | threshold <= p = DP.say (embracePriority p ++ " " ++ message)
  | otherwise = return ()
  where
    threshold = connLoggingPriority (connParams manager)