module Web.Spock.Worker
(
WorkQueue
, WorkHandler
, newWorker
, addWork
, WorkExecution (..)
, WorkResult (..)
, ErrorHandler, InternalError
)
where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Trans
import Control.Monad.Trans.Error
import Data.Time
import Web.Spock
import qualified Web.Spock.Worker.Queue as Q
-- | Error value a 'WorkHandler' can fail with inside its 'ErrorT' layer;
-- it is handed unchanged to the 'ErrorHandler'.
type InternalError =
    String
-- | Callback run in 'IO' when a 'WorkHandler' fails.
--
-- It receives the 'InternalError' and the work item that was being
-- processed. The 'WorkResult' it returns is treated by the worker exactly
-- like a result returned by the handler itself.
type ErrorHandler a = InternalError -> a -> IO WorkResult
-- | Action that processes a single work item of type @a@.
--
-- It runs in 'WebStateM' wrapped in 'ErrorT': failing with an
-- 'InternalError' routes the item to the worker's 'ErrorHandler', while a
-- successful run yields the 'WorkResult' directly.
type WorkHandler conn sess st a =
    a -> ErrorT InternalError (WebStateM conn sess st) WorkResult
-- | Handle to a worker's queue, as returned by 'newWorker' and consumed by
-- 'addWork'.
--
-- Wraps a 'Q.WorkerQueue' whose first type parameter is fixed to 'UTCTime';
-- 'addWork' supplies the intended execution time in that position.
newtype WorkQueue a = WorkQueue
    { unWorkQueue :: Q.WorkerQueue UTCTime a
    }
-- | When a piece of work handed to 'addWork' should be executed.
--
-- 'Show' and 'Eq' are derived so that values can be logged and compared,
-- in line with 'WorkResult'.
data WorkExecution
    = WorkNow
    -- ^ Schedule for the current time (as read when 'addWork' runs).
    | WorkIn NominalDiffTime
    -- ^ Schedule at the current time plus the given offset.
    | WorkAt UTCTime
    -- ^ Schedule at the given absolute timestamp.
    deriving (Show, Eq)
-- | Outcome reported by a 'WorkHandler' or an 'ErrorHandler' for one work
-- item. The worker only acts on the two repeat constructors.
data WorkResult
    = WorkComplete
    -- ^ The item is not rescheduled by the worker.
    | WorkError
    -- ^ The item is not rescheduled by the worker either; the worker takes
    -- no further action on this value.
    | WorkRepeatIn NominalDiffTime
    -- ^ Re-enqueue the same item via 'addWork' with 'WorkIn'.
    | WorkRepeatAt UTCTime
    -- ^ Re-enqueue the same item via 'addWork' with 'WorkAt'.
    deriving (Show, Eq)
-- | Create a background worker and return the 'WorkQueue' used to feed it.
--
-- Arguments, in order:
--
-- * the size passed to 'Q.newQueue' (presumably the queue capacity;
--   confirm against "Web.Spock.Worker.Queue"),
-- * the 'WorkHandler' run for every dequeued item,
-- * the 'ErrorHandler' invoked when the handler fails with an
--   'InternalError'.
--
-- A single thread is forked with 'forkIO'; it runs inside the Spock context
-- obtained from 'getSpockHeart' and processes items one after another for
-- as long as the thread lives.
--
-- NOTE(review): exceptions thrown by the handler (as opposed to 'ErrorT'
-- failures) are not caught here and would end the worker thread.
newWorker :: Int
          -> WorkHandler conn sess st a
          -> ErrorHandler a
          -> SpockM conn sess st (WorkQueue a)
newWorker maxSize workHandler errorHandler =
    do heart <- getSpockHeart
       q <- liftIO $ Q.newQueue maxSize
       _ <- liftIO $ forkIO (runSpockIO heart $ core q)
       return (WorkQueue q)
    where
      -- Worker loop: take one item, run it, reschedule if asked, repeat.
      core q =
          do work <- liftIO $ atomically $ Q.dequeue q
             res <-
                 do workRes <- runErrorT $ workHandler work
                    case workRes of
                      -- Handler failure: let the error handler decide the
                      -- outcome for this item.
                      Left err -> liftIO (errorHandler err work)
                      Right r -> return r
             case res of
               WorkRepeatIn secs ->
                   addWork (WorkIn secs) work (WorkQueue q)
               WorkRepeatAt ts ->
                   addWork (WorkAt ts) work (WorkQueue q)
               -- WorkComplete / WorkError: nothing to reschedule.
               _ ->
                   return ()
             -- Keep serving the queue; without this recursive call the
             -- forked thread would finish after its first item and all
             -- later (and re-enqueued) work would never run.
             core q
-- | Enqueue a work item on the given 'WorkQueue'.
--
-- The 'WorkExecution' is turned into an absolute 'UTCTime' relative to the
-- clock reading taken at the start of the call, and the item is then
-- enqueued under that time in a single STM transaction.
addWork :: MonadIO m => WorkExecution -> a -> WorkQueue a -> m ()
addWork we work (WorkQueue q) =
    liftIO $ getCurrentTime >>= schedule . resolve
  where
    -- Absolute execution time for the requested schedule.
    resolve now =
        case we of
          WorkNow -> now
          WorkIn later -> addUTCTime later now
          WorkAt ts -> ts
    -- Put the item on the queue under its execution time.
    schedule execTime =
        atomically (Q.enqueue execTime work q)