{-# LANGUAGE DeriveGeneric, DeriveDataTypeable #-}

-- |
-- Module     : Simulation.Aivika.Distributed.Optimistic.Internal.DIO
-- Copyright  : Copyright (c) 2015-2017, David Sorokin
-- License    : BSD3
-- Maintainer : David Sorokin
-- Stability  : experimental
-- Tested with: GHC 7.10.3
--
-- This module defines a distributed computation based on 'IO'.
--
module Simulation.Aivika.Distributed.Optimistic.Internal.DIO
       (DIO(..),
        DIOParams(..),
        DIOEnv(..),
        DIOStrategy(..),
        invokeDIO,
        runDIO,
        runDIOWithEnv,
        defaultDIOParams,
        defaultDIOEnv,
        terminateDIO,
        registerDIO,
        unregisterDIO,
        monitorProcessDIO,
        dioParams,
        messageChannel,
        messageInboxId,
        timeServerId,
        sendMessageDIO,
        sendMessagesDIO,
        sendAcknowledgementMessageDIO,
        sendAcknowledgementMessagesDIO,
        sendLocalTimeDIO,
        sendRequestGlobalTimeDIO,
        logDIO,
        liftDistributedUnsafe) where

import Data.Typeable
import Data.Binary
import Data.IORef
import Data.Time.Clock

import GHC.Generics

import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Control.Exception (throw)
import Control.Monad.Catch as C
import qualified Control.Distributed.Process as DP
import Control.Concurrent
import Control.Concurrent.STM

import System.Timeout

import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Internal.Types

import Simulation.Aivika.Distributed.Optimistic.Internal.Channel
import Simulation.Aivika.Distributed.Optimistic.Internal.Message
import Simulation.Aivika.Distributed.Optimistic.Internal.TimeServer
import Simulation.Aivika.Distributed.Optimistic.Internal.Priority
import Simulation.Aivika.Distributed.Optimistic.Internal.ConnectionManager
import Simulation.Aivika.Distributed.Optimistic.State

-- | The parameters for the 'DIO' computation.
data DIOParams =
  DIOParams { dioLoggingPriority :: Priority,
              -- ^ The logging priority
              dioName :: String,
              -- ^ The name of the logical process.
              dioTimeHorizon :: Maybe Double,
              -- ^ The time horizon in modeling time units.
              dioUndoableLogSizeThreshold :: Int,
              -- ^ The undoable log size threshold used for detecting an overflow
              dioOutputMessageQueueSizeThreshold :: Int,
              -- ^ The output message queue size threshold used for detecting an overflow
              dioTransientMessageQueueSizeThreshold :: Int,
              -- ^ The transient message queue size threshold used for detecting an overflow
              dioSyncTimeout :: Int,
              -- ^ The timeout in microseconds used for synchronising the operations
              dioAllowPrematureIO :: Bool,
              -- ^ Whether to allow performing the premature IO action; otherwise, raise an error
              dioAllowSkippingOutdatedMessage :: Bool,
              -- ^ Whether to allow skipping an outdated message with the receive time less than the global time,
              -- which is possible after reconnection
              dioProcessMonitoringEnabled :: Bool,
              -- ^ Whether the process monitoring is enabled
              dioProcessMonitoringDelay :: Int,
              -- ^ The delay in microseconds which must be applied for monitoring every remote process.
              dioProcessReconnectingEnabled :: Bool,
              -- ^ Whether the automatic reconnecting to processes is enabled when the monitoring is enabled
              dioProcessReconnectingDelay :: Int,
              -- ^ The delay in microseconds before reconnecting to the remote process
              dioKeepAliveInterval :: Int,
              -- ^ The interval in microseconds for sending keep-alive messages
              dioTimeServerAcknowledgementTimeout :: Int,
              -- ^ The timeout in microseconds for receiving an acknowledgement message from the time server
              dioSimulationMonitoringInterval :: Int,
              -- ^ The interval in microseconds between sending the simulation monitoring messages
              dioSimulationMonitoringTimeout :: Int,
              -- ^ The timeout in microseconds when processing the monitoring messages
              dioStrategy :: DIOStrategy,
              -- ^ The logical process strategy
              dioProcessDisconnectingEnabled :: Bool
              -- ^ Whether the process disconnecting is enabled when the monitoring is enabled but the reconnecting is disabled
            } deriving (Eq, Ord, Show, Typeable, Generic)

instance Binary DIOParams

-- | Those 'DIO' environment parameters that cannot be serialized and passed to another process via the net.
data DIOEnv =
  DIOEnv { dioSimulationMonitoringAction :: Maybe (LogicalProcessState -> DP.Process ())
           -- ^ The simulation monitoring action
         }

-- | The logical process strategy.
data DIOStrategy = WaitIndefinitelyForTimeServer
                   -- ^ Wait for the time server forever
                 | TerminateDueToTimeServerTimeout Int
                   -- ^ Terminate due to the exceeded time server timeout in microseconds, but not less than 'dioSyncTimeout'
                 deriving (Eq, Ord, Show, Typeable, Generic)

instance Binary DIOStrategy

-- | The distributed computation based on 'IO'.
newtype DIO a = DIO { unDIO :: DIOContext -> DP.Process a
                      -- ^ Unwrap the computation.
                    }

-- | The context of the 'DIO' computation.
data DIOContext =
  DIOContext { dioChannel :: Channel LogicalProcessMessage,
               -- ^ The channel of messages.
               dioInboxId :: DP.ProcessId,
               -- ^ The inbox process identifier.
               dioTimeServerId :: DP.ProcessId,
               -- ^ The time server process
               dioParams0 :: DIOParams,
               -- ^ The parameters of the computation.
               dioRegisteredInTimeServer :: TVar Bool,
               -- ^ Whether the computation is registered in the time server.
               dioUnregisteredFromTimeServer :: TVar Bool,
               -- ^ Whether the computation is unregistered from the time server.
               dioTimeServerTerminating :: TVar Bool
               -- ^ Whether the computation asked to terminate the time server.
             }

instance Monad DIO where

  {-# INLINE (>>=) #-}
  (DIO m) >>= k = DIO $ \ps ->
    m ps >>= \a ->
    let m' = unDIO (k a) in m' ps

instance Applicative DIO where

  {-# INLINE pure #-}
  pure = DIO . const . pure

  {-# INLINE (<*>) #-}
  (<*>) = ap

instance Functor DIO where

  {-# INLINE fmap #-}
  fmap f (DIO m) = DIO $ fmap f . m

instance MonadException DIO where

  catchComp (DIO m) h = DIO $ \ps ->
    C.catch (m ps) (\e -> unDIO (h e) ps)

  finallyComp (DIO m1) (DIO m2) = DIO $ \ps ->
    C.finally (m1 ps) (m2 ps)

  -- Raise the exception as an effect of the underlying 'DP.Process' monad
  -- (via 'C.throwM') rather than returning a bottom action built with the
  -- imprecise 'throw'; the exception is then raised exactly when the
  -- computation is run.
  throwComp e = DIO $ \_ ->
    C.throwM e

-- | Invoke the 'DIO' computation.
invokeDIO :: DIOContext -> DIO a -> DP.Process a
{-# INLINE invokeDIO #-}
invokeDIO ctx comp = unDIO comp ctx

-- | Lift the distributed 'Process' computation, ignoring the 'DIO' context.
liftDistributedUnsafe :: DP.Process a -> DIO a
liftDistributedUnsafe act = DIO $ \_ -> act

-- | The default parameters for the 'DIO' computation
-- (all delays, intervals and timeouts are in microseconds).
defaultDIOParams :: DIOParams
defaultDIOParams =
  DIOParams
  { dioLoggingPriority = WARNING
  , dioName = "LP"
  , dioTimeHorizon = Nothing
  , dioUndoableLogSizeThreshold = 10000000
  , dioOutputMessageQueueSizeThreshold = 10000
  , dioTransientMessageQueueSizeThreshold = 5000
  , dioSyncTimeout = 60000000
  , dioAllowPrematureIO = False
  , dioAllowSkippingOutdatedMessage = True
  , dioProcessMonitoringEnabled = False
  , dioProcessMonitoringDelay = 5000000
  , dioProcessReconnectingEnabled = False
  , dioProcessReconnectingDelay = 5000000
  , dioKeepAliveInterval = 5000000
  , dioTimeServerAcknowledgementTimeout = 5000000
  , dioSimulationMonitoringInterval = 30000000
  , dioSimulationMonitoringTimeout = 100000
  , dioStrategy = TerminateDueToTimeServerTimeout 300000000
  , dioProcessDisconnectingEnabled = False
  }

-- | The default environment parameters for the 'DIO' computation:
-- no simulation monitoring action is installed.
defaultDIOEnv :: DIOEnv
defaultDIOEnv =
  DIOEnv
  { dioSimulationMonitoringAction = Nothing
  }

-- | Return the computation context.
dioContext :: DIO DIOContext
dioContext = DIO $ \ctx -> return ctx

-- | Return the parameters of the current computation.
dioParams :: DIO DIOParams
dioParams = DIO $ \ctx -> return (dioParams0 ctx)

-- | Return the channel of messages.
messageChannel :: DIO (Channel LogicalProcessMessage)
messageChannel = DIO $ \ctx -> return (dioChannel ctx)

-- | Return the process identifier of the inbox that receives messages.
messageInboxId :: DIO DP.ProcessId
messageInboxId = DIO $ \ctx -> return (dioInboxId ctx)

-- | Return the time server process identifier.
timeServerId :: DIO DP.ProcessId
timeServerId = DIO $ \ctx -> return (dioTimeServerId ctx)

-- | Terminate the simulation including the processes in
-- all nodes connected to the time server.
terminateDIO :: DIO ()
terminateDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
     logProcess ps INFO "Terminating the simulation..."
     sender   <- invokeDIO ctx messageInboxId
     receiver <- invokeDIO ctx timeServerId
     let inbox = sender
     -- with monitoring enabled every outgoing message is routed through the inbox process
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendTerminateTimeServerMessage receiver sender)
       else DP.send receiver (TerminateTimeServerMessage sender)
     awaitTimeServerAcknowledgement ps (dioTimeServerTerminating ctx)
       "the request to terminate the time server"
     -- the inbox is shut down whether or not the acknowledgement has arrived
     DP.send inbox TerminateInboxProcessMessage

-- | Register the simulation process in the time server, which
-- requires some initial quorum to start synchronizing the global time.
registerDIO :: DIO ()
registerDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
     logProcess ps INFO "Registering the simulation process..."
     sender   <- invokeDIO ctx messageInboxId
     receiver <- invokeDIO ctx timeServerId
     let inbox = sender
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendRegisterLogicalProcessMessage receiver sender)
       else DP.send receiver (RegisterLogicalProcessMessage sender)
     awaitTimeServerAcknowledgement ps (dioRegisteredInTimeServer ctx)
       "the registration of the logical process"

-- | Unregister the simulation process from the time server
-- without affecting the processes in other nodes connected to
-- the corresponding time server.
unregisterDIO :: DIO ()
unregisterDIO =
  DIO $ \ctx ->
  do let ps = dioParams0 ctx
     logProcess ps INFO "Unregistering the simulation process..."
     sender   <- invokeDIO ctx messageInboxId
     receiver <- invokeDIO ctx timeServerId
     let inbox = sender
     if dioProcessMonitoringEnabled ps
       then DP.send inbox (SendUnregisterLogicalProcessMessage receiver sender)
       else DP.send receiver (UnregisterLogicalProcessMessage sender)
     awaitTimeServerAcknowledgement ps (dioUnregisteredFromTimeServer ctx)
       "the unregistration of the logical process"
     -- the inbox is shut down whether or not the acknowledgement has arrived
     DP.send inbox TerminateInboxProcessMessage

-- | Block until the specified flag becomes 'True' or until
-- 'dioTimeServerAcknowledgementTimeout' elapses. A timeout is not fatal:
-- it is reported with a warning (naming the request that was not acknowledged)
-- and the caller continues as before.
awaitTimeServerAcknowledgement :: DIOParams -> TVar Bool -> String -> DP.Process ()
awaitTimeServerAcknowledgement ps flag request =
  do acknowledged <-
       liftIO $
       timeout (dioTimeServerAcknowledgementTimeout ps) $
       atomically $
       do f <- readTVar flag
          unless f retry
     case acknowledged of
       Just () -> return ()
       Nothing ->
         logProcess ps WARNING $
         "The time server has not acknowledged " ++ request ++ " within the timeout"

-- | The internal logical process message.
data InternalLogicalProcessMessage = InternalLogicalProcessMessage LogicalProcessMessage
                                     -- ^ the logical process message
                                   | InternalProcessMonitorNotification DP.ProcessMonitorNotification
                                     -- ^ the process monitor notification
                                   | InternalInboxProcessMessage InboxProcessMessage
                                     -- ^ the inbox process message
                                   | InternalGeneralMessage GeneralMessage
                                     -- ^ the general message

-- | Handle the specified exception: log it with the ERROR priority and rethrow it.
handleException :: DIOParams -> SomeException -> DP.Process ()
handleException ps e =
  do ---
     logProcess ps ERROR $ "Exception occurred: " ++ show e
     ---
     C.throwM e

-- | Run the computation using the specified parameters along with time server process
-- identifier and return the inbox process identifier and a new simulation process.
runDIO :: DIO a -> DIOParams -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIO m ps serverId = runDIOWithEnv m ps defaultDIOEnv serverId

-- | A full version of 'runDIO' that also allows specifying the environment parameters.
runDIOWithEnv :: DIO a -> DIOParams -> DIOEnv -> DP.ProcessId -> DP.Process (DP.ProcessId, DP.Process a)
runDIOWithEnv m ps env serverId =
  do ch <- liftIO newChannel
     let connParams =
           ConnectionParams { connLoggingPriority = dioLoggingPriority ps,
                              connKeepAliveInterval = dioKeepAliveInterval ps,
                              connReconnectingDelay = dioProcessReconnectingDelay ps,
                              connMonitoringDelay = dioProcessMonitoringDelay ps }
     connManager <- liftIO $ newConnectionManager connParams
     -- set once the inbox is going down; polled by all helper processes below
     terminated <- liftIO $ newIORef False
     -- flags raised by the inbox when the time server acknowledges the corresponding request
     registeredInTimeServer <- liftIO $ newTVarIO False
     unregisteredFromTimeServer <- liftIO $ newTVarIO False
     timeServerTerminating <- liftIO $ newTVarIO False
     -- the wall-clock time of the last message seen from the time server
     timeServerTimestamp <- liftIO $ getCurrentTime >>= newIORef
     let -- the inbox loop: receive a message of any supported kind and dispatch it
         loop0 =
           forever $
           do x <- DP.receiveWait
                   [DP.match (return . InternalLogicalProcessMessage),
                    DP.match (return . InternalProcessMonitorNotification),
                    DP.match (return . InternalInboxProcessMessage),
                    DP.match (return . InternalGeneralMessage)]
              case x of
                InternalLogicalProcessMessage msg ->
                  do processTimeServerMessage msg ps serverId timeServerTimestamp
                     liftIO $
                       writeChannel ch msg
                InternalProcessMonitorNotification note ->
                  handleProcessMonitorNotification note ps ch connManager serverId
                InternalInboxProcessMessage inboxMsg ->
                  case inboxMsg of
                    SendQueueMessage pid msg ->
                      DP.usend pid (QueueMessage msg)
                    SendQueueMessageBulk pid msgs ->
                      forM_ msgs $ \msg ->
                      DP.usend pid (QueueMessage msg)
                    SendAcknowledgementQueueMessage pid msg ->
                      DP.usend pid (AcknowledgementQueueMessage msg)
                    SendAcknowledgementQueueMessageBulk pid msgs ->
                      forM_ msgs $ \msg ->
                      DP.usend pid (AcknowledgementQueueMessage msg)
                    SendLocalTimeMessage receiver sender t ->
                      DP.usend receiver (LocalTimeMessage sender t)
                    SendRequestGlobalTimeMessage receiver sender ->
                      DP.usend receiver (RequestGlobalTimeMessage sender)
                    SendRegisterLogicalProcessMessage receiver sender ->
                      DP.usend receiver (RegisterLogicalProcessMessage sender)
                    SendUnregisterLogicalProcessMessage receiver sender ->
                      DP.usend receiver (UnregisterLogicalProcessMessage sender)
                    SendTerminateTimeServerMessage receiver sender ->
                      DP.usend receiver (TerminateTimeServerMessage sender)
                    MonitorProcessMessage pid ->
                      tryAddMessageReceiver connManager pid >>
                      return ()
                    TrySendProcessKeepAliveMessage ->
                      trySendKeepAlive connManager
                    RegisterLogicalProcessAcknowledgementMessage _ ->
                      do logProcess ps INFO "Registered the logical process in the time server"
                         liftIO $
                           atomically $
                           writeTVar registeredInTimeServer True
                    UnregisterLogicalProcessAcknowledgementMessage _ ->
                      do logProcess ps INFO "Unregistered the logical process from the time server"
                         liftIO $
                           atomically $
                           writeTVar unregisteredFromTimeServer True
                    TerminateTimeServerAcknowledgementMessage _ ->
                      do logProcess ps INFO "Started terminating the time server"
                         liftIO $
                           atomically $
                           writeTVar timeServerTerminating True
                    TerminateInboxProcessMessage ->
                      do logProcess ps INFO "Terminating the inbox and keep-alive processes..."
                         liftIO $
                           do atomicWriteIORef terminated True
                              writeChannel ch AbortSimulationMessage
                         DP.terminate
                InternalGeneralMessage msg ->
                  handleGeneralMessage msg ps ch connManager
         -- whatever ends the inbox loop, flag the termination, abort the
         -- simulation and drop the message receivers
         loop =
           C.finally loop0
           (do liftIO $
                 do atomicWriteIORef terminated True
                    writeChannel ch AbortSimulationMessage
               clearMessageReceivers connManager)
     inboxId <-
       DP.spawnLocal $
       C.catch loop (handleException ps)
     -- the keep-alive process: periodically asks the inbox to send keep-alive messages
     _ <- DP.spawnLocal $
       let keepAliveLoop =
             do f <- liftIO $ readIORef terminated
                unless f $
                  do liftIO $
                       threadDelay (dioKeepAliveInterval ps)
                     DP.send inboxId TrySendProcessKeepAliveMessage
                     keepAliveLoop
       in C.catch keepAliveLoop (handleException ps)
     -- the watchdog process: periodically validates the time server unless
     -- the strategy is to wait for it indefinitely
     _ <- DP.spawnLocal $
       let stop =
             do f <- liftIO $ readIORef terminated
                return (f || dioStrategy ps == WaitIndefinitelyForTimeServer)
           validateLoop =
             do f <- stop
                unless f $
                  do liftIO $
                       threadDelay (dioSyncTimeout ps)
                     f' <- stop
                     unless f' $
                       do validateTimeServer ps inboxId timeServerTimestamp
                          validateLoop
       in C.catch validateLoop (handleException ps)
     case dioSimulationMonitoringAction env of
       Nothing  -> return ()
       Just act ->
         do -- the process that receives the logical process states and passes them to the action
            monitorId <-
              DP.spawnLocal $
              let monitorLoop =
                    do f <- liftIO $ readIORef terminated
                       unless f $
                         do x <- DP.expectTimeout (dioSimulationMonitoringTimeout ps)
                            case x of
                              Nothing -> return ()
                              Just st -> act st
                            monitorLoop
              in C.catch monitorLoop (handleException ps)
            -- the process that periodically requests the state on behalf of the monitor
            _ <- DP.spawnLocal $
              let requestLoop =
                    do f <- liftIO $ readIORef terminated
                       unless f $
                         do liftIO $
                              do threadDelay (dioSimulationMonitoringInterval ps)
                                 writeChannel ch (ProvideLogicalProcessStateMessage monitorId)
                            requestLoop
              in C.catch requestLoop (handleException ps)
            return ()
     let ctx =
           DIOContext { dioChannel = ch,
                        dioInboxId = inboxId,
                        dioTimeServerId = serverId,
                        dioParams0 = ps,
                        dioRegisteredInTimeServer = registeredInTimeServer,
                        dioUnregisteredFromTimeServer = unregisteredFromTimeServer,
                        dioTimeServerTerminating = timeServerTerminating }
         simulation = unDIO m ctx
     return (inboxId, simulation)

-- | Handle the process monitor notification.
handleProcessMonitorNotification :: DP.ProcessMonitorNotification
                                    -> DIOParams
                                    -> Channel LogicalProcessMessage
                                    -> ConnectionManager
                                    -> DP.ProcessId
                                    -> DP.Process ()
handleProcessMonitorNotification m@(DP.ProcessMonitorNotification _ pid0 reason) ps ch connManager serverId =
  do _ <- recv m
     when (pid0 == serverId) $
       case reason of
         DP.DiedNormal      -> timeServerTerminated
         DP.DiedException _ -> timeServerTerminated
         DP.DiedNodeDown    -> timeServerTerminated
         _                  -> return ()
     when (dioProcessReconnectingEnabled ps && (reason == DP.DiedDisconnect)) $
       do liftIO $
            threadDelay (dioProcessReconnectingDelay ps)
          -- drain the disconnect notifications that are already in the mailbox
          -- so that all affected processes are reconnected at once
          ms <- collect [m]
          pids <- filterMessageReceivers connManager ms
          reconnectMessageReceivers connManager pids
          forM_ pids $ \pid ->
            do logProcess ps NOTICE $ "Writing to the channel about reconnecting to " ++ show pid
               liftIO $
                 writeChannel ch (ReconnectProcessMessage pid)
     when (dioProcessDisconnectingEnabled ps && not (dioProcessReconnectingEnabled ps)) $
       liftIO $ writeChannel ch (DisconnectProcessMessage pid0)
  where
    -- Log the notification and forward it to the channel of the logical process.
    recv :: DP.ProcessMonitorNotification -> DP.Process DP.ProcessMonitorNotification
    recv note =
      do logProcess ps WARNING $ "Received a process monitor notification " ++ show note
         liftIO $
           writeChannel ch (ProcessMonitorNotificationMessage note)
         return note
    -- The termination request must be delivered to the inbox process, not to the
    -- (already dead) time server. This function is called from the inbox loop of
    -- 'runDIOWithEnv', so the current process is the inbox itself.
    timeServerTerminated :: DP.Process ()
    timeServerTerminated =
      DP.getSelfPid >>= processTimeServerTerminated ps
    isDisconnect :: DP.ProcessMonitorNotification -> Bool
    isDisconnect (DP.ProcessMonitorNotification _ _ r) = r == DP.DiedDisconnect
    -- Collect, without blocking, the pending disconnect notifications in the order of arrival.
    collect :: [DP.ProcessMonitorNotification] -> DP.Process [DP.ProcessMonitorNotification]
    collect acc =
      do y <- DP.receiveTimeout 0 [DP.matchIf isDisconnect recv]
         case y of
           Nothing   -> return $ reverse acc
           Just note -> collect (note : acc)

-- | Handle the general message; a keep-alive message is only logged with the DEBUG priority.
handleGeneralMessage :: GeneralMessage
                        -> DIOParams
                        -> Channel LogicalProcessMessage
                        -> ConnectionManager
                        -> DP.Process ()
handleGeneralMessage m@KeepAliveMessage ps ch connManager =
  do ---
     logProcess ps DEBUG $ "Received " ++ show m
     ---
     return ()

-- | Monitor the specified process.
monitorProcessDIO :: DP.ProcessId -> DIO ()
monitorProcessDIO pid =
  DIO $ \ctx ->
  let ps = dioParams0 ctx
  in if dioProcessMonitoringEnabled ps
     then DP.send (dioInboxId ctx) (MonitorProcessMessage pid)
     else logProcess ps WARNING "Ignored the process monitoring as it was disabled in the DIO computation parameters"

-- | Inspect a message destined for the logical process and react to those
-- that come from the time server: refresh the timestamp of the last contact
-- and, for a request to compute the local time, acknowledge it.
processTimeServerMessage :: LogicalProcessMessage -> DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
processTimeServerMessage msg ps serverId r =
  case msg of
    ComputeLocalTimeMessage ->
      do refreshTimestamp
         inboxId <- DP.getSelfPid
         let ack = ComputeLocalTimeAcknowledgementMessage inboxId
         if dioProcessMonitoringEnabled ps
           then DP.usend serverId ack
           else DP.send serverId ack
    GlobalTimeMessage _ ->
      refreshTimestamp
    _ ->
      return ()
  where
    -- remember the current wall-clock time as the time of the last contact
    refreshTimestamp :: DP.Process ()
    refreshTimestamp = liftIO $ getCurrentTime >>= writeIORef r

-- | Check how long ago the time server was last heard of (the timestamp is
-- read from the reference) and, if the strategy defines a timeout that has
-- been exceeded, ask the specified inbox to terminate.
validateTimeServer :: DIOParams -> DP.ProcessId -> IORef UTCTime -> DP.Process ()
validateTimeServer ps inboxId r =
  do logProcess ps NOTICE "Validating the time server"
     case dioStrategy ps of
       TerminateDueToTimeServerTimeout limit ->
         do lastSeen <- liftIO $ readIORef r
            now      <- liftIO getCurrentTime
            let elapsed = fromRational (toRational (diffUTCTime now lastSeen))
            when (secondsToMicroseconds elapsed > limit) $
              do logProcess ps WARNING "Terminating due to the exceeded time server timeout"
                 DP.send inboxId TerminateInboxProcessMessage
       WaitIndefinitelyForTimeServer ->
         return ()

-- | Process the situation when the time server has suddenly been terminated.
processTimeServerTerminated :: DIOParams -> DP.ProcessId -> DP.Process ()
processTimeServerTerminated ps inboxId =
  logProcess ps NOTICE "Terminating due to sudden termination of the time server" >>
  DP.send inboxId TerminateInboxProcessMessage

-- | Convert seconds to microseconds, rounding to the nearest integer.
secondsToMicroseconds :: Double -> Int
secondsToMicroseconds x = fromInteger (round (1000000 * x))

-- | Send the message to the specified process, either through the inbox
-- (when the process monitoring is enabled) or directly.
sendMessageDIO :: DP.ProcessId -> Message -> DIO ()
{-# INLINABLE sendMessageDIO #-}
sendMessageDIO pid m =
  DIO $ \ctx ->
  if dioProcessMonitoringEnabled (dioParams0 ctx)
  then DP.send (dioInboxId ctx) (SendQueueMessage pid m)
  else DP.send pid (QueueMessage m)

-- | Send the bulk of messages to the specified process, either as a single
-- request to the inbox (when the process monitoring is enabled) or one by one directly.
sendMessagesDIO :: DP.ProcessId -> [Message] -> DIO ()
{-# INLINABLE sendMessagesDIO #-}
sendMessagesDIO pid ms =
  DIO $ \ctx ->
  if dioProcessMonitoringEnabled (dioParams0 ctx)
  then DP.send (dioInboxId ctx) (SendQueueMessageBulk pid ms)
  else mapM_ (DP.send pid . QueueMessage) ms

-- | Send the acknowledgement message to the specified process, either through
-- the inbox (when the process monitoring is enabled) or directly.
sendAcknowledgementMessageDIO :: DP.ProcessId -> AcknowledgementMessage -> DIO ()
{-# INLINABLE sendAcknowledgementMessageDIO #-}
sendAcknowledgementMessageDIO pid m =
  DIO $ \ctx ->
  if dioProcessMonitoringEnabled (dioParams0 ctx)
  then DP.send (dioInboxId ctx) (SendAcknowledgementQueueMessage pid m)
  else DP.send pid (AcknowledgementQueueMessage m)

-- | Send the bulk of acknowledgement messages.
sendAcknowledgementMessagesDIO :: DP.ProcessId -> [AcknowledgementMessage] -> DIO ()
{-# INLINABLE sendAcknowledgementMessagesDIO #-}
sendAcknowledgementMessagesDIO pid ms =
  DIO $ \ctx ->
  if dioProcessMonitoringEnabled (dioParams0 ctx)
  then DP.send (dioInboxId ctx) (SendAcknowledgementQueueMessageBulk pid ms)
  else mapM_ (DP.send pid . AcknowledgementQueueMessage) ms

-- | Send the local time of the sender to the time server (the receiver),
-- either through the inbox (when the process monitoring is enabled) or directly.
sendLocalTimeDIO :: DP.ProcessId -> DP.ProcessId -> Double -> DIO ()
{-# INLINABLE sendLocalTimeDIO #-}
sendLocalTimeDIO receiver sender t =
  DIO $ \ctx ->
  if dioProcessMonitoringEnabled (dioParams0 ctx)
  then DP.send (dioInboxId ctx) (SendLocalTimeMessage receiver sender t)
  else DP.send receiver (LocalTimeMessage sender t)

-- | Send the request for the global virtual time to the time server (the receiver),
-- either through the inbox (when the process monitoring is enabled) or directly.
sendRequestGlobalTimeDIO :: DP.ProcessId -> DP.ProcessId -> DIO ()
{-# INLINABLE sendRequestGlobalTimeDIO #-}
sendRequestGlobalTimeDIO receiver sender =
  DIO $ \ctx ->
  if dioProcessMonitoringEnabled (dioParams0 ctx)
  then DP.send (dioInboxId ctx) (SendRequestGlobalTimeMessage receiver sender)
  else DP.send receiver (RequestGlobalTimeMessage sender)

-- | Log the message with the specified priority, provided that the priority
-- is not lower than the logging priority of the current computation.
logDIO :: Priority -> String -> DIO ()
{-# INLINE logDIO #-}
logDIO p message =
  DIO $ \ctx ->
  logProcess (dioParams0 ctx) p message

-- | Log the message with the specified priority within the 'DP.Process' computation,
-- provided that the priority is not lower than the logging priority from the parameters.
logProcess :: DIOParams -> Priority -> String -> DP.Process ()
{-# INLINE logProcess #-}
logProcess ps p message
  | dioLoggingPriority ps <= p = DP.say (embracePriority p ++ " " ++ message)
  | otherwise                  = return ()