module Control.Concurrent.Worker (
Worker(..),
startWorker,
sendTask, pushTask,
stopWorker,
module Control.Concurrent.Task
) where
import Control.Concurrent.MVar
import Control.Monad.IO.Class
import Control.Monad.Catch
import Control.Monad.Error
import Control.Concurrent.FiniteChan
import Control.Concurrent.Task
data Worker m = Worker {
workerChan :: Chan (Task (), m ()),
workerWrap :: forall a. m a -> m a,
workerTask :: MVar (Task ()),
workerRestart :: IO Bool }
startWorker :: MonadIO m => (m () -> IO ()) -> (m () -> m ()) -> (forall a. m a -> m a) -> IO (Worker m)
startWorker run initialize wrap = do
ch <- newChan
taskVar <- newEmptyMVar
let
start = forkTask $ do
run $ initialize $ processWork
processSkip
processWork = whileJust (liftM (fmap snd) $ liftIO $ getChan ch) id
processSkip = whileJust (liftM (fmap fst) $ getChan ch) taskStop
whileJust :: Monad m => m (Maybe a) -> (a -> m b) -> m ()
whileJust v act = v >>= maybe (return ()) (\x -> act x >> whileJust v act)
start >>= putMVar taskVar
let
restart = do
task <- readMVar taskVar
stopped <- taskStopped task
when stopped (start >>= void . swapMVar taskVar)
return stopped
return $ Worker ch wrap taskVar restart
sendTask :: (MonadCatch m, MonadIO m) => Worker m -> m a -> IO (Task a)
sendTask w = runTask_ putTask' . workerWrap w where
putTask' t act = putChan (workerChan w) (fmap (const ()) t, act)
pushTask :: (MonadCatch m, MonadIO m) => Worker m -> m a -> IO (Task a)
pushTask w act = workerRestart w >> sendTask w act
stopWorker :: Worker m -> IO ()
stopWorker = closeChan . workerChan