{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} module Database.CQRS.TaskManager ( TaskManager , Task(..) , runTaskManager ) where import Control.Concurrent (threadDelay) import Control.Monad (forever, when) import Control.Monad.Trans import Data.Hashable (Hashable) import Data.List (find, sortOn) import Data.Maybe (isNothing, mapMaybe) import Pipes ((>->)) import qualified Control.Monad.Except as Exc import qualified Control.Monad.State.Strict as St import qualified Data.Time as T import qualified Pipes import qualified Database.CQRS as CQRS import qualified Database.CQRS.ReadModel.AggregateStore as CQRS.AS -- | A task as accumulated by a 'TaskManager'. data Task action = Task { action :: action , from :: Maybe T.UTCTime -- ^ Time from which the task can be run. } -- | Projection aggregating a list of tasks from a stream of events together -- with some state. type TaskManager event action st = CQRS.Aggregator event ([Task action], st) -- | Repeatedly loop through the streams and run the tasks. -- -- If it finds no work, it waits 5 minutes before the next run or less if it -- knows of an earlier planned task. runTaskManager :: forall streamFamily action st m. ( Exc.MonadError CQRS.Error m , CQRS.Stream m (CQRS.StreamType streamFamily) , CQRS.StreamFamily m streamFamily , Hashable (CQRS.StreamIdentifier streamFamily) , MonadIO m , Ord (CQRS.EventIdentifier (CQRS.StreamType streamFamily)) , Ord (CQRS.StreamIdentifier streamFamily) , Show (CQRS.EventIdentifier (CQRS.StreamType streamFamily)) ) => streamFamily -> (CQRS.StreamIdentifier streamFamily -> st) -- ^ Initial state of the task manager for any given stream. -> TaskManager (CQRS.EventWithContext' (CQRS.StreamType streamFamily)) action st -> (action -> m ()) -> m () runTaskManager streamFamily mkInitState taskManager runAction = do as <- CQRS.AS.makeAggregateStore streamFamily taskManager (([],) . mkInitState) 0 1000 forever $ do -- Next run in 5 minutes (unless changed.) initialNextRun <- T.addUTCTime (5 * 60) <$> liftIO T.getCurrentTime mNextRun <- flip St.execStateT (Just initialNextRun) . Pipes.runEffect $ do Pipes.hoist lift (CQRS.latestEventIdentifiers streamFamily) >-> streamProcessor as case mNextRun of Nothing -> pure () -- Do not wait. Just nextRun -> liftIO $ do now <- T.getCurrentTime when (now < nextRun) $ threadDelay . (1000000*) . round $ T.diffUTCTime nextRun now where streamProcessor :: CQRS.AS.AggregateStore streamFamily ([Task action], st) -> Pipes.Consumer ( CQRS.StreamIdentifier streamFamily , CQRS.EventIdentifier (CQRS.StreamType streamFamily) ) (St.StateT (Maybe T.UTCTime) m) () streamProcessor as = do -- We don't actually care about the latest event identifier as it might -- have changed. We used the aggregate store to fetch the latest version. (streamId, _) <- Pipes.await lift . lift $ runSomeImmediateTasks as streamId 5 mNextTaskTime <- lift . lift $ runPlannedTasks as streamId tasks <- lift . lift $ fst . CQRS.AS.aggregate <$> CQRS.query as streamId -- Do not wait after this run if there are some immediate tasks left. when (any (isNothing . from) tasks) $ St.modify' $ const Nothing -- Wait less time if we know that there is an earlier upcoming task. case mNextTaskTime of Just t -> St.modify' $ fmap (min t) Nothing -> pure () -- Run n immediate tasks. -- It refetches the tasks after each run since executing one task can change -- the list of tasks itself. runSomeImmediateTasks :: CQRS.AS.AggregateStore streamFamily ([Task action], st) -> CQRS.StreamIdentifier streamFamily -> Int -> m () runSomeImmediateTasks as streamId n | n > 0 = do tasks <- fst . CQRS.AS.aggregate <$> CQRS.query as streamId case find (isNothing . from) tasks of Nothing -> pure () Just task -> do runAction . action $ task runSomeImmediateTasks as streamId (n-1) | otherwise = pure () -- Run planned tasks and return the time of the earliest task planned in -- the future if any. runPlannedTasks :: CQRS.AS.AggregateStore streamFamily ([Task action], st) -> CQRS.StreamIdentifier streamFamily -> m (Maybe T.UTCTime) runPlannedTasks as streamId = do tasks <- fst . CQRS.AS.aggregate <$> CQRS.query as streamId now <- liftIO T.getCurrentTime case sortOn fst . mapMaybe (\Task{..} -> (, action) <$> from) $ tasks of (t, action) : _ | now < t -> do runAction action runPlannedTasks as streamId | otherwise -> pure $ Just t _ -> pure Nothing