module Control.Distributed.Process.ManagedProcess.Internal.GenProcess
( recvLoop
, precvLoop
, currentTimeout
, systemTimeout
, drainTimeout
, processState
, processDefinition
, processFilters
, processUnhandledMsgPolicy
, processQueue
, gets
, getAndModifyState
, modifyState
, setUserTimeout
, setProcessState
, GenProcess
, peek
, push
, enqueue
, dequeue
, addUserTimer
, removeUserTimer
, eval
, act
, runAfter
, evalAfter
) where
import Control.Applicative (liftA2)
import Control.Distributed.Process
( match
, matchAny
, matchMessage
, handleMessage
, handleMessageIf
, receiveTimeout
, receiveWait
, forward
, catchesExit
, catchExit
, die
, unsafeWrapMessage
, Process
, ProcessId
, Match
)
import qualified Control.Distributed.Process as P
( liftIO
)
import Control.Distributed.Process.Internal.Types
( Message(..)
, ProcessExitException(..)
)
import Control.Distributed.Process.ManagedProcess.Server
( handleCast
, handleExitIf
, stop
, continue
)
import Control.Distributed.Process.ManagedProcess.Timer
( Timer(timerDelay)
, TimerKey
, TimedOut(..)
, delayTimer
, startTimer
, stopTimer
, matchTimeout
, matchKey
, matchRun
)
import Control.Distributed.Process.ManagedProcess.Internal.Types hiding (Message)
import qualified Control.Distributed.Process.ManagedProcess.Internal.PriorityQueue as Q
( empty
, dequeue
, enqueue
, peek
, toList
)
import Control.Distributed.Process.Extras
( ExitReason(..)
, Shutdown(..)
)
import qualified Control.Distributed.Process.Extras.SystemLog as Log
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Process.Serializable (Serializable)
import Control.Monad (void)
import Control.Monad.Catch
( mask_
, catch
, throwM
, mask
, SomeException
)
import qualified Control.Monad.State.Strict as ST
( get
)
import Data.IORef (newIORef, atomicModifyIORef')
import Data.Maybe (fromJust)
import qualified Data.Map.Strict as Map
( size
, insert
, delete
, lookup
, empty
, foldrWithKey
)
type Safe = Bool
gets :: forall s a . (ProcessState s -> a) -> GenProcess s a
gets f = ST.get >>= \(s :: State s) -> liftIO $ do
atomicModifyIORef' s $ \(s' :: ProcessState s) -> (s', f s' :: a)
modifyState :: (ProcessState s -> ProcessState s) -> GenProcess s ()
modifyState f =
ST.get >>= \s -> liftIO $ mask_ $ do
atomicModifyIORef' s $ \s' -> (f s', ())
getAndModifyState :: (ProcessState s -> (ProcessState s, a))
-> GenProcess s a
getAndModifyState f =
ST.get >>= \s -> liftIO $ mask_ $ do
atomicModifyIORef' s $ \s' -> f s'
setProcessState :: s -> GenProcess s ()
setProcessState st' =
modifyState $ \st@ProcessState{..} -> st { procState = st' }
setDrainTimeout :: Timer -> GenProcess s ()
setDrainTimeout t = modifyState $ \st@ProcessState{..} -> st { sysTimeout = t }
setUserTimeout :: Delay -> GenProcess s ()
setUserTimeout d =
modifyState $ \st@ProcessState{..} -> st { usrTimeout = d }
addUserTimer :: Timer -> Message -> GenProcess s TimerKey
addUserTimer t m =
getAndModifyState $ \st@ProcessState{..} ->
let sz = Map.size usrTimers
tk = sz + 1
in (st { usrTimers = (Map.insert tk (t, m) usrTimers) }, tk)
removeUserTimer :: TimerKey -> GenProcess s ()
removeUserTimer i =
modifyState $ \st@ProcessState{..} -> st { usrTimers = (Map.delete i usrTimers) }
consumeTimer :: forall s a . TimerKey -> (Message -> GenProcess s a) -> GenProcess s a
consumeTimer k f = do
mt <- gets usrTimers
let tm = Map.lookup k mt
let ut = Map.delete k mt
modifyState $ \st@ProcessState{..} -> st { usrTimers = ut }
case tm of
Nothing -> lift $ die $ "GenProcess.consumeTimer - InvalidTimerKey"
Just (_, m) -> f m
processDefinition :: GenProcess s (ProcessDefinition s)
processDefinition = gets procDef
processPriorities :: GenProcess s ([DispatchPriority s])
processPriorities = gets procPrio
processFilters :: GenProcess s ([DispatchFilter s])
processFilters = gets procFilters
processState :: GenProcess s s
processState = gets procState
processUnhandledMsgPolicy :: GenProcess s UnhandledMessagePolicy
processUnhandledMsgPolicy = gets (unhandledMessagePolicy . procDef)
processQueue :: GenProcess s [Message]
processQueue = gets internalQ >>= return . Q.toList
systemTimeout :: GenProcess s Timer
systemTimeout = gets sysTimeout
timeoutPolicy :: GenProcess s RecvTimeoutPolicy
timeoutPolicy = gets timeoutSpec
drainTimeout :: GenProcess s Delay
drainTimeout = gets (timerDelay . sysTimeout)
currentTimeout :: GenProcess s Delay
currentTimeout = gets usrTimeout
updateQueue :: (Queue -> Queue) -> GenProcess s ()
updateQueue f =
modifyState $ \st@ProcessState{..} -> st { internalQ = f internalQ }
evalAfter :: forall s m . (Serializable m) => TimeInterval -> m -> s -> Action s
evalAfter d m s = act $ runAfter d m >> setProcessState s
act :: forall s . GenProcess s () -> Action s
act = return . ProcessActivity
eval :: forall s . GenProcess s (ProcessAction s) -> Action s
eval = return . ProcessExpression
runAfter :: forall s m . (Serializable m) => TimeInterval -> m -> GenProcess s ()
runAfter d m = do
t <- lift $ startTimer (Delay d)
void $ addUserTimer t (unsafeWrapMessage m)
dequeue :: GenProcess s (Maybe Message)
dequeue = getAndModifyState $ \st -> do
let pq = internalQ st
case Q.dequeue pq of
Nothing -> (st, Nothing)
Just (m, q') -> (st { internalQ = q' }, Just m)
peek :: GenProcess s (Maybe Message)
peek = getAndModifyState $ \st -> do
let pq = internalQ st
(st, Q.peek pq)
push :: forall s . Message -> GenProcess s ()
push m = do
st <- processState
enqueueMessage st [ PrioritiseInfo {
prioritise = (\_ m' ->
return $ Just ((101 :: Int), m')) :: s -> Message -> Process (Maybe (Int, Message)) } ] m
enqueue :: forall s . Message -> GenProcess s ()
enqueue m = do
st <- processState
enqueueMessage st [] m
enqueueMessage :: forall s . s
-> [DispatchPriority s]
-> Message
-> GenProcess s ()
enqueueMessage s [] m' =
enqueueMessage s [ PrioritiseInfo {
prioritise = (\_ m ->
return $ Just ((1 :: Int), m)) :: s -> Message -> Process (Maybe (Int, Message)) } ] m'
enqueueMessage s (p:ps) m' = let checkPrio = prioritise p s in do
(lift $ checkPrio m') >>= doEnqueue s ps m'
where
doEnqueue :: s
-> [DispatchPriority s]
-> Message
-> Maybe (Int, Message)
-> GenProcess s ()
doEnqueue s' ps' msg Nothing = enqueueMessage s' ps' msg
doEnqueue _ _ _ (Just (i, m)) = updateQueue (Q.enqueue (i * (1 :: Int)) m)
class DynMessageHandler d where
dynHandleMessage :: UnhandledMessagePolicy
-> s
-> d s
-> Message
-> Process (Maybe (ProcessAction s))
instance DynMessageHandler Dispatcher where
dynHandleMessage _ s (Dispatch d) msg = handleMessage msg (d s)
dynHandleMessage _ s (DispatchIf d c) msg = handleMessageIf msg (c s) (d s)
instance DynMessageHandler ExternDispatcher where
dynHandleMessage _ s (DispatchCC _ d) msg = handleMessage msg (d s)
dynHandleMessage _ s (DispatchSTM _ d _ _) msg = handleMessage msg (d s)
instance DynMessageHandler DeferredDispatcher where
dynHandleMessage _ s (DeferredDispatcher d) = d s
class DynFilterHandler d where
dynHandleFilter :: s
-> d s
-> Message
-> Process (Maybe (Filter s))
instance DynFilterHandler DispatchFilter where
dynHandleFilter s (FilterApi d) msg = handleMessage msg (d s)
dynHandleFilter s (FilterAny d) msg = handleMessage msg (d s)
dynHandleFilter s (FilterRaw d) msg = d s msg
dynHandleFilter s (FilterState d) _ = d s
precvLoop :: PrioritisedProcessDefinition s
-> s
-> Delay
-> Process ExitReason
precvLoop ppDef pState recvDelay = do
st <- P.liftIO $ newIORef $ ProcessState { timeoutSpec = recvTimeout ppDef
, sysTimeout = delayTimer Infinity
, usrTimeout = recvDelay
, internalQ = Q.empty
, procState = pState
, procDef = processDef ppDef
, procPrio = priorities ppDef
, procFilters = filters ppDef
, usrTimers = Map.empty
}
mask $ \restore -> do
res <- catch (fmap Right $ restore $ loop st)
(\(e :: SomeException) -> return $ Left e)
ps <- P.liftIO $ atomicModifyIORef' st $ \s' -> (s', s')
let st' = procState ps
pd = procDef ps
sh = shutdownHandler pd
case res of
Right (exitReason, _) -> do
restore $ sh (CleanShutdown st') exitReason
return exitReason
Left ex -> do
restore $ sh (LastKnown st') (ExitOther $ show ex)
throwM ex
where
loop st' = catchExit (runProcess st' recvQueue)
(\_ (r :: ExitReason) -> return (r, st'))
recvQueue :: GenProcess s ExitReason
recvQueue = do
pd <- processDefinition
let ex = trapExit:(exitHandlers $ pd)
let exHandlers = map (\d' -> (dispatchExit d')) ex
catch (drainMailbox >> processNext >>= nextAction)
(\(e :: ProcessExitException) ->
handleExit exHandlers e >>= nextAction)
where
handleExit :: [(s -> ProcessId -> Message -> Process (Maybe (ProcessAction s)))]
-> ProcessExitException
-> GenProcess s (ProcessAction s)
handleExit [] ex = throwM ex
handleExit (h:hs) ex@(ProcessExitException pid msg) = do
r <- processState >>= \s -> lift $ h s pid msg
case r of
Nothing -> handleExit hs ex
Just p -> return p
nextAction :: ProcessAction s -> GenProcess s ExitReason
nextAction ac
| ProcessExpression expr <- ac = expr >>= nextAction
| ProcessActivity act' <- ac = act' >> recvQueue
| ProcessSkip <- ac = recvQueue
| ProcessContinue ps' <- ac = recvQueueAux ps'
| ProcessTimeout d ps' <- ac = setUserTimeout d >> recvQueueAux ps'
| ProcessStop xr <- ac = return xr
| ProcessStopping ps' xr <- ac = setProcessState ps' >> return xr
| ProcessHibernate d' s' <- ac = (lift $ block d') >> recvQueueAux s'
| ProcessBecome pd' ps' <- ac = do
modifyState $ \st@ProcessState{..} -> st { procDef = pd', procState = ps' }
recvQueue
| otherwise = return $ ExitOther "IllegalState"
recvQueueAux st = setProcessState st >> recvQueue
processNext :: GenProcess s (ProcessAction s)
processNext = do
(up, pf) <- gets $ liftA2 (,) (unhandledMessagePolicy . procDef) procFilters
case pf of
[] -> consumeMessage
_ -> filterMessage (filterNext False up pf Nothing)
consumeMessage = applyNext dequeue processApply
filterMessage = applyNext peek
filterNext :: Safe
-> UnhandledMessagePolicy
-> [DispatchFilter s]
-> Maybe (Filter s)
-> Message
-> GenProcess s (ProcessAction s)
filterNext isSafe mp' fs mf msg
| Just (FilterSafe s') <- mf = filterNext True mp' fs (Just $ FilterOk s') msg
| Just (FilterSkip s') <- mf = setProcessState s' >> dequeue >> return ProcessSkip
| Just (FilterStop s' r) <- mf = return $ ProcessStopping s' r
| isSafe
, Just (FilterOk s') <- mf
, [] <- fs = do setProcessState s'
act' <- processApply msg
dequeue >> return act'
| Just (FilterOk s') <- mf
, [] <- fs = setProcessState s' >> applyNext dequeue processApply
| Nothing <- mf, [] <- fs = applyNext dequeue processApply
| Just (FilterOk s') <- mf
, (f:fs') <- fs = do
setProcessState s'
act' <- lift $ dynHandleFilter s' f msg
filterNext isSafe mp' fs' act' msg
| Just (FilterReject _ s') <- mf = do
setProcessState s' >> dequeue >>= lift . applyPolicy mp' s' . fromJust
| Nothing <- mf
, (f:fs') <- fs = processState >>= \s' -> do
lift (dynHandleFilter s' f msg) >>= \a -> filterNext isSafe mp' fs' a msg
applyNext :: (GenProcess s (Maybe Message))
-> (Message -> GenProcess s (ProcessAction s))
-> GenProcess s (ProcessAction s)
applyNext queueOp handler = do
next <- queueOp
case next of
Nothing -> drainOrTimeout
Just msg -> handler msg
processApply msg = do
(def, pState) <- gets $ liftA2 (,) procDef procState
let pol = unhandledMessagePolicy def
apiMatchers = map (dynHandleMessage pol pState) (apiHandlers def)
infoMatchers = map (dynHandleMessage pol pState) (infoHandlers def)
extMatchers = map (dynHandleMessage pol pState) (externHandlers def)
shutdown' = dynHandleMessage pol pState shutdownHandler'
ms' = (shutdown':extMatchers) ++ apiMatchers ++ infoMatchers
processApplyAux ms' pol pState msg
processApplyAux [] p' s' m' = lift $ applyPolicy p' s' m'
processApplyAux (h:hs) p' s' m' = do
attempt <- lift $ h m'
case attempt of
Nothing -> processApplyAux hs p' s' m'
Just act' -> return act'
drainMailbox :: GenProcess s ()
drainMailbox = do
ps <- processState
pd <- processDefinition
pp <- processPriorities
ut <- gets usrTimers
let ts = Map.foldrWithKey (\k (t, _) ms -> ms ++ matchKey k t) [] ut
let ms = ts ++ (matchAny (return . Right) : (mkMatchers ps pd))
timerAcc <- timeoutPolicy >>= \spec -> case spec of
RecvTimer _ -> return Nothing
RecvMaxBacklog cnt -> return $ Just cnt
mask_ $ do
tt <- maybeStartTimer
drainAux ps pp timerAcc (ms ++ matchTimeout tt)
(lift $ stopTimer tt) >>= setDrainTimeout
drainAux :: s
-> [DispatchPriority s]
-> Limit
-> [Match (Either TimedOut Message)]
-> GenProcess s ()
drainAux ps' pp' maxbq ms = do
(cnt, m) <- scanMailbox maxbq ms
case m of
Nothing -> return ()
Just (Right m') -> do enqueueMessage ps' pp' m'
drainAux ps' pp' cnt ms
Just (Left TimedOut) -> return ()
Just (Left (Yield i)) ->
consumeTimer i push >> drainAux ps' pp' cnt ms
maybeStartTimer :: GenProcess s Timer
maybeStartTimer = do
tp <- timeoutPolicy
t <- case tp of
RecvTimer d -> (lift $ startTimer $ Delay d)
_ -> return $ delayTimer Infinity
setDrainTimeout t
return t
scanMailbox :: Limit
-> [Match (Either TimedOut Message)]
-> GenProcess s (Limit, Maybe (Either TimedOut Message))
scanMailbox lim ms
| Just 0 <- lim = return (lim, Just $ Left TimedOut)
| Just c <- lim = do
lift $ fmap (Just (c 1), ) (receiveTimeout 0 ms)
| otherwise = lift $ fmap (lim, ) (receiveTimeout 0 ms)
drainOrTimeout :: GenProcess s (ProcessAction s)
drainOrTimeout = do
pd <- processDefinition
ps <- processState
ud <- currentTimeout
mr <- mkMatchRunners
let ump = unhandledMessagePolicy pd
hto = timeoutHandler pd
matches = mr ++ ((matchMessage return):map (matchExtern ump ps) (externHandlers pd))
recv = case ud of
Infinity -> lift $ fmap Just (receiveWait matches)
NoDelay -> lift $ receiveTimeout 0 matches
Delay i -> lift $ receiveTimeout (asTimeout i) matches
mask $ \restore -> recv >>= \r ->
case r of
Nothing -> restore $ lift $ hto ps ud
Just m -> do
pp <- processPriorities
enqueueMessage ps pp m
restore $ return ProcessSkip
mkMatchRunners :: GenProcess s [Match Message]
mkMatchRunners = do
ut <- gets usrTimers
fn <- mkRunner
let ms = Map.foldrWithKey (\k (t, _) ms' -> ms' ++ matchRun fn k t) [] ut
return ms
mkRunner :: GenProcess s (TimerKey -> Process Message)
mkRunner = do
st <- ST.get
let fn = \k -> do (m, _) <- runProcess st (consumeTimer k return)
return m
return fn
mkMatchers :: s
-> ProcessDefinition s
-> [Match (Either TimedOut Message)]
mkMatchers st df =
map (matchMapExtern (unhandledMessagePolicy df) st toRight)
(externHandlers df)
toRight :: Message -> Either TimedOut Message
toRight = Right
recvLoop :: ProcessDefinition s -> s -> Delay -> Process ExitReason
recvLoop pDef pState recvDelay =
let p = unhandledMessagePolicy pDef
handleTimeout = timeoutHandler pDef
handleStop = shutdownHandler pDef
shutdown' = matchDispatch p pState shutdownHandler'
extMatchers = map (matchDispatch p pState) (externHandlers pDef)
matchers = extMatchers ++ (map (matchDispatch p pState) (apiHandlers pDef))
ex' = (trapExit:(exitHandlers pDef))
ms' = (shutdown':matchers) ++ matchAux p pState (infoHandlers pDef)
in do
ac <- catchesExit (processReceive ms' handleTimeout pState recvDelay)
(map (\d' -> (dispatchExit d') pState) ex')
case ac of
ProcessSkip -> recvLoop pDef pState recvDelay
(ProcessContinue s') -> recvLoop pDef s' recvDelay
(ProcessTimeout t' s') -> recvLoop pDef s' t'
(ProcessHibernate d' s') -> block d' >> recvLoop pDef s' recvDelay
(ProcessStop r) -> handleStop (LastKnown pState) r >> return (r :: ExitReason)
(ProcessStopping s' r) -> handleStop (LastKnown s') r >> return (r :: ExitReason)
(ProcessBecome d' s') -> recvLoop d' s' recvDelay
(ProcessActivity _) -> die $ "recvLoop.InvalidState - ProcessActivityNotSupported"
(ProcessExpression _) -> die $ "recvLoop.InvalidState - ProcessExpressionNotSupported"
where
matchAux :: UnhandledMessagePolicy
-> s
-> [DeferredDispatcher s]
-> [Match (ProcessAction s)]
matchAux p ps ds = [matchAny (auxHandler (applyPolicy p ps) ps ds)]
auxHandler :: (Message -> Process (ProcessAction s))
-> s
-> [DeferredDispatcher s]
-> Message
-> Process (ProcessAction s)
auxHandler policy _ [] msg = policy msg
auxHandler policy st (d:ds :: [DeferredDispatcher s]) msg
| length ds > 0 = let dh = dispatchInfo d in do
m <- dh st msg
case m of
Nothing -> auxHandler policy st ds msg
Just act' -> return act'
| otherwise = let dh = dispatchInfo d in do
m <- dh st msg
case m of
Nothing -> policy msg
Just act' -> return act'
processReceive :: [Match (ProcessAction s)]
-> TimeoutHandler s
-> s
-> Delay
-> Process (ProcessAction s)
processReceive ms handleTimeout st d = do
next <- recv ms d
case next of
Nothing -> handleTimeout st d
Just pa -> return pa
recv :: [Match (ProcessAction s)]
-> Delay
-> Process (Maybe (ProcessAction s))
recv matches d' =
case d' of
Infinity -> receiveWait matches >>= return . Just
NoDelay -> receiveTimeout 0 matches
Delay t' -> receiveTimeout (asTimeout t') matches
shutdownHandler' :: Dispatcher s
shutdownHandler' = handleCast (\_ Shutdown -> stop $ ExitNormal)
trapExit :: ExitSignalDispatcher s
trapExit = handleExitIf (\_ e -> e == ExitShutdown)
(\_ _ (r :: ExitReason) -> stop r)
block :: TimeInterval -> Process ()
block i =
void $ receiveTimeout (asTimeout i) [ match (\(_ :: TimedOut) -> return ()) ]
applyPolicy :: UnhandledMessagePolicy
-> s
-> Message
-> Process (ProcessAction s)
applyPolicy p s m =
case p of
Terminate -> stop $ ExitOther "UnhandledInput"
DeadLetter pid -> forward m pid >> continue s
Drop -> continue s
Log -> logIt >> continue s
where
logIt =
Log.report Log.info Log.logChannel $ "Unhandled Gen Input Message: " ++ (show m)