module Control.Distributed.Process.Supervisor
(
ChildSpec(..)
, ChildKey
, ChildType(..)
, ChildTerminationPolicy(..)
, ChildStart(..)
, RegisteredName(LocalName, CustomRegister)
, RestartPolicy(..)
, ChildRef(..)
, isRunning
, isRestarting
, Child
, StaticLabel
, SupervisorPid
, ChildPid
, StarterPid
, ToChildStart(..)
, start
, run
, MaxRestarts
, maxRestarts
, RestartLimit(..)
, limit
, defaultLimits
, RestartMode(..)
, RestartOrder(..)
, RestartStrategy(..)
, ShutdownMode(..)
, restartOne
, restartAll
, restartLeft
, restartRight
, addChild
, AddChildResult(..)
, StartChildResult(..)
, startChild
, startNewChild
, terminateChild
, TerminateChildResult(..)
, deleteChild
, DeleteChildResult(..)
, restartChild
, RestartChildResult(..)
, shutdown
, shutdownAndWait
, lookupChild
, listChildren
, SupervisorStats(..)
, statistics
, StartFailure(..)
, ChildInitFailure(..)
) where
import Control.DeepSeq (NFData)
import Control.Distributed.Process.Supervisor.Types
import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Serializable()
import Control.Distributed.Process.Extras.Internal.Primitives hiding (monitor)
import Control.Distributed.Process.Extras.Internal.Types
( ExitReason(..)
)
import Control.Distributed.Process.ManagedProcess
( handleCall
, handleInfo
, reply
, continue
, stop
, stopWith
, input
, defaultProcess
, prioritised
, InitHandler
, InitResult(..)
, ProcessAction
, ProcessReply
, ProcessDefinition(..)
, PrioritisedProcessDefinition(..)
, Priority(..)
, DispatchPriority
, UnhandledMessagePolicy(Drop)
)
import qualified Control.Distributed.Process.ManagedProcess.UnsafeClient as Unsafe
( call
, cast
)
import qualified Control.Distributed.Process.ManagedProcess as MP
( pserve
)
import Control.Distributed.Process.ManagedProcess.Server.Priority
( prioritiseCast_
, prioritiseCall_
, prioritiseInfo_
, setPriority
)
import Control.Distributed.Process.ManagedProcess.Server.Restricted
( RestrictedProcess
, Result
, RestrictedAction
, getState
, putState
)
import qualified Control.Distributed.Process.ManagedProcess.Server.Restricted as Restricted
( handleCallIf
, handleCall
, handleCast
, reply
, continue
)
import Control.Distributed.Process.Extras.SystemLog
( LogClient
, LogChan
, LogText
, Logger(..)
)
import qualified Control.Distributed.Process.Extras.SystemLog as Log
import Control.Distributed.Process.Extras.Time
import Control.Exception (SomeException, throwIO)
import Control.Monad.Error
import Data.Accessor
( Accessor
, accessor
, (^:)
, (.>)
, (^=)
, (^.)
)
import Data.Binary
import Data.Foldable (find, foldlM, toList)
import Data.List (foldl')
import qualified Data.List as List (delete)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Sequence
( Seq
, ViewL(EmptyL, (:<))
, ViewR(EmptyR, (:>))
, (<|)
, (|>)
, (><)
, filter)
import qualified Data.Sequence as Seq
import Data.Time.Clock
( NominalDiffTime
, UTCTime
, getCurrentTime
, diffUTCTime
)
import Data.Typeable (Typeable)
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch, filter, init, rem)
#else
import Prelude hiding (filter, init, rem)
#endif
import GHC.Generics
class ToChildStart a where
toChildStart :: a -> Process ChildStart
instance ToChildStart (Closure (Process ())) where
toChildStart = return . RunClosure
instance ToChildStart (Closure (SupervisorPid -> Process (ChildPid, Message))) where
toChildStart = return . CreateHandle
expectTriple :: Process (SupervisorPid, ChildKey, SendPort ChildPid)
expectTriple = expect
instance ToChildStart (Process ()) where
toChildStart proc = do
starterPid <- spawnLocal $ do
(supervisor, _, sendPidPort) <- expectTriple
link supervisor
spawnIt proc supervisor sendPidPort
tcsProcLoop proc
return (StarterProcess starterPid)
where
tcsProcLoop :: Process () -> Process ()
tcsProcLoop p = forever' $ do
(supervisor, _, sendPidPort) <- expectTriple
spawnIt p supervisor sendPidPort
spawnIt :: Process ()
-> SupervisorPid
-> SendPort ChildPid
-> Process ()
spawnIt proc' supervisor sendPidPort = do
supervisedPid <- spawnLocal $ do
link supervisor
self <- getSelfPid
(proc' `catches` [ Handler $ filterInitFailures supervisor self
, Handler $ logFailure supervisor self ])
`catchesExit` [\_ m -> handleMessageIf m (== ExitShutdown)
(\_ -> return ())]
sendChan sendPidPort supervisedPid
instance (Resolvable a) => ToChildStart (SupervisorPid -> Process a) where
toChildStart proc = do
starterPid <- spawnLocal $ do
(supervisor, _, sendPidPort) <- expectTriple
link supervisor
injectIt proc supervisor sendPidPort >> injectorLoop proc
return $ StarterProcess starterPid
where
injectorLoop :: Resolvable a
=> (SupervisorPid -> Process a)
-> Process ()
injectorLoop p = forever' $ do
(supervisor, _, sendPidPort) <- expectTriple
injectIt p supervisor sendPidPort
injectIt :: Resolvable a
=> (SupervisorPid -> Process a)
-> SupervisorPid
-> SendPort ChildPid
-> Process ()
injectIt proc' supervisor sendPidPort = do
addr <- proc' supervisor
mPid <- resolve addr
case mPid of
Nothing -> die "UnresolvableAddress in startChild instance"
Just p -> sendChan sendPidPort p
data DeleteChild = DeleteChild !ChildKey
deriving (Typeable, Generic)
instance Binary DeleteChild where
instance NFData DeleteChild where
data FindReq = FindReq ChildKey
deriving (Typeable, Generic)
instance Binary FindReq where
instance NFData FindReq where
data StatsReq = StatsReq
deriving (Typeable, Generic)
instance Binary StatsReq where
instance NFData StatsReq where
data ListReq = ListReq
deriving (Typeable, Generic)
instance Binary ListReq where
instance NFData ListReq where
type ImmediateStart = Bool
data AddChildReq = AddChild !ImmediateStart !ChildSpec
deriving (Typeable, Generic, Show)
instance Binary AddChildReq where
instance NFData AddChildReq where
data AddChildRes = Exists ChildRef | Added State
data StartChildReq = StartChild !ChildKey
deriving (Typeable, Generic)
instance Binary StartChildReq where
instance NFData StartChildReq where
data RestartChildReq = RestartChildReq !ChildKey
deriving (Typeable, Generic, Show, Eq)
instance Binary RestartChildReq where
instance NFData RestartChildReq where
data TerminateChildReq = TerminateChildReq !ChildKey
deriving (Typeable, Generic, Show, Eq)
instance Binary TerminateChildReq where
instance NFData TerminateChildReq where
data IgnoreChildReq = IgnoreChildReq !ChildPid
deriving (Typeable, Generic)
instance Binary IgnoreChildReq where
instance NFData IgnoreChildReq where
type ChildSpecs = Seq Child
type Prefix = ChildSpecs
type Suffix = ChildSpecs
data StatsType = Active | Specified
data LogSink = LogProcess !LogClient | LogChan
instance Logger LogSink where
logMessage LogChan = logMessage Log.logChannel
logMessage (LogProcess client') = logMessage client'
data State = State {
_specs :: ChildSpecs
, _active :: Map ChildPid ChildKey
, _strategy :: RestartStrategy
, _restartPeriod :: NominalDiffTime
, _restarts :: [UTCTime]
, _stats :: SupervisorStats
, _logger :: LogSink
, shutdownStrategy :: ShutdownMode
}
start :: RestartStrategy -> ShutdownMode -> [ChildSpec] -> Process SupervisorPid
start rs ss cs = spawnLocal $ run rs ss cs
run :: RestartStrategy -> ShutdownMode -> [ChildSpec] -> Process ()
run rs ss specs' = MP.pserve (rs, ss, specs') supInit serverDefinition
statistics :: Addressable a => a -> Process (SupervisorStats)
statistics = (flip Unsafe.call) StatsReq
lookupChild :: Addressable a => a -> ChildKey -> Process (Maybe (ChildRef, ChildSpec))
lookupChild addr key = Unsafe.call addr $ FindReq key
listChildren :: Addressable a => a -> Process [Child]
listChildren addr = Unsafe.call addr ListReq
addChild :: Addressable a => a -> ChildSpec -> Process AddChildResult
addChild addr spec = Unsafe.call addr $ AddChild False spec
startChild :: Addressable a => a -> ChildKey -> Process StartChildResult
startChild addr key = Unsafe.call addr $ StartChild key
startNewChild :: Addressable a
=> a
-> ChildSpec
-> Process AddChildResult
startNewChild addr spec = Unsafe.call addr $ AddChild True spec
deleteChild :: Addressable a => a -> ChildKey -> Process DeleteChildResult
deleteChild sid childKey = Unsafe.call sid $ DeleteChild childKey
terminateChild :: Addressable a
=> a
-> ChildKey
-> Process TerminateChildResult
terminateChild sid = Unsafe.call sid . TerminateChildReq
restartChild :: Addressable a
=> a
-> ChildKey
-> Process RestartChildResult
restartChild sid = Unsafe.call sid . RestartChildReq
shutdown :: Resolvable a => a -> Process ()
shutdown sid = do
mPid <- resolve sid
case mPid of
Nothing -> return ()
Just p -> exit p ExitShutdown
shutdownAndWait :: Resolvable a => a -> Process ()
shutdownAndWait sid = do
mPid <- resolve sid
case mPid of
Nothing -> return ()
Just p -> withMonitor p $ do
shutdown p
receiveWait [ matchIf (\(ProcessMonitorNotification _ p' _) -> p' == p)
(\_ -> return ())
]
supInit :: InitHandler (RestartStrategy, ShutdownMode, [ChildSpec]) State
supInit (strategy', shutdown', specs') = do
logClient <- Log.client
let client' = case logClient of
Nothing -> LogChan
Just c -> LogProcess c
let initState = ( (
restartPeriod ^= configuredRestartPeriod
)
. (strategy ^= strategy')
. (logger ^= client')
$ emptyState shutdown'
)
catch (foldlM initChild initState specs' >>= return . (flip InitOk) Infinity)
(\(e :: SomeException) -> do
sup <- getSelfPid
logEntry Log.error $
mkReport "Could not init supervisor " sup "noproc" (show e)
return $ InitStop (show e))
where
initChild :: State -> ChildSpec -> Process State
initChild st ch =
case (findChild (childKey ch) st) of
Just (ref, _) -> die $ StartFailureDuplicateChild ref
Nothing -> tryStartChild ch >>= initialised st ch
configuredRestartPeriod =
let maxT' = maxT (intensity strategy')
tI = asTimeout maxT'
tMs = (fromIntegral tI * (0.000001 :: Float))
in fromRational (toRational tMs) :: NominalDiffTime
initialised :: State
-> ChildSpec
-> Either StartFailure ChildRef
-> Process State
initialised _ _ (Left err) = liftIO $ throwIO $ ChildInitFailure (show err)
initialised state spec (Right ref) = do
mPid <- resolve ref
case mPid of
Nothing -> die $ (childKey spec) ++ ": InvalidChildRef"
Just childPid -> do
return $ ( (active ^: Map.insert childPid chId)
. (specs ^: (|> (ref, spec)))
$ bumpStats Active chType (+1) state
)
where chId = childKey spec
chType = childType spec
emptyState :: ShutdownMode -> State
emptyState strat = State {
_specs = Seq.empty
, _active = Map.empty
, _strategy = restartAll
, _restartPeriod = (fromIntegral (0 :: Integer)) :: NominalDiffTime
, _restarts = []
, _stats = emptyStats
, _logger = LogChan
, shutdownStrategy = strat
}
emptyStats :: SupervisorStats
emptyStats = SupervisorStats {
_children = 0
, _workers = 0
, _supervisors = 0
, _running = 0
, _activeSupervisors = 0
, _activeWorkers = 0
, totalRestarts = 0
}
serverDefinition :: PrioritisedProcessDefinition State
serverDefinition = prioritised processDefinition supPriorities
where
supPriorities :: [DispatchPriority State]
supPriorities = [
prioritiseCast_ (\(IgnoreChildReq _) -> setPriority 100)
, prioritiseInfo_ (\(ProcessMonitorNotification _ _ _) -> setPriority 99 )
, prioritiseCall_ (\(_ :: FindReq) ->
(setPriority 10) :: Priority (Maybe (ChildRef, ChildSpec)))
]
processDefinition :: ProcessDefinition State
processDefinition =
defaultProcess {
apiHandlers = [
Restricted.handleCast handleIgnore
, handleCall handleTerminateChild
, Restricted.handleCall handleDeleteChild
, Restricted.handleCallIf (input (\(AddChild immediate _) -> not immediate))
handleAddChild
, handleCall handleStartNewChild
, handleCall handleStartChild
, handleCall handleRestartChild
, Restricted.handleCall handleLookupChild
, Restricted.handleCall handleListChildren
, Restricted.handleCall handleGetStats
]
, infoHandlers = [handleInfo handleMonitorSignal]
, shutdownHandler = handleShutdown
, unhandledMessagePolicy = Drop
} :: ProcessDefinition State
handleLookupChild :: FindReq
-> RestrictedProcess State (Result (Maybe (ChildRef, ChildSpec)))
handleLookupChild (FindReq key) = getState >>= Restricted.reply . findChild key
handleListChildren :: ListReq
-> RestrictedProcess State (Result [Child])
handleListChildren _ = getState >>= Restricted.reply . toList . (^. specs)
handleAddChild :: AddChildReq
-> RestrictedProcess State (Result AddChildResult)
handleAddChild req = getState >>= return . doAddChild req True >>= doReply
where doReply :: AddChildRes -> RestrictedProcess State (Result AddChildResult)
doReply (Added s) = putState s >> Restricted.reply (ChildAdded ChildStopped)
doReply (Exists e) = Restricted.reply (ChildFailedToStart $ StartFailureDuplicateChild e)
handleIgnore :: IgnoreChildReq
-> RestrictedProcess State RestrictedAction
handleIgnore (IgnoreChildReq childPid) = do
state <- getState
let (cId, active') =
Map.updateLookupWithKey (\_ _ -> Nothing) childPid $ state ^. active
case cId of
Nothing -> Restricted.continue
Just c -> do
putState $ ( (active ^= active')
. (resetChildIgnored c)
$ state
)
Restricted.continue
where
resetChildIgnored :: ChildKey -> State -> State
resetChildIgnored key state =
maybe state id $ updateChild key (setChildStopped True) state
handleDeleteChild :: DeleteChild
-> RestrictedProcess State (Result DeleteChildResult)
handleDeleteChild (DeleteChild k) = getState >>= handleDelete k
where
handleDelete :: ChildKey
-> State
-> RestrictedProcess State (Result DeleteChildResult)
handleDelete key state =
let (prefix, suffix) = Seq.breakl ((== key) . childKey . snd) $ state ^. specs
in case (Seq.viewl suffix) of
EmptyL -> Restricted.reply ChildNotFound
child :< remaining -> tryDeleteChild child prefix remaining state
tryDeleteChild (ref, spec) pfx sfx st
| ref == ChildStopped = do
putState $ ( (specs ^= pfx >< sfx)
$ bumpStats Specified (childType spec) decrement st
)
Restricted.reply ChildDeleted
| otherwise = Restricted.reply $ ChildNotStopped ref
handleStartChild :: State
-> StartChildReq
-> Process (ProcessReply StartChildResult State)
handleStartChild state (StartChild key) =
let child = findChild key state in
case child of
Nothing ->
reply ChildStartUnknownId state
Just (ref@(ChildRunning _), _) ->
reply (ChildStartFailed (StartFailureAlreadyRunning ref)) state
Just (ref@(ChildRunningExtra _ _), _) ->
reply (ChildStartFailed (StartFailureAlreadyRunning ref)) state
Just (ref@(ChildRestarting _), _) ->
reply (ChildStartFailed (StartFailureAlreadyRunning ref)) state
Just (_, spec) -> do
started <- doStartChild spec state
case started of
Left err -> reply (ChildStartFailed err) state
Right (ref, st') -> reply (ChildStartOk ref) st'
handleStartNewChild :: State
-> AddChildReq
-> Process (ProcessReply AddChildResult State)
handleStartNewChild state req@(AddChild _ spec) =
let added = doAddChild req False state in
case added of
Exists e -> reply (ChildFailedToStart $ StartFailureDuplicateChild e) state
Added _ -> attemptStart state spec
where
attemptStart st ch = do
started <- tryStartChild ch
case started of
Left err -> reply (ChildFailedToStart err) $ removeChild spec st
Right ref -> do
let st' = ( (specs ^: (|> (ref, spec)))
$ bumpStats Specified (childType spec) (+1) st
)
in reply (ChildAdded ref) $ markActive st' ref ch
handleRestartChild :: State
-> RestartChildReq
-> Process (ProcessReply RestartChildResult State)
handleRestartChild state (RestartChildReq key) =
let child = findChild key state in
case child of
Nothing ->
reply ChildRestartUnknownId state
Just (ref@(ChildRunning _), _) ->
reply (ChildRestartFailed (StartFailureAlreadyRunning ref)) state
Just (ref@(ChildRunningExtra _ _), _) ->
reply (ChildRestartFailed (StartFailureAlreadyRunning ref)) state
Just (ref@(ChildRestarting _), _) ->
reply (ChildRestartFailed (StartFailureAlreadyRunning ref)) state
Just (_, spec) -> do
started <- doStartChild spec state
case started of
Left err -> reply (ChildRestartFailed err) state
Right (ref, st') -> reply (ChildRestartOk ref) st'
handleTerminateChild :: State
-> TerminateChildReq
-> Process (ProcessReply TerminateChildResult State)
handleTerminateChild state (TerminateChildReq key) =
let child = findChild key state in
case child of
Nothing ->
reply TerminateChildUnknownId state
Just (ChildStopped, _) ->
reply TerminateChildOk state
Just (ref, spec) ->
reply TerminateChildOk =<< doTerminateChild ref spec state
handleGetStats :: StatsReq
-> RestrictedProcess State (Result SupervisorStats)
handleGetStats _ = Restricted.reply . (^. stats) =<< getState
handleMonitorSignal :: State
-> ProcessMonitorNotification
-> Process (ProcessAction State)
handleMonitorSignal state (ProcessMonitorNotification _ childPid reason) = do
let (cId, active') =
Map.updateLookupWithKey (\_ _ -> Nothing) childPid $ state ^. active
let mSpec =
case cId of
Nothing -> Nothing
Just c -> fmap snd $ findChild c state
case mSpec of
Nothing -> continue $ (active ^= active') state
Just spec -> tryRestart childPid state active' spec reason
handleShutdown :: State -> ExitReason -> Process ()
handleShutdown state (ExitOther reason) = terminateChildren state >> die reason
handleShutdown state _ = terminateChildren state
tryRestart :: ChildPid
-> State
-> Map ChildPid ChildKey
-> ChildSpec
-> DiedReason
-> Process (ProcessAction State)
tryRestart childPid state active' spec reason = do
sup <- getSelfPid
logEntry Log.debug $ do
mkReport "tryRestart" sup (childKey spec) (show reason)
case state ^. strategy of
RestartOne _ -> tryRestartChild childPid state active' spec reason
strat -> do
case (childRestart spec, isNormal reason) of
(Intrinsic, True) -> stopWith newState ExitNormal
(Transient, True) -> continue newState
(Temporary, _) -> continue removeTemp
_ -> tryRestartBranch strat spec reason $ newState
where
newState = (active ^= active') state
removeTemp = removeChild spec $ newState
isNormal (DiedException _) = False
isNormal _ = True
tryRestartBranch :: RestartStrategy
-> ChildSpec
-> DiedReason
-> State
-> Process (ProcessAction State)
tryRestartBranch rs sp dr st =
let mode' = mode rs
tree' = case rs of
RestartAll _ _ -> childSpecs
RestartLeft _ _ -> subTreeL
RestartRight _ _ -> subTreeR
_ -> error "IllegalState"
proc = case mode' of
RestartEach _ -> stopStart
RestartInOrder _ -> restartL
RestartRevOrder _ -> reverseRestart
dir' = order mode' in do
proc tree' dir'
where
stopStart :: ChildSpecs -> RestartOrder -> Process (ProcessAction State)
stopStart tree order' = do
let tree' = case order' of
LeftToRight -> tree
RightToLeft -> Seq.reverse tree
state <- addRestart activeState
case state of
Nothing -> die errorMaxIntensityReached
Just st' -> apply (foldlM stopStartIt st' tree')
reverseRestart :: ChildSpecs
-> RestartOrder
-> Process (ProcessAction State)
reverseRestart tree LeftToRight = restartL tree RightToLeft
reverseRestart tree dir@(RightToLeft) = restartL (Seq.reverse tree) dir
restartL :: ChildSpecs -> RestartOrder -> Process (ProcessAction State)
restartL tree ro = do
let rev = (ro == RightToLeft)
let tree' = case rev of
False -> tree
True -> Seq.reverse tree
state <- addRestart activeState
case state of
Nothing -> die errorMaxIntensityReached
Just st' -> foldlM stopIt st' tree >>= \s -> do
apply $ foldlM startIt s tree'
stopStartIt :: State -> Child -> Process State
stopStartIt s ch@(cr, cs) = doTerminateChild cr cs s >>= (flip startIt) ch
stopIt :: State -> Child -> Process State
stopIt s (cr, cs) = doTerminateChild cr cs s
startIt :: State -> Child -> Process State
startIt s (_, cs)
| isTemporary (childRestart cs) = return $ removeChild cs s
| otherwise = ensureActive cs =<< doStartChild cs s
ensureActive :: ChildSpec
-> Either StartFailure (ChildRef, State)
-> Process State
ensureActive cs it
| (Right (ref, st')) <- it = return $ markActive st' ref cs
| (Left err) <- it = die $ ExitOther $ (childKey cs) ++ ": " ++ (show err)
| otherwise = error "IllegalState"
apply :: (Process State) -> Process (ProcessAction State)
apply proc = do
catchExit (proc >>= continue) (\(_ :: ProcessId) -> stop)
activeState = maybe st id $ updateChild (childKey sp)
(setChildStopped False) st
subTreeL :: ChildSpecs
subTreeL =
let (prefix, suffix) = splitTree Seq.breakl
in case (Seq.viewl suffix) of
child :< _ -> prefix |> child
EmptyL -> prefix
subTreeR :: ChildSpecs
subTreeR =
let (prefix, suffix) = splitTree Seq.breakr
in case (Seq.viewr suffix) of
_ :> child -> child <| prefix
EmptyR -> prefix
splitTree splitWith = splitWith ((== childKey sp) . childKey . snd) childSpecs
childSpecs :: ChildSpecs
childSpecs =
let cs = activeState ^. specs
ck = childKey sp
rs' = childRestart sp
in case (isTransient rs', isTemporary rs', dr) of
(True, _, DiedNormal) -> filter ((/= ck) . childKey . snd) cs
(_, True, _) -> filter ((/= ck) . childKey . snd) cs
_ -> cs
tryRestartChild :: ChildPid
-> State
-> Map ChildPid ChildKey
-> ChildSpec
-> DiedReason
-> Process (ProcessAction State)
tryRestartChild childPid st active' spec reason
| DiedNormal <- reason
, True <- isTransient (childRestart spec) = continue childDown
| True <- isTemporary (childRestart spec) = continue childRemoved
| DiedNormal <- reason
, True <- isIntrinsic (childRestart spec) = stopWith updateStopped ExitNormal
| otherwise = doRestartChild childPid spec reason st
where
childDown = (active ^= active') $ updateStopped
childRemoved = (active ^= active') $ removeChild spec st
updateStopped = maybe st id $ updateChild chKey (setChildStopped False) st
chKey = childKey spec
doRestartChild :: ChildPid -> ChildSpec -> DiedReason -> State -> Process (ProcessAction State)
doRestartChild _ spec _ state = do
state' <- addRestart state
case state' of
Nothing -> die errorMaxIntensityReached
Just st -> do
start' <- doStartChild spec st
case start' of
Right (ref, st') -> continue $ markActive st' ref spec
Left err -> do
sup <- getSelfPid
if isTemporary (childRestart spec)
then do
logEntry Log.warning $
mkReport "Error in temporary child" sup (childKey spec) (show err)
continue $ ( (active ^: Map.filter (/= chKey))
. (bumpStats Active chType decrement)
. (bumpStats Specified chType decrement)
$ removeChild spec st)
else do
logEntry Log.error $
mkReport "Unrecoverable error in child. Stopping supervisor"
sup (childKey spec) (show err)
stopWith st $ ExitOther $ "Unrecoverable error in child " ++ (childKey spec)
where
chKey = childKey spec
chType = childType spec
addRestart :: State -> Process (Maybe State)
addRestart state = do
now <- liftIO $ getCurrentTime
let acc = foldl' (accRestarts now) [] (now:restarted)
case length acc of
n | n > maxAttempts -> return Nothing
_ -> return $ Just $ (restarts ^= acc) $ state
where
maxAttempts = maxNumberOfRestarts $ maxR $ maxIntensity
slot = state ^. restartPeriod
restarted = state ^. restarts
maxIntensity = state ^. strategy .> restartIntensity
accRestarts :: UTCTime -> [UTCTime] -> UTCTime -> [UTCTime]
accRestarts now' acc r =
let diff = diffUTCTime now' r in
if diff > slot then acc else (r:acc)
doStartChild :: ChildSpec
-> State
-> Process (Either StartFailure (ChildRef, State))
doStartChild spec st = do
restart <- tryStartChild spec
case restart of
Left f -> return $ Left f
Right p -> do
let mState = updateChild chKey (chRunning p) st
case mState of
Nothing -> die $ "InternalError in doStartChild " ++ show spec
Just s' -> return $ Right $ (p, markActive s' p spec)
where
chKey = childKey spec
chRunning :: ChildRef -> Child -> Prefix -> Suffix -> State -> Maybe State
chRunning newRef (_, chSpec) prefix suffix st' =
Just $ ( (specs ^= prefix >< ((newRef, chSpec) <| suffix))
$ bumpStats Active (childType spec) (+1) st'
)
tryStartChild :: ChildSpec
-> Process (Either StartFailure ChildRef)
tryStartChild ChildSpec{..} =
case childStart of
RunClosure proc -> do
mProc <- catch (unClosure proc >>= return . Right)
(\(e :: SomeException) -> return $ Left (show e))
case mProc of
Left err -> logStartFailure $ StartFailureBadClosure err
Right p -> wrapClosure childRegName p >>= return . Right
CreateHandle fn -> do
mFn <- catch (unClosure fn >>= return . Right)
(\(e :: SomeException) -> return $ Left (show e))
case mFn of
Left err -> logStartFailure $ StartFailureBadClosure err
Right fn' -> do
wrapHandle childRegName fn' >>= return . Right
StarterProcess starterPid ->
wrapRestarterProcess childRegName starterPid
where
logStartFailure sf = do
sup <- getSelfPid
logEntry Log.error $ mkReport "Child Start Error" sup childKey (show sf)
return $ Left sf
wrapClosure :: Maybe RegisteredName
-> Process ()
-> Process ChildRef
wrapClosure regName proc = do
supervisor <- getSelfPid
childPid <- spawnLocal $ do
self <- getSelfPid
link supervisor
maybeRegister regName self
() <- expect
(proc `catches` [ Handler $ filterInitFailures supervisor self
, Handler $ logFailure supervisor self ])
`catchesExit` [
(\_ m -> handleMessageIf m (== ExitShutdown)
(\_ -> return ()))]
void $ monitor childPid
send childPid ()
return $ ChildRunning childPid
wrapHandle :: Maybe RegisteredName
-> (SupervisorPid -> Process (ChildPid, Message))
-> Process ChildRef
wrapHandle regName proc = do
super <- getSelfPid
(childPid, msg) <- proc super
maybeRegister regName childPid
void $ monitor childPid
return $ ChildRunningExtra childPid msg
wrapRestarterProcess :: Maybe RegisteredName
-> StarterPid
-> Process (Either StartFailure ChildRef)
wrapRestarterProcess regName starterPid = do
sup <- getSelfPid
(sendPid, recvPid) <- newChan
ref <- monitor starterPid
send starterPid (sup, childKey, sendPid)
ePid <- receiveWait [
matchChan recvPid (\(pid :: ChildPid) -> return $ Right pid)
, matchIf (\(ProcessMonitorNotification mref _ dr) ->
mref == ref && dr /= DiedNormal)
(\(ProcessMonitorNotification _ _ dr) ->
return $ Left dr)
] `finally` (unmonitor ref)
case ePid of
Right pid -> do
maybeRegister regName pid
void $ monitor pid
return $ Right $ ChildRunning pid
Left dr -> return $ Left $ StartFailureDied dr
maybeRegister :: Maybe RegisteredName -> ChildPid -> Process ()
maybeRegister Nothing _ = return ()
maybeRegister (Just (LocalName n)) pid = register n pid
maybeRegister (Just (GlobalName _)) _ = return ()
maybeRegister (Just (CustomRegister clj)) pid = do
mProc <- catch (unClosure clj >>= return . Right)
(\(e :: SomeException) -> return $ Left (show e))
case mProc of
Left err -> die $ ExitOther (show err)
Right p -> p pid
filterInitFailures :: SupervisorPid
-> ChildPid
-> ChildInitFailure
-> Process ()
filterInitFailures sup childPid ex = do
case ex of
ChildInitFailure _ -> do
logEntry Log.error $ mkReport "ChildInitFailure" sup (show childPid) (show ex)
liftIO $ throwIO ex
ChildInitIgnore -> Unsafe.cast sup $ IgnoreChildReq childPid
terminateChildren :: State -> Process ()
terminateChildren state = do
case (shutdownStrategy state) of
ParallelShutdown -> do
let allChildren = toList $ state ^. specs
terminatorPids <- forM allChildren $ \ch -> do
pid <- spawnLocal $ void $ syncTerminate ch $ (active ^= Map.empty) state
void $ monitor pid
return pid
terminationErrors <- collectExits [] terminatorPids
case terminationErrors of
[] -> return ()
_ -> do
sup <- getSelfPid
void $ logEntry Log.error $
mkReport "Errors in terminateChildren / ParallelShutdown"
sup "n/a" (show terminationErrors)
SequentialShutdown ord -> do
let specs' = state ^. specs
let allChildren = case ord of
RightToLeft -> Seq.reverse specs'
LeftToRight -> specs'
void $ foldlM (flip syncTerminate) state (toList allChildren)
where
syncTerminate :: Child -> State -> Process State
syncTerminate (cr, cs) state' = doTerminateChild cr cs state'
collectExits :: [(ProcessId, DiedReason)]
-> [ChildPid]
-> Process [(ProcessId, DiedReason)]
collectExits errors [] = return errors
collectExits errors pids = do
(pid, reason) <- receiveWait [
match (\(ProcessMonitorNotification _ pid' reason') -> do
return (pid', reason'))
]
let remaining = List.delete pid pids
case reason of
DiedNormal -> collectExits errors remaining
_ -> collectExits ((pid, reason):errors) remaining
doTerminateChild :: ChildRef -> ChildSpec -> State -> Process State
doTerminateChild ref spec state = do
mPid <- resolve ref
case mPid of
Nothing -> return state
Just pid -> do
stopped <- childShutdown (childStop spec) pid state
state' <- shutdownComplete state pid stopped
return $ ( (active ^: Map.delete pid)
$ state'
)
where
shutdownComplete :: State -> ChildPid -> DiedReason -> Process State
shutdownComplete _ _ DiedNormal = return $ updateStopped
shutdownComplete state' pid (r :: DiedReason) = do
logShutdown (state' ^. logger) chKey pid r >> return state'
chKey = childKey spec
updateStopped = maybe state id $ updateChild chKey (setChildStopped False) state
childShutdown :: ChildTerminationPolicy
-> ChildPid
-> State
-> Process DiedReason
childShutdown policy childPid st = do
case policy of
(TerminateTimeout t) -> exit childPid ExitShutdown >> await childPid t st
TerminateImmediately -> do
kill childPid "TerminatedBySupervisor"
void $ await childPid Infinity st
return DiedNormal
where
await :: ChildPid -> Delay -> State -> Process DiedReason
await childPid' delay state = do
let monitored = (Map.member childPid' $ state ^. active)
let recv = case delay of
Infinity -> receiveWait (matches childPid') >>= return . Just
NoDelay -> receiveTimeout 0 (matches childPid')
Delay t -> receiveTimeout (asTimeout t) (matches childPid')
let recv' = if monitored then recv else withMonitor childPid' recv
recv' >>= maybe (childShutdown TerminateImmediately childPid' state) return
matches :: ChildPid -> [Match DiedReason]
matches p = [
matchIf (\(ProcessMonitorNotification _ p' _) -> p == p')
(\(ProcessMonitorNotification _ _ r) -> return r)
]
errorMaxIntensityReached :: ExitReason
errorMaxIntensityReached = ExitOther "ReachedMaxRestartIntensity"
logShutdown :: LogSink -> ChildKey -> ChildPid -> DiedReason -> Process ()
logShutdown log' child childPid reason = do
sup <- getSelfPid
Log.info log' $ mkReport banner sup (show childPid) shutdownReason
where
banner = "Child Shutdown Complete"
shutdownReason = (show reason) ++ ", child-key: " ++ child
logFailure :: SupervisorPid -> ChildPid -> SomeException -> Process ()
logFailure sup childPid ex = do
logEntry Log.notice $ mkReport "Detected Child Exit" sup (show childPid) (show ex)
liftIO $ throwIO ex
logEntry :: (LogChan -> LogText -> Process ()) -> String -> Process ()
logEntry lg = Log.report lg Log.logChannel
mkReport :: String -> SupervisorPid -> String -> String -> String
mkReport b s c r = foldl' (\x xs -> xs ++ " " ++ x) "" items
where
items :: [String]
items = [ "[" ++ s' ++ "]" | s' <- [ b
, "supervisor: " ++ show s
, "child: " ++ c
, "reason: " ++ r] ]
type Ignored = Bool
setChildStopped :: Ignored -> Child -> Prefix -> Suffix -> State -> Maybe State
setChildStopped ignored child prefix remaining st =
let spec = snd child
rType = childRestart spec
newRef = if ignored then ChildStartIgnored else ChildStopped
in case isTemporary rType of
True -> Just $ (specs ^= prefix >< remaining) $ st
False -> Just $ (specs ^= prefix >< ((newRef, spec) <| remaining)) st
doAddChild :: AddChildReq -> Bool -> State -> AddChildRes
doAddChild (AddChild _ spec) update st =
let chType = childType spec
in case (findChild (childKey spec) st) of
Just (ref, _) -> Exists ref
Nothing ->
case update of
True -> Added $ ( (specs ^: (|> (ChildStopped, spec)))
$ bumpStats Specified chType (+1) st
)
False -> Added st
updateChild :: ChildKey
-> (Child -> Prefix -> Suffix -> State -> Maybe State)
-> State
-> Maybe State
updateChild key updateFn state =
let (prefix, suffix) = Seq.breakl ((== key) . childKey . snd) $ state ^. specs
in case (Seq.viewl suffix) of
EmptyL -> Nothing
child :< remaining -> updateFn child prefix remaining state
removeChild :: ChildSpec -> State -> State
removeChild spec state =
let k = childKey spec
in specs ^: filter ((/= k) . childKey . snd) $ state
markActive :: State -> ChildRef -> ChildSpec -> State
markActive state ref spec =
case ref of
ChildRunning (pid :: ChildPid) -> inserted pid
ChildRunningExtra pid _ -> inserted pid
_ -> error $ "InternalError"
where
inserted pid' = active ^: Map.insert pid' (childKey spec) $ state
decrement :: Int -> Int
decrement n = n 1
findChild :: ChildKey -> State -> Maybe (ChildRef, ChildSpec)
findChild key st = find ((== key) . childKey . snd) $ st ^. specs
bumpStats :: StatsType -> ChildType -> (Int -> Int) -> State -> State
bumpStats Specified Supervisor fn st = (bump fn) . (stats .> supervisors ^: fn) $ st
bumpStats Specified Worker fn st = (bump fn) . (stats .> workers ^: fn) $ st
bumpStats Active Worker fn st = (stats .> running ^: fn) . (stats .> activeWorkers ^: fn) $ st
bumpStats Active Supervisor fn st = (stats .> running ^: fn) . (stats .> activeSupervisors ^: fn) $ st
bump :: (Int -> Int) -> State -> State
bump with' = stats .> children ^: with'
isTemporary :: RestartPolicy -> Bool
isTemporary = (== Temporary)
isTransient :: RestartPolicy -> Bool
isTransient = (== Transient)
isIntrinsic :: RestartPolicy -> Bool
isIntrinsic = (== Intrinsic)
active :: Accessor State (Map ChildPid ChildKey)
active = accessor _active (\act' st -> st { _active = act' })
strategy :: Accessor State RestartStrategy
strategy = accessor _strategy (\s st -> st { _strategy = s })
restartIntensity :: Accessor RestartStrategy RestartLimit
restartIntensity = accessor intensity (\i l -> l { intensity = i })
restartPeriod :: Accessor State NominalDiffTime
restartPeriod = accessor _restartPeriod (\p st -> st { _restartPeriod = p })
restarts :: Accessor State [UTCTime]
restarts = accessor _restarts (\r st -> st { _restarts = r })
specs :: Accessor State ChildSpecs
specs = accessor _specs (\sp' st -> st { _specs = sp' })
stats :: Accessor State SupervisorStats
stats = accessor _stats (\st' st -> st { _stats = st' })
logger :: Accessor State LogSink
logger = accessor _logger (\l st -> st { _logger = l })
children :: Accessor SupervisorStats Int
children = accessor _children (\c st -> st { _children = c })
workers :: Accessor SupervisorStats Int
workers = accessor _workers (\c st -> st { _workers = c })
running :: Accessor SupervisorStats Int
running = accessor _running (\r st -> st { _running = r })
supervisors :: Accessor SupervisorStats Int
supervisors = accessor _supervisors (\c st -> st { _supervisors = c })
activeWorkers :: Accessor SupervisorStats Int
activeWorkers = accessor _activeWorkers (\c st -> st { _activeWorkers = c })
activeSupervisors :: Accessor SupervisorStats Int
activeSupervisors = accessor _activeSupervisors (\c st -> st { _activeSupervisors = c })