{-# LANGUAGE UndecidableInstances #-}
-- | A watchdog for 'Broker' processes.
--
-- A watchdog is attached to brokers (temporarily or permanently), observes
-- their child events, and restarts crashed children as long as the number of
-- recorded crashes stays below the configured 'CrashRate'.
module Control.Eff.Concurrent.Protocol.Watchdog
( startLink
, Watchdog
, attachTemporary
, attachPermanent
, getCrashReports
, CrashRate(..)
, crashCount
, crashTimeSpan
, crashesPerSeconds
, CrashCount
, CrashTimeSpan
, ChildWatch(..)
, parent
, crashes
, ExonerationTimer(..)
, CrashReport(..)
, crashTime
, crashReason
, exonerationTimerReference
) where
import Control.DeepSeq
import Control.Eff (Eff, Member, lift, Lifted)
import Control.Eff.Concurrent.Misc
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Process.Timer
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Client
import Control.Eff.Concurrent.Protocol.Wrapper
import qualified Control.Eff.Concurrent.Protocol.Observer as Observer
import Control.Eff.Concurrent.Protocol.Observer (Observer)
import qualified Control.Eff.Concurrent.Protocol.Broker as Broker
import Control.Eff.Concurrent.Protocol.Broker (Broker)
import qualified Control.Eff.Concurrent.Protocol.StatefulServer as Stateful
import qualified Control.Eff.Concurrent.Protocol.EffectfulServer as Effectful
import Control.Lens
import Control.Eff.Log
import Data.Set (Set)
import Data.Typeable
import qualified Data.Set as Set
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Time.Clock
import Data.Kind (Type)
import Data.Default
import Data.Text (pack)
import GHC.Stack (HasCallStack)
import Data.Maybe (isJust)
import Data.Foldable (traverse_, forM_)
import Control.Monad (when)
-- | Protocol tag of the watchdog for brokers of @child@ processes.
--
-- The type has no constructors; it is only used as a type level index, e.g.
-- for the 'HasPdu' instance and for @Endpoint (Watchdog child)@.
data Watchdog (child :: Type) deriving Typeable
-- | Start a watchdog server through 'Stateful.startLink'.
--
-- The given 'CrashRate' becomes the start argument of the server; it bounds
-- how often a crashed child is restarted.
startLink
  :: forall child e q
   . ( HasCallStack
     , Typeable child
     , FilteredLogging (Processes q)
     , Member Logs q
     , HasProcesses e q
     , Tangible (Broker.ChildId child)
     , Ord (Broker.ChildId child)
     , HasPdu (Effectful.ServerPdu child)
     , Lifted IO q
     )
  => CrashRate -> Eff e (Endpoint (Watchdog child))
startLink maxCrashRate =
  Stateful.startLink @(Watchdog child) (StartWatchDog maxCrashRate)
-- | Ask the watchdog to watch the given broker as a /temporary/ broker.
--
-- This is a synchronous call with a timeout of one second. The request
-- carries 'False' as the \"permanent\" flag.
attachTemporary
  :: forall child q e
   . ( HasCallStack
     , FilteredLogging e
     , Typeable child
     , HasPdu (Effectful.ServerPdu child)
     , Tangible (Broker.ChildId child)
     , Ord (Broker.ChildId child)
     , HasProcesses e q
     )
  => Endpoint (Watchdog child) -> Endpoint (Broker child) -> Eff e ()
attachTemporary watchdogEndpoint brokerEndpoint =
  callWithTimeout watchdogEndpoint request oneSecond
  where
    request = Attach brokerEndpoint False
    oneSecond = TimeoutMicros 1_000_000
-- | Ask the watchdog to watch the given broker as a /permanent/ broker.
--
-- This is a synchronous call with a timeout of one second. The request
-- carries 'True' as the \"permanent\" flag.
attachPermanent
  :: forall child q e
   . ( HasCallStack
     , FilteredLogging e
     , Typeable child
     , HasPdu (Effectful.ServerPdu child)
     , Tangible (Broker.ChildId child)
     , Ord (Broker.ChildId child)
     , HasProcesses e q
     )
  => Endpoint (Watchdog child) -> Endpoint (Broker child) -> Eff e ()
attachPermanent watchdogEndpoint brokerEndpoint =
  callWithTimeout watchdogEndpoint request oneSecond
  where
    request = Attach brokerEndpoint True
    oneSecond = TimeoutMicros 1_000_000
-- | Fetch the crash bookkeeping of the watchdog: one 'ChildWatch' per child
-- id that currently has recorded crashes.
--
-- This is a synchronous call with a timeout of five seconds.
getCrashReports
  :: forall child q e
   . ( HasCallStack
     , FilteredLogging e
     , Typeable child
     , HasPdu (Effectful.ServerPdu child)
     , Tangible (Broker.ChildId child)
     , Ord (Broker.ChildId child)
     , HasProcesses e q
     , Lifted IO q
     , Lifted IO e
     , Member Logs e
     )
  => Endpoint (Watchdog child) -> Eff e (Map (Broker.ChildId child) (ChildWatch child))
getCrashReports watchdogEndpoint =
  callWithTimeout watchdogEndpoint GetCrashReports fiveSeconds
  where
    fiveSeconds = TimeoutMicros 5_000_000
-- | The messages understood by a 'Watchdog' server.
instance Typeable child => HasPdu (Watchdog child) where
  -- Observations of broker child events are embedded into the watchdog
  -- protocol (see the 'OnChildEvent' constructor).
  type instance EmbeddedPduList (Watchdog child) = '[Observer (Broker.ChildEvent child)]
  data Pdu (Watchdog child) r where
      -- Start watching a broker; the flag is 'True' for a permanent and
      -- 'False' for a temporary attachment.
      Attach :: Endpoint (Broker child) -> Bool -> Pdu (Watchdog child) ('Synchronous ())
      -- Request the currently recorded crashes, keyed by child id.
      GetCrashReports :: Pdu (Watchdog child) ('Synchronous (Map (Broker.ChildId child) (ChildWatch child)))
      -- A child event reported by an observed broker.
      OnChildEvent :: Broker.ChildEvent child -> Pdu (Watchdog child) 'Asynchronous
    deriving Typeable
-- | Conversion between observed broker child events and the watchdog's own
-- 'OnChildEvent' message.
instance Typeable child => HasPduPrism (Watchdog child) (Observer (Broker.ChildEvent child)) where
  embedPdu (Observer.Observed childEvent) = OnChildEvent childEvent
  fromPdu = \case
    OnChildEvent childEvent -> Just (Observer.Observed childEvent)
    _ -> Nothing
-- | Fully evaluate the payload of a watchdog message.
instance (NFData (Broker.ChildId child)) => NFData (Pdu (Watchdog child) r) where
  rnf = \case
    Attach brokerEndpoint permanent -> rnf brokerEndpoint `seq` rnf permanent
    GetCrashReports -> ()
    OnChildEvent childEvent -> rnf childEvent
-- | Human readable rendering of watchdog messages, used in log output.
instance
  ( Show (Broker.ChildId child)
  , Typeable child
  , Typeable (Effectful.ServerPdu child)
  )
  => Show (Pdu (Watchdog child) r) where
  showsPrec d (Attach brokerEndpoint permanent)
    | permanent = showParen (d >= 10) (showString "attach-permanent: " . shows brokerEndpoint)
    | otherwise = showParen (d >= 10) (showString "attach-temporary: " . shows brokerEndpoint)
  showsPrec _ GetCrashReports = showString "get-crash-reports"
  showsPrec d (OnChildEvent childEvent) =
    showParen (d >= 10) (showString "on-child-event: " . showsPrec 10 childEvent)
-- | What the watchdog remembers about an attached broker: the monitor
-- reference of the broker process and whether the attachment is permanent.
data BrokerWatch =
  MkBrokerWatch { _brokerMonitor :: MonitorReference, _isPermanent :: Bool }
  deriving (Typeable)
-- | Renders the attachment kind followed by the monitor reference.
instance Show BrokerWatch where
  showsPrec d (MkBrokerWatch mon permanent)
    | permanent = showParen (d >= 10) (showString "permanent-broker: " . showsPrec 10 mon)
    | otherwise = showParen (d >= 10) (showString "temporary-broker: " . showsPrec 10 mon)
-- | Lens onto the monitor reference of a 'BrokerWatch'.
brokerMonitor :: Lens' BrokerWatch MonitorReference
brokerMonitor f bw =
  (\newMonitor -> bw { _brokerMonitor = newMonitor }) <$> f (_brokerMonitor bw)
-- | Lens onto the \"permanent\" flag of a 'BrokerWatch'.
isPermanent :: Lens' BrokerWatch Bool
isPermanent f bw =
  (\newFlag -> bw { _isPermanent = newFlag }) <$> f (_isPermanent bw)
-- | The watchdog server.
--
-- The server keeps a map of attached brokers and a map of children with
-- recorded crashes. A crashed child is restarted as long as the number of
-- its recorded crashes is below the 'crashCount' of the configured
-- 'CrashRate'; every recorded crash is removed again when its exoneration
-- timer fires.
instance
  ( Typeable child
  , HasPdu (Effectful.ServerPdu child)
  , Tangible (Broker.ChildId child)
  , Ord (Broker.ChildId child)
  , Eq (Broker.ChildId child)
  , Lifted IO e
  , Member Logs e
  ) => Stateful.Server (Watchdog child) (Processes e) where

  -- The only start parameter is the tolerated crash rate.
  data instance StartArgument (Watchdog child) =
    StartWatchDog { _crashRate :: CrashRate
                  }
      deriving Typeable

  -- Attached brokers, and per child id the recorded crashes.
  data instance Model (Watchdog child) =
    WatchdogModel { _brokers :: Map (Endpoint (Broker child)) BrokerWatch
                  , _watched :: Map (Broker.ChildId child) (ChildWatch child)
                  }

  update me startArg =
    \case
      -- Attach to a broker, or change the permanent flag of an attachment.
      Effectful.OnCall rt (Attach broker permanent) -> do
        -- The conditional must be parenthesised: an unparenthesised
        -- @if .. then .. else ..@ extends as far to the right as possible,
        -- which would drop the broker from the message when @permanent@ is
        -- 'True'.
        logDebug ( "attaching "
                   <> (if permanent then "permanently" else "temporarily")
                   <> " to: " <> pack (show broker)
                 )
        oldMonitor <- Stateful.preuseModel @(Watchdog child) (brokers . at broker . _Just . brokerMonitor)
        -- Only monitor the broker process if it is not monitored yet.
        newMonitor <- maybe (monitor (broker^.fromEndpoint)) return oldMonitor
        case oldMonitor of
          Nothing -> do
            logDebug ("start observing: " <> pack (show broker))
            Observer.registerObserver @(Broker.ChildEvent child) broker me
          Just _ ->
            logDebug ("already observing " <> pack (show broker))
        let newBrokerWatch = MkBrokerWatch newMonitor permanent
        Stateful.modifyModel (brokers . at broker ?~ newBrokerWatch)
        -- NOTE(review): for a new broker this is the second registration
        -- (see the 'Nothing' branch above); kept as is, presumably the
        -- registration is idempotent - confirm against the Observer module.
        Observer.registerObserver @(Broker.ChildEvent child) broker me
        sendReply rt ()

      Effectful.OnCall rt GetCrashReports ->
        Stateful.useModel @(Watchdog child) watched >>= sendReply rt

      Effectful.OnCast (OnChildEvent e) ->
        case e of
          down@(Broker.OnBrokerShuttingDown broker) -> do
            logInfo ("received: " <> pack (show down))
            removeBroker me broker

          spawned@(Broker.OnChildSpawned broker _ _) -> do
            logInfo ("received: " <> pack (show spawned))
            currentModel <- Stateful.getModel @(Watchdog child)
            when (not (Map.member broker (currentModel ^. brokers)))
                 (logWarning ("received child event for unknown broker: " <> pack (show spawned)))

          -- A normal exit is not a crash: forget the child and its timers.
          down@(Broker.OnChildDown broker cId _ ExitNormally) -> do
            logInfo ("received: " <> pack (show down))
            currentModel <- Stateful.getModel @(Watchdog child)
            if not (Map.member broker (currentModel ^. brokers))
              then logWarning ("received child event for unknown broker: " <> pack (show down))
              else removeAndCleanChild @child cId

          -- Any other exit reason counts as a crash.
          down@(Broker.OnChildDown broker cId _ reason) -> do
            logInfo ("received: " <> pack (show down))
            currentModel <- Stateful.getModel @(Watchdog child)
            if not (Map.member broker (currentModel ^. brokers))
              then
                logWarning ("received child event for unknown broker: " <> pack (show down))
              else do
                let recentCrashes = countRecentCrashes broker cId currentModel
                    rate = startArg ^. crashRate
                    maxCrashCount = rate ^. crashCount
                if recentCrashes < maxCrashCount then do
                  logNotice ("restarting (" <> pack (show recentCrashes) <> "/" <> pack (show maxCrashCount) <> "): "
                              <> pack (show cId) <> " of " <> pack (show broker))
                  res <- Broker.spawnChild broker cId
                  logNotice ("restarted: " <> pack (show cId) <> " of "
                              <> pack (show broker) <> ": " <> pack (show res))
                  -- Record the crash; it is removed again when the
                  -- exoneration timer message arrives.
                  crash <- startExonerationTimer @child cId reason (rate ^. crashTimeSpan)
                  if isJust (currentModel ^? childWatchesById cId)
                    then do
                      logDebug ("recording crash for child: " <> pack (show cId) <> " of " <> pack (show broker))
                      Stateful.modifyModel (watched @child . at cId . _Just . crashes %~ Set.insert crash)
                    else do
                      logDebug ("recording crash for new child: " <> pack (show cId) <> " of " <> pack (show broker))
                      Stateful.modifyModel (watched @child . at cId .~ Just (MkChildWatch broker (Set.singleton crash)))
                else do
                  logWarning ("restart rate exceeded: " <> pack (show rate)
                              <> ", for child: " <> pack (show cId)
                              <> " of " <> pack (show broker))
                  removeAndCleanChild @child cId
                  forM_ (currentModel ^? brokers . at broker . _Just) $ \bw ->
                    if bw ^. isPermanent then do
                      logError ("a child of a permanent broker crashed too often, interrupting: " <> pack (show broker))
                      let r = ExitUnhandledError "restart frequency exceeded"
                      -- Shut down the broker and then exit the watchdog
                      -- itself with the same reason.
                      demonitor (bw ^. brokerMonitor)
                      sendShutdown (broker ^. fromEndpoint) r
                      exitBecause r
                    else
                      logError ("a child of a temporary broker crashed too often: " <> pack (show broker))

      -- A monitored process went down; it is treated as a broker endpoint.
      Effectful.OnDown pd@(ProcessDown _mref _ pid) -> do
        logDebug ("received " <> pack (show pd))
        let broker = asEndpoint pid
        removeBroker @child me broker

      Effectful.OnTimeOut t -> do
        logError ("received: " <> pack (show t))

      -- An exoneration timer fired: drop the crash report that carries the
      -- same timer reference.
      Effectful.OnMessage (fromStrictDynamic -> Just (MkExonerationTimer cId ref :: ExonerationTimer (Broker.ChildId child))) -> do
        logInfo ("exonerating: " <> pack (show cId))
        Stateful.modifyModel
          (watched @child . at cId . _Just . crashes %~ Set.filter (\c -> c^.exonerationTimerReference /= ref))

      Effectful.OnMessage t -> do
        logError ("received: " <> pack (show t))

      Effectful.OnInterrupt reason -> do
        logError ("received: " <> pack (show reason))
-- | Lens onto the 'CrashRate' stored in the watchdog start argument.
crashRate :: Lens' (Stateful.StartArgument (Watchdog child)) CrashRate
crashRate f startArg =
  (\newRate -> startArg { _crashRate = newRate }) <$> f (_crashRate startArg)
-- | The tolerated crash rate of a child, given as a number of crashes and a
-- time span in seconds.
--
-- The watchdog restarts a crashed child only while the number of recorded
-- crashes is below '_crashCount'; a recorded crash is exonerated
-- '_crashTimeSpan' seconds after it was recorded.
data CrashRate =
  CrashesPerSeconds { _crashCount :: CrashCount
                    , _crashTimeSpan :: CrashTimeSpan
                    }
  deriving (Typeable, Eq, Ord)
-- | The default rate is three crashes per thirty seconds.
instance Default CrashRate where
  def = crashesPerSeconds 3 30
-- | Renders a rate as @\<count\> crashes/\<time\> seconds@.
instance Show CrashRate where
  showsPrec d (CrashesPerSeconds count time) = showParen (d >= 7) rendered
    where
      rendered =
        shows count
          . showString " crashes/"
          . shows time
          . showString " seconds"
-- | Both fields are plain 'Int's, so forcing them fully evaluates the rate.
instance NFData CrashRate where
  rnf (CrashesPerSeconds count time) = rnf count `seq` rnf time
-- | Number of crashes; the first component of a 'CrashRate'.
type CrashCount = Int
-- | Time span in seconds; the second component of a 'CrashRate'.
type CrashTimeSpan = Int
-- | Build a 'CrashRate' from a crash count and a time span in seconds,
-- e.g. @3 \`crashesPerSeconds\` 30@.
crashesPerSeconds :: CrashCount -> CrashTimeSpan -> CrashRate
crashesPerSeconds count time = CrashesPerSeconds count time
-- | Lens onto the crash count of a 'CrashRate'.
crashCount :: Lens' CrashRate CrashCount
crashCount f rate =
  (\newCount -> rate { _crashCount = newCount }) <$> f (_crashCount rate)
-- | Lens onto the time span (in seconds) of a 'CrashRate'.
crashTimeSpan :: Lens' CrashRate CrashTimeSpan
crashTimeSpan f rate =
  (\newSpan -> rate { _crashTimeSpan = newSpan }) <$> f (_crashTimeSpan rate)
-- | A single recorded crash of a child.
--
-- The type parameter is phantom; the watchdog instantiates it with the child
-- id type. The timer reference identifies the exoneration timer that was
-- started when the crash was recorded.
data CrashReport a =
  MkCrashReport { _exonerationTimerReference :: TimerReference
                , _crashTime :: UTCTime
                , _crashReason :: Interrupt 'NoRecovery
                }
  deriving (Eq, Ord, Typeable)
-- | Renders the crash time, the crash reason and the timer reference.
instance (Show a, Typeable a) => Show (CrashReport a) where
  showsPrec d report = showParen (d >= 10) rendered
    where
      rendered =
        showString "crash report: "
          . showString " time: " . showsPrec 10 (_crashTime report)
          . showString " reason: " . showsPrec 10 (_crashReason report)
          . showString " " . showsPrec 10 (_exonerationTimerReference report)
-- | Fully evaluate all three fields of a crash report.
instance NFData (CrashReport a) where
  rnf (MkCrashReport !timerRef !time !reason) =
    rnf timerRef `seq` rnf time `seq` rnf reason
-- | Lens onto the time at which a crash was recorded.
crashTime :: Lens' (CrashReport a) UTCTime
crashTime f report =
  (\newTime -> report { _crashTime = newTime }) <$> f (_crashTime report)
-- | Lens onto the exit reason of the crashed child.
crashReason :: Lens' (CrashReport a) (Interrupt 'NoRecovery)
crashReason f report =
  (\newReason -> report { _crashReason = newReason }) <$> f (_crashReason report)
-- | Lens onto the reference of the exoneration timer of a crash report.
exonerationTimerReference :: Lens' (CrashReport a) TimerReference
exonerationTimerReference f report =
  (\newRef -> report { _exonerationTimerReference = newRef })
    <$> f (_exonerationTimerReference report)
-- | Start the exoneration timer for a crash and build the 'CrashReport'.
--
-- After the given time span (in seconds) an 'ExonerationTimer' message that
-- carries the child id and the timer reference is sent to the calling
-- process. The report contains that timer reference, the current time and
-- the crash reason.
startExonerationTimer :: forall child a q e .
  (HasProcesses e q, Lifted IO q, Lifted IO e, Show a, NFData a, Typeable a, Typeable child)
  => a -> Interrupt 'NoRecovery -> CrashTimeSpan -> Eff e (CrashReport a)
startExonerationTimer cId reason spanSeconds = do
  receiver <- self
  timerRef <- sendAfterWithTitle timerTitle receiver delay (MkExonerationTimer cId)
  (\now -> MkCrashReport timerRef now reason) <$> lift getCurrentTime
  where
    timerTitle =
      MkProcessTitle ("ExonerationTimer<" <> pack (showSTypeable @child ">") <> pack (show cId))
    delay = TimeoutMicros (spanSeconds * 1_000_000)
-- | The message sent to the watchdog when an exoneration timer fires.
--
-- It carries the id of the crashed child and the reference of the timer; the
-- watchdog removes the crash report with the same timer reference.
data ExonerationTimer a = MkExonerationTimer !a !TimerReference
  deriving (Eq, Ord, Typeable)
-- | Fully evaluate the timer reference and then the payload.
instance NFData a => NFData (ExonerationTimer a) where
  rnf (MkExonerationTimer !payload !timerRef) = rnf timerRef `seq` rnf payload
-- | Renders the payload and the timer reference.
instance Show a => Show (ExonerationTimer a) where
  showsPrec d (MkExonerationTimer payload timerRef) = showParen (d >= 10) rendered
    where
      rendered =
        showString "exonerate: "
          . showsPrec 10 payload
          . showString " after: "
          . showsPrec 10 timerRef
-- | The crash bookkeeping for one child: the broker the child belongs to and
-- the set of crash reports that have not been exonerated yet.
data ChildWatch child =
  MkChildWatch
    { _parent :: Endpoint (Broker child)
    , _crashes :: Set (CrashReport (Broker.ChildId child))
    }
  deriving Typeable
-- | Fully evaluate the parent endpoint and all crash reports.
instance NFData (ChildWatch child) where
  rnf (MkChildWatch parentBroker crashReports) =
    rnf parentBroker `seq` rnf crashReports
-- | Renders the parent broker followed by all crash reports in set order.
instance (Typeable child, Typeable (Broker.ChildId child), Show (Broker.ChildId child)) => Show (ChildWatch child) where
  showsPrec d (MkChildWatch parentBroker crashReports) =
    showParen (d >= 10) $
      showString "child-watch: parent: "
        . showsPrec 10 parentBroker
        . showString " crashes: "
        . foldr (\report rest -> showsPrec 10 report . rest) id crashReports
-- | Lens onto the broker a watched child belongs to.
parent :: Lens' (ChildWatch child) (Endpoint (Broker child))
parent f cw = (\newParent -> cw { _parent = newParent }) <$> f (_parent cw)
-- | Lens onto the recorded crash reports of a watched child.
crashes :: Lens' (ChildWatch child) (Set (CrashReport (Broker.ChildId child)))
crashes f cw = (\newCrashes -> cw { _crashes = newCrashes }) <$> f (_crashes cw)
-- | The initial model: the default broker map and no watched children.
instance Default (Stateful.Model (Watchdog child)) where
  def = WatchdogModel { _brokers = def, _watched = Map.empty }
-- | Renders the broker watches followed by the child watches.
instance ( Show (Broker.ChildId child)
         , Typeable (Broker.ChildId child)
         , Typeable child
         )
  => Show (Stateful.Model (Watchdog child))
  where
  showsPrec d (WatchdogModel brokerWatches childWatchMap) =
    showParen (d >= 10) $
      showString "watchdog model broker watches: "
        . showsPrec 10 brokerWatches
        . showString " watchdog model child watches: "
        . showsPrec 10 childWatchMap
-- | Lens onto the map of watched children in the watchdog model.
watched :: Lens' (Stateful.Model (Watchdog child)) (Map (Broker.ChildId child) (ChildWatch child))
watched f model =
  (\newWatched -> model { _watched = newWatched }) <$> f (_watched model)
-- | Indexed traversal over all 'ChildWatch' entries of the model; the index
-- is the child id, i.e. the key of the 'watched' map.
childWatches :: IndexedTraversal' (Broker.ChildId child) (Stateful.Model (Watchdog child)) (ChildWatch child)
childWatches = watched . itraversed
-- | Traversal over the 'ChildWatch' entries whose child id equals the given
-- id.
childWatchesById ::
     Eq (Broker.ChildId child)
  => Broker.ChildId child
  -> Traversal' (Stateful.Model (Watchdog child)) (ChildWatch child)
childWatchesById theCId = childWatches . ifiltered hasWantedId
  where
    hasWantedId candidateId _ = candidateId == theCId
-- | Traversal over the 'ChildWatch' entries that belong to the given broker
-- and whose child id equals the given id.
childWatchesByParenAndId ::
     Eq (Broker.ChildId child)
  => Endpoint (Broker child)
  -> Broker.ChildId child
  -> Traversal' (Stateful.Model (Watchdog child)) (ChildWatch child)
childWatchesByParenAndId theParent theCId = childWatches . ifiltered wanted
  where
    wanted candidateId candidateWatch =
      candidateWatch ^. parent == theParent && candidateId == theCId
-- | Count the crash reports currently recorded for the given child of the
-- given broker. Exonerated crashes are no longer part of the model and are
-- therefore not counted.
countRecentCrashes ::
     Eq (Broker.ChildId child)
  => Endpoint (Broker child)
  -> Broker.ChildId child
  -> Stateful.Model (Watchdog child)
  -> CrashCount
countRecentCrashes theParent theCId =
  lengthOf (childWatchesByParenAndId theParent theCId . crashes . folded)
-- | Lens onto the map of attached brokers in the watchdog model.
brokers :: Lens' (Stateful.Model (Watchdog child)) (Map (Endpoint (Broker child)) BrokerWatch)
brokers f model =
  (\newBrokers -> model { _brokers = newBrokers }) <$> f (_brokers model)
-- | Remove the bookkeeping entry of a child and cancel all exoneration
-- timers of its recorded crashes.
--
-- If there is no entry for the child id, only the (no-op) model update
-- happens.
removeAndCleanChild ::
     forall child q e.
     ( HasProcesses e q
     , Typeable child
     , Typeable (Broker.ChildId child)
     , Ord (Broker.ChildId child)
     , Show (Broker.ChildId child)
     , Member (Stateful.ModelState (Watchdog child)) e
     , Member Logs e
     )
  => Broker.ChildId child
  -> Eff e ()
removeAndCleanChild cId = do
  -- The model from BEFORE the deletion is needed here: in the updated model
  -- the entry is gone, so the traversal below would be empty and no timer
  -- would ever be cancelled. 'Stateful.getAndModifyModel' is used in the
  -- same way by 'removeBroker'.
  -- NOTE(review): assumes getAndModifyModel returns the previous model and
  -- modifyAndGetModel the updated one - confirm against StatefulServer.
  oldModel <- Stateful.getAndModifyModel @(Watchdog child) (watched @child . at cId .~ Nothing)
  forMOf_ (childWatchesById cId) oldModel $ \w -> do
    logDebug ("removing client entry: " <> pack (show cId))
    forMOf_ (crashes . folded . exonerationTimerReference) w cancelTimer
    logDebug (pack (show w))
-- | Forget a broker and all watched children that belong to it.
--
-- If the broker was attached, the watchdog also stops observing it. When the
-- removed broker was attached permanently, the watchdog exits with
-- 'ExitOtherProcessNotRunning'.
removeBroker ::
     forall child q e.
     ( HasProcesses e q
     , Typeable child
     , Tangible (Broker.ChildId child)
     , Typeable (Effectful.ServerPdu child)
     , Ord (Broker.ChildId child)
     , Show (Broker.ChildId child)
     , Member (Stateful.ModelState (Watchdog child)) e
     , Member Logs e
     )
  => Endpoint (Watchdog child)
  -> Endpoint (Broker child)
  -> Eff e ()
removeBroker me broker = do
  previous <- Stateful.getAndModifyModel @(Watchdog child) dropBroker
  case Map.lookup broker (previous ^. brokers) of
    Nothing -> return ()
    Just deadBroker -> do
      logNotice ("dettaching: " <> pack (show deadBroker) <> " " <> pack (show broker))
      let orphans =
            [ cw | cw <- Map.elems (previous ^. watched), cw ^. parent == broker ]
      forM_ orphans $ \cw ->
        logNotice ("forgetting: " <> pack (show cw))
      Observer.forgetObserver @(Broker.ChildEvent child) broker me
      when (deadBroker ^. isPermanent) $ do
        logError ("permanent broker exited: " <> pack (show broker))
        exitBecause (ExitOtherProcessNotRunning (broker ^. fromEndpoint))
  where
    -- Delete the broker entry and every child entry that points to it.
    dropBroker =
      (brokers %~ Map.delete broker)
        . (watched %~ Map.filter (\cw -> cw ^. parent /= broker))