module Festung.Concurrency.Gig ( newGig , newGig_ , Job , JobStatus ) where import Control.Concurrent import Control.Monad import Festung.Concurrency.Job import Festung.Concurrency.Utils (readChanTimeout, forkIOForSure) -- | Same as new job, but times out -- -- Gigs are time limited jobs. Meaning that if no new action are -- sent to a gig, the gig terminates. (or times out) newGig :: Int -> a -> (a -> b -> IO (JobStatus, a)) -> IO (Job b) newGig timeout init_ f = do chan <- newChan job <- newJob init_ $ \state cmd -> do writeChan chan cmd f state cmd timer <- newTimer timeout $ killJob job copier <- forkIOForSure $ forever (writeChan chan =<< readChan (snd timer)) onExit job $ mapM_ killThread [fst timer, copier] return job -- | Create a gig without any state (just consuming the messages and possibly timing out) newGig_ :: Int -> (b -> IO ()) -> IO (Job b) newGig_ timeout f = newGig timeout () $ const (keepGoing f) -- | This is an internal function -- -- This reads from the returned channel, and execute the action if no new value -- is sent into its channel. -- -- @ -- chan <- newTimer 1500 $ do -- killJob foo -- @ newTimer :: Int -> IO () -> IO (ThreadId, Chan a) newTimer timeout action = do chan <- newChan thread <- forkIOForSure $ loop chan return (thread, chan) where loop chan = do value <- readChanTimeout timeout chan case value of Just _ -> loop chan Nothing -> action