{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Control.Concurrent.Supervisor.Types
( SupervisionCtx
, Supervisor
, QueueLike(..)
, Child_
, DeadLetter
, RestartAction
, SupervisionEvent(..)
, RestartStrategy(..)
, RestartResult(..)
, newSupervisor
, fibonacciRetryPolicy
, shutdownSupervisor
, eventStream
, activeChildren
, forkSupervised
, monitorWith
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Control.Retry
import qualified Data.HashMap.Strict as Map
import Data.IORef
import Data.Time
import Numeric.Natural
import System.Clock (Clock(Monotonic), TimeSpec, getTime)
type Mailbox = TChan DeadLetter
data SupervisionCtx q = SupervisionCtx {
_sc_mailbox :: Mailbox
, _sc_parent_mailbox :: !(IORef (Maybe Mailbox))
, _sc_children :: !(IORef (Map.HashMap ThreadId (Child_ q)))
, _sc_eventStream :: q SupervisionEvent
, _sc_eventStreamSize :: !Natural
, _sc_strategy :: !RestartStrategy
}
data Supervisor q = Supervisor {
_sp_myTid :: !ThreadId
, _sp_ctx :: !(SupervisionCtx q)
}
class QueueLike q where
newQueueIO :: Natural -> IO (q a)
readQueue :: q a -> STM a
writeQueue :: q a -> a -> STM ()
instance QueueLike TQueue where
newQueueIO = const newTQueueIO
readQueue = readTQueue
writeQueue = writeTQueue
instance QueueLike TBQueue where
newQueueIO = newTBQueueIO
readQueue = readTBQueue
writeQueue q e = do
isFull <- isFullTBQueue q
unless isFull $ writeTBQueue q e
data DeadLetter = DeadLetter !LetterEpoch !ThreadId !SomeException
type Epoch = TimeSpec
newtype LetterEpoch = LetterEpoch Epoch deriving Show
newtype ChildEpoch = ChildEpoch Epoch deriving Show
data RestartResult =
Restarted !ThreadId !ThreadId !RetryStatus !UTCTime
| StaleDeadLetter !ThreadId !LetterEpoch !ChildEpoch !UTCTime
| RestartFailed SupervisionEvent
deriving Show
data Child_ q = Worker !ChildEpoch !RetryStatus (RetryPolicyM IO) RestartAction
| Supvsr !ChildEpoch !RetryStatus (RetryPolicyM IO) !(Supervisor q)
type RestartAction = ThreadId -> IO ThreadId
data SupervisionEvent =
ChildBorn !ThreadId !UTCTime
| ChildDied !ThreadId !SomeException !UTCTime
| ChildRestarted !ThreadId !ThreadId !RetryStatus !UTCTime
| ChildNotFound !ThreadId !UTCTime
| StaleDeadLetterReceived !ThreadId !LetterEpoch !ChildEpoch !UTCTime
| ChildRestartLimitReached !ThreadId !RetryStatus !UTCTime
| ChildFinished !ThreadId !UTCTime
deriving Show
data RestartStrategy = OneForOne
deriving Show
fibonacciRetryPolicy :: RetryPolicyM IO
fibonacciRetryPolicy = fibonacciBackoff 100
getEpoch :: MonadIO m => m Epoch
getEpoch = liftIO $ getTime Monotonic
tryNotifyParent :: IORef (Maybe Mailbox) -> ThreadId -> SomeException -> IO ()
tryNotifyParent mbPMbox myId ex = do
readIORef mbPMbox >>= \m -> case m of
Nothing -> return ()
Just m' -> do
e <- getEpoch
atomically $ writeTChan m' (DeadLetter (LetterEpoch e) myId ex)
newSupervisor :: QueueLike q
=> RestartStrategy
-> Natural
-> IO (Supervisor q)
newSupervisor strategy size = do
parentMbx <- newIORef Nothing
mbx <- newTChanIO
es <- newQueueIO size
cld <- newIORef Map.empty
let ctx = SupervisionCtx {
_sc_mailbox = mbx
, _sc_parent_mailbox = parentMbx
, _sc_eventStream = es
, _sc_children = cld
, _sc_strategy = strategy
, _sc_eventStreamSize = size
}
tid <- forkFinally (handleEvents ctx) $ \res -> case res of
Left ex -> do
bracket myThreadId return $ \myId -> do
tryNotifyParent parentMbx myId ex
Right v -> return v
go ctx tid
where
go ctx tid = do
return Supervisor {
_sp_myTid = tid
, _sp_ctx = ctx
}
eventStream :: QueueLike q => Supervisor q -> q SupervisionEvent
eventStream Supervisor{_sp_ctx} = _sc_eventStream _sp_ctx
activeChildren :: QueueLike q => Supervisor q -> IO Int
activeChildren Supervisor{_sp_ctx} = do
readIORef (_sc_children _sp_ctx) >>= return . length . Map.keys
shutdownSupervisor :: QueueLike q => Supervisor q -> IO ()
shutdownSupervisor (Supervisor tid ctx) = do
chMap <- readIORef (_sc_children ctx)
processChildren (Map.toList chMap)
killThread tid
where
processChildren [] = return ()
processChildren (x:xs) = do
case x of
(workerTid, Worker{}) -> killThread workerTid
(_, Supvsr _ _ _ s) -> shutdownSupervisor s
processChildren xs
forkSupervised :: QueueLike q
=> Supervisor q
-> RetryPolicyM IO
-> IO ()
-> IO ThreadId
forkSupervised sup@Supervisor{..} policy act =
bracket (supervised sup act) return $ \newChild -> do
e <- getEpoch
let ch = Worker (ChildEpoch e) defaultRetryStatus policy (const (supervised sup act))
atomicModifyIORef' (_sc_children _sp_ctx) $ \chMap -> (Map.insert newChild ch chMap, ())
now <- getCurrentTime
atomically $ writeQueue (_sc_eventStream _sp_ctx) (ChildBorn newChild now)
return newChild
supervised :: QueueLike q => Supervisor q -> IO () -> IO ThreadId
supervised Supervisor{..} act = forkFinally act $ \res -> case res of
Left ex -> bracket myThreadId return $ \myId -> do
e <- getEpoch
atomically $ writeTChan (_sc_mailbox _sp_ctx) (DeadLetter (LetterEpoch e) myId ex)
Right _ -> bracket myThreadId return $ \myId -> do
now <- getCurrentTime
atomicModifyIORef' (_sc_children _sp_ctx) $ \chMap -> (Map.delete myId chMap, ())
atomically $ writeQueue (_sc_eventStream _sp_ctx) (ChildFinished myId now)
ignoringStaleLetters :: ThreadId
-> LetterEpoch
-> ChildEpoch
-> IO RestartResult
-> IO RestartResult
ignoringStaleLetters tid deadLetterEpoch@(LetterEpoch l) childEpoch@(ChildEpoch c) act = do
now <- getCurrentTime
if l < c then return (StaleDeadLetter tid deadLetterEpoch childEpoch now) else act
restartChild :: QueueLike q
=> SupervisionCtx q
-> LetterEpoch
-> UTCTime
-> ThreadId
-> IO RestartResult
restartChild ctx deadLetterEpoch now newDeath = do
chMap <- readIORef (_sc_children ctx)
case Map.lookup newDeath chMap of
Nothing -> return $ RestartFailed (ChildNotFound newDeath now)
Just (Worker workerEpoch rState rPolicy act) -> ignoringStaleLetters newDeath deadLetterEpoch workerEpoch $ do
runRetryPolicy rState rPolicy emitEventChildRestartLimitReached $ \newRState -> do
e <- getEpoch
let ch = Worker (ChildEpoch e) newRState rPolicy act
newThreadId <- act newDeath
writeIORef (_sc_children ctx) (Map.insert newThreadId ch $! Map.delete newDeath chMap)
emitEventChildRestarted newThreadId newRState
Just (Supvsr supervisorEpoch rState rPolicy (Supervisor deathSup ctx')) -> do
ignoringStaleLetters newDeath deadLetterEpoch supervisorEpoch $ do
runRetryPolicy rState rPolicy emitEventChildRestartLimitReached $ \newRState -> do
e <- getEpoch
restartedSup <- newSupervisor (_sc_strategy ctx) (_sc_eventStreamSize ctx')
let ch = Supvsr (ChildEpoch e) newRState rPolicy restartedSup
let newThreadId = _sp_myTid restartedSup
writeIORef (_sc_children ctx) (Map.insert newThreadId ch $! Map.delete deathSup chMap)
emitEventChildRestarted newThreadId newRState
where
emitEventChildRestarted newThreadId newRState = do
return $ Restarted newDeath newThreadId newRState now
emitEventChildRestartLimitReached newRState = do
return $ RestartFailed (ChildRestartLimitReached newDeath newRState now)
runRetryPolicy :: RetryStatus
-> RetryPolicyM IO
-> (RetryStatus -> IO RestartResult)
-> (RetryStatus -> IO RestartResult)
-> IO RestartResult
runRetryPolicy rState rPolicy ifAbort ifThrottle = do
maybeDelay <- getRetryPolicyM rPolicy rState
case maybeDelay of
Nothing -> ifAbort rState
Just delay ->
let newRState = rState { rsIterNumber = rsIterNumber rState + 1
, rsCumulativeDelay = rsCumulativeDelay rState + delay
, rsPreviousDelay = Just (maybe 0 (const delay) (rsPreviousDelay rState))
}
in threadDelay delay >> ifThrottle newRState
restartOneForOne :: QueueLike q
=> SupervisionCtx q
-> LetterEpoch
-> UTCTime
-> ThreadId
-> IO RestartResult
restartOneForOne = restartChild
handleEvents :: QueueLike q => SupervisionCtx q -> IO ()
handleEvents ctx@SupervisionCtx{..} = do
(DeadLetter epoch newDeath ex) <- atomically $ readTChan _sc_mailbox
now <- getCurrentTime
atomically $ writeQueue _sc_eventStream (ChildDied newDeath ex now)
case asyncExceptionFromException ex of
Just (_ :: AsyncException) -> do
atomicModifyIORef' _sc_children $ \chMap -> (Map.delete newDeath chMap, ())
atomically $ writeQueue _sc_eventStream (ChildDied newDeath ex now)
handleEvents ctx
Nothing -> do
restartResult <- case _sc_strategy of
OneForOne -> restartOneForOne ctx epoch now newDeath
atomically $ case restartResult of
StaleDeadLetter tid le we tm -> do
writeQueue _sc_eventStream (StaleDeadLetterReceived tid le we tm)
RestartFailed reason -> do
writeQueue _sc_eventStream reason
Restarted oldId newId rStatus tm ->
writeQueue _sc_eventStream (ChildRestarted oldId newId rStatus tm)
handleEvents ctx
monitorWith :: QueueLike q
=> RetryPolicyM IO
-> Supervisor q
-> Supervisor q
-> IO ThreadId
monitorWith policy sup1 sup2 = do
let sup1Children = _sc_children (_sp_ctx sup1)
let sup1Mailbox = _sc_mailbox (_sp_ctx sup1)
let sup2Id = _sp_myTid sup2
let sup2ParentMailbox = _sc_parent_mailbox (_sp_ctx sup2)
readIORef sup2ParentMailbox >>= \mbox -> case mbox of
Just _ -> return sup2Id
Nothing -> do
e <- getEpoch
let sup2RetryStatus = defaultRetryStatus
let ch' = Supvsr (ChildEpoch e) sup2RetryStatus policy sup2
atomicModifyIORef' sup1Children $ \chMap -> (Map.insert sup2Id ch' chMap, ())
duped <- atomically $ dupTChan sup1Mailbox
atomicModifyIORef' sup2ParentMailbox $ const (Just duped, ())
return sup2Id