module Database.CQL.IO.Jobs
( Jobs
, JobReplaced (..)
, newJobs
, runJob
, runJob_
, tryRunJob
, tryRunJob_
, cancelJobs
, listJobs
, listJobKeys
) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception (asyncExceptionFromException, asyncExceptionToException)
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.IORef
import Data.Map.Strict (Map)
import Data.Typeable
import Data.Unique
import qualified Data.Map.Strict as Map
-- | A mutable registry of running jobs, indexed by a key of type @k@.
-- The registry is a 'Map' held in an 'IORef'; the functions in this
-- module update it through 'atomicModifyIORef''.
newtype Jobs k = Jobs (IORef (Map k Job))
-- | A single registered job.
data Job = Job
    { _jobUniq :: !Unique
      -- ^ Identity token of this particular job. It is compared when a
      -- finished job removes itself from the registry, so that an entry
      -- belonging to a different job under the same key is left alone.
    , jobAsync :: !(Async ())
      -- ^ Handle of the thread running the job.
    }
-- | The exception with which a job is cancelled when another job is
-- registered under the same key and takes its place.
data JobReplaced = JobReplaced deriving (Eq, Show, Typeable)
-- | 'JobReplaced' is converted to and from 'SomeException' through the
-- asynchronous-exception helpers, i.e. it is part of the
-- asynchronous exception hierarchy.
instance Exception JobReplaced where
    toException = asyncExceptionToException
    fromException = asyncExceptionFromException
-- | Create a fresh job registry that contains no jobs.
newJobs :: MonadIO m => m (Jobs k)
newJobs = liftIO (fmap Jobs (newIORef Map.empty))
-- | Like 'runJob', but the 'Async' handle of the new job is discarded.
runJob_ :: (MonadIO m, Ord k) => Jobs k -> k -> IO () -> m ()
runJob_ j k io = void (runJob j k io)
-- | Run the given action as the job registered under key @k@.
--
-- The new job always takes over the key. If a job was already
-- registered under that key it is handed back to 'runJobWith', which
-- cancels it with 'JobReplaced'. The result is the 'Async' handle of
-- the new job.
runJob :: (MonadIO m, Ord k) => Jobs k -> k -> IO () -> m (Async ())
runJob j@(Jobs ref) k = runJobWith replace j k
  where
    -- Atomically install the new job and report the one it displaced.
    replace new = atomicModifyIORef' ref (swapIn new)

    -- Pure registry update: (new registry, (start?, result, displaced job)).
    swapIn new jobs =
        ( Map.insert k new jobs
        , (True, jobAsync new, Map.lookup k jobs)
        )
-- | Like 'tryRunJob', but the result is discarded.
tryRunJob_ :: (MonadIO m, Ord k) => Jobs k -> k -> IO () -> m ()
tryRunJob_ j k io = void (tryRunJob j k io)
-- | Run the given action as the job registered under key @k@, but only
-- if no job is currently registered under that key.
--
-- Returns @Just@ the 'Async' handle of the new job when it was
-- registered, and @Nothing@ when the key was already taken. In the
-- latter case the registry is left untouched and 'runJobWith' cancels
-- the new worker instead of letting it start the action.
tryRunJob :: (MonadIO m, Ord k) => Jobs k -> k -> IO () -> m (Maybe (Async ()))
tryRunJob j@(Jobs ref) k = runJobWith register j k
  where
    -- Atomically claim the key for the new job, if it is free.
    register new = atomicModifyIORef' ref (claim new)

    -- Pure registry update: (new registry, (start?, result, displaced job)).
    claim new jobs
        | Map.member k jobs = (jobs, (False, Nothing, Nothing))
        | otherwise =
            ( Map.insert k new jobs
            , (True, Just (jobAsync new), Nothing)
            )
-- | Cancel every job in the registry.
--
-- The registry is emptied atomically first; the jobs that were in it
-- at that moment are then cancelled one after another.
cancelJobs :: MonadIO m => Jobs k -> m ()
cancelJobs (Jobs ref) = liftIO $ do
    drained <- atomicModifyIORef' ref $ \jobs -> (Map.empty, jobs)
    forM_ (Map.elems drained) $ \job ->
        cancel (jobAsync job)
-- | Read the registry and return every key paired with the 'Async'
-- handle of its job, in ascending key order.
listJobs :: MonadIO m => Jobs k -> m [(k, Async ())]
listJobs (Jobs ref) = liftIO $ do
    jobs <- readIORef ref
    return [ (key, jobAsync job) | (key, job) <- Map.toList jobs ]
-- | Read the registry and return the keys of all registered jobs, in
-- ascending order.
listJobKeys :: MonadIO m => Jobs k -> m [k]
listJobKeys (Jobs ref) = liftIO $ do
    jobs <- readIORef ref
    return (Map.keys jobs)
-- | Shared implementation of 'runJob' and 'tryRunJob'.
--
-- The first argument decides how a freshly created 'Job' enters the
-- registry. It returns a triple of
--
--   * whether the new job may start,
--   * the value to hand back to the caller, and
--   * a previously registered job that has to be cancelled, if any.
--
-- The worker thread is forked before registration but waits on a gate
-- ('MVar') until registration has succeeded, so the action never runs
-- for a job that was not registered.
runJobWith :: (MonadIO m, Ord k)
           => (Job -> IO (Bool, a, Maybe Job))
           -> Jobs k
           -> k
           -> IO ()
           -> m a
runJobWith addJob (Jobs ref) k io = liftIO $ do
    uniq <- newUnique
    gate <- newEmptyMVar
    mask $ \restore -> do
        let -- Wait for the go-ahead, run the action with asynchronous
            -- exceptions restored, then drop our own registry entry.
            worker = do
                takeMVar gate
                restore io
                unregister uniq
            handlers =
                [ -- Replaced by another job: the registry entry now
                  -- belongs to the replacement, so leave it alone.
                  Handler $ \e@JobReplaced -> throwM e
                  -- Any other failure: clean up our entry, then rethrow.
                , Handler $ \e@SomeException{} -> unregister uniq >> throwM e
                ]
        new <- async (worker `catches` handlers)
        -- If registration itself is interrupted, do not leak the worker.
        restore (register uniq gate new) `onException` cancel new
  where
    -- Register the job, cancel the job it displaced (if any) and either
    -- open the gate or cancel the still-waiting worker.
    register uniq gate new = do
        (ok, result, old) <- addJob (Job uniq new)
        forM_ old $ \prev ->
            cancelWith (jobAsync prev) JobReplaced
        if ok
            then putMVar gate ()
            else cancel new
        return result

    -- Remove the entry under @k@, but only if it still belongs to the
    -- job identified by @uniq@.
    unregister uniq = atomicModifyIORef' ref $ \jobs ->
        (Map.update (keepUnless uniq) k jobs, ())

    keepUnless uniq job@(Job uniq' _)
        | uniq == uniq' = Nothing
        | otherwise = Just job