{-# LANGUAGE NamedFieldPuns #-}
module Festung.Concurrency.Job
    ( Job
    , Command(..)
    , JobStatus(..)
    , newJob
    , newJob_
    , sendMappedCommand
    , sendCommand
    , killJob
    , keepGoing
    , isJobExited
    , isJobRunning
    , onExit
    ) where

import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.Maybe

import Festung.Concurrency.Utils (readAnyMVar)
import Festung.Utils (eitherUnitToMaybe)

-- | Handle on a background worker thread that consumes messages of type @a@.
data Job a = Job
    { chan   :: Chan a    -- ^ Queue the worker reads its messages from
    , exited :: MVar ()   -- ^ Filled once the worker thread has terminated
    , tid    :: ThreadId  -- ^ Thread running the worker loop
    }

-- | Opaque rendering: none of the fields are shown.
instance Show (Job a) where
    show = const "Job"

-- | A request of type @c@ paired with the 'MVar' in which the answer of
--   type @r@ is expected.
data Command c r = Command c (MVar r)

-- | What the worker loop should do after handling a message.
data JobStatus = KeepGoing | Stop

-- | Start a job that folds over every message sent to it.
--
-- The first argument is the initial state. The second argument is the
-- folding function: it receives the current state and the next message and
-- returns whether the loop continues together with the new state.
--
-- The folding function is run under 'mask_'. 'newJob' returns only after
-- the worker thread has signalled that it started. When the worker ends,
-- whether normally or through an exception, the job is marked as exited.
newJob :: a -> (a -> b -> IO (JobStatus, a)) -> IO (Job b)
newJob init_ f = do
    inbox   <- newChan
    done    <- newEmptyMVar
    started <- newEmptyMVar
    let worker = putMVar started () >> loop inbox init_
    worker_tid <- forkIO (worker `finally` putMVar done ())
    -- Block until the worker is running inside its 'finally' handler.
    readMVar started
    return Job { chan = inbox, exited = done, tid = worker_tid }
  where
    loop inbox acc = do
        msg <- readChan inbox
        (status, acc') <- mask_ (f acc msg)
        case status of
            Stop      -> return ()
            KeepGoing -> loop inbox acc'

-- | Lift an action into a step that always asks the job to continue.
keepGoing :: (a -> IO b) -> (a -> IO (JobStatus, b))
keepGoing action input = (,) KeepGoing <$> action input

-- | Create a stateless job that simply consumes its messages.
newJob_ :: (b -> IO ()) -> IO (Job b)
newJob_ f = newJob () (\_ msg -> keepGoing f msg)

-- | Map the command before sending it to the job.
--
-- Some jobs can receive multiple commands, therefore they use a
-- union type:
--
-- @
-- data JobCommand = SquareCmd (Command Int Int)
--                 | ConcatCmd (Command (String, String) String)
--                 ...
-- @
--
-- This gives the ability to wrap the command with the constructor of the
-- union type before sending it to the job in question.
--
-- This sends the command in question, and waits for the result to be sent back.
--
-- If the job has exited, it returns @Nothing@.
sendMappedCommand :: (Command c r -> a) -> Job a -> c -> IO (Maybe r)
sendMappedCommand constructor Job{chan = inbox, exited = done} command = do
    reply <- newEmptyMVar
    writeChan inbox (constructor (Command command reply))
    -- Wait on both the exit signal and the reply box.
    -- NOTE(review): presumably 'readAnyMVar' yields whichever is filled
    -- first; confirm against Festung.Concurrency.Utils.
    outcome <- readAnyMVar done reply
    return (eitherUnitToMaybe outcome)

-- | Same as 'sendMappedCommand', but for a job receiving only one command type.
sendCommand :: Job (Command c r) -> c -> IO (Maybe r)
sendCommand job command = sendMappedCommand id job command

-- | Kill the job's thread, then wait until the job is marked as exited.
--
-- If the job is not running, this is a noop.
killJob :: Job a -> IO ()
killJob Job{tid = worker, exited = done} = do
    killThread worker
    readMVar done

-- | 'True' once the job's exit marker has been filled. Never blocks.
isJobExited :: Job a -> IO Bool
isJobExited Job{exited = done} = do
    marker <- tryReadMVar done
    return (isJust marker)

-- | Negation of 'isJobExited'.
isJobRunning :: Job a -> IO Bool
isJobRunning job = fmap not (isJobExited job)

-- | Register a finalizer to run when the job exits (even after an exception).
--
-- The finalizer runs in a freshly forked thread that first waits for the
-- job's exit marker; 'onExit' itself returns immediately.
onExit :: Job a -> IO () -> IO ()
onExit Job{exited = done} action = do
    _ <- forkIO $ do
        readMVar done
        action
    return ()