-- This Source Code Form is subject to the terms of the Mozilla Public -- License, v. 2.0. If a copy of the MPL was not distributed with this -- file, You can obtain one at http://mozilla.org/MPL/2.0/. module Database.CQL.IO.Jobs ( Jobs , JobReplaced (..) , newJobs , runJob , runJob_ , tryRunJob , tryRunJob_ , cancelJobs , listJobs , listJobKeys ) where import Control.Concurrent import Control.Concurrent.Async import Control.Exception (asyncExceptionFromException, asyncExceptionToException) import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class import Data.IORef import Data.Map.Strict (Map) import Data.Typeable import Data.Unique import qualified Data.Map.Strict as Map -- | A registry for asynchronous computations ("jobs") associated with keys -- of type @k@, with only a single job running at a time for a particular key. newtype Jobs k = Jobs (IORef (Map k Job)) -- | Internal representation of a job. The 'Unique' value ensures that -- a job that finishes or is aborted never accidentily removes a newly -- registered job for the same key from the 'Jobs' registry. data Job = Job { _jobUniq :: !Unique , jobAsync :: !(Async ()) } -- | The asynchronous exception used to cancel a job if it is replaced -- by another job. data JobReplaced = JobReplaced deriving (Eq, Show, Typeable) instance Exception JobReplaced where toException = asyncExceptionToException fromException = asyncExceptionFromException newJobs :: MonadIO m => m (Jobs k) newJobs = liftIO $ Jobs <$> newIORef Map.empty -- | 'runJob' and ignore the result. runJob_ :: (MonadIO m, Ord k) => Jobs k -> k -> IO () -> m () runJob_ j k = void . runJob j k -- | Run an asynchronous job for a key. If there is a running job for the same -- key, it is replaced and cancelled with a 'JobReplaced' exception. runJob :: (MonadIO m, Ord k) => Jobs k -> k -> IO () -> m (Async ()) runJob j@(Jobs ref) k = runJobWith addJob j k where addJob new = atomicModifyIORef' ref $ \jobs -> let jobs' = Map.insert k new jobs old = Map.lookup k jobs val = jobAsync new in (jobs', (True, val, old)) -- | 'tryRunJob' and ignore the result. tryRunJob_ :: (MonadIO m, Ord k) => Jobs k -> k -> IO () -> m () tryRunJob_ j k = void . tryRunJob j k -- | Try to run an asynchronous job for a key. If there is a running job for -- the same key, 'Nothing' is returned and the job will not run. tryRunJob :: (MonadIO m, Ord k) => Jobs k -> k -> IO () -> m (Maybe (Async ())) tryRunJob j@(Jobs ref) k = runJobWith addJob j k where addJob new = atomicModifyIORef' ref $ \jobs -> if Map.member k jobs then (jobs, (False, Nothing, Nothing)) else let jobs' = Map.insert k new jobs val = Just (jobAsync new) in (jobs', (True, val, Nothing)) -- | Cancel all running jobs. cancelJobs :: MonadIO m => Jobs k -> m () cancelJobs (Jobs d) = liftIO $ do jobs <- Map.elems <$> atomicModifyIORef' d (\m -> (Map.empty, m)) mapM_ (cancel . jobAsync) jobs -- | List all running jobs. listJobs :: MonadIO m => Jobs k -> m [(k, Async ())] listJobs (Jobs j) = liftIO $ Map.foldrWithKey f [] <$> readIORef j where f k a b = (k, jobAsync a) : b -- | List the keys of all running jobs. listJobKeys :: MonadIO m => Jobs k -> m [k] listJobKeys (Jobs j) = liftIO $ Map.keys <$> readIORef j ------------------------------------------------------------------------------ -- Internal runJobWith :: (MonadIO m, Ord k) => (Job -> IO (Bool, a, Maybe Job)) -> Jobs k -> k -> IO () -> m a runJobWith addJob (Jobs ref) k io = liftIO $ do u <- newUnique l <- newEmptyMVar -- Once the async is created and waiting on the latch @l@, it must -- either be unblocked by putMVar or cancelled, hence masking -- between 'async' and 'run' (nb. 'takeMVar' is interruptible). mask $ \restore -> do new <- async $ do takeMVar l restore io remove u `catches` [ Handler $ \x@JobReplaced -> throwM x , Handler $ \x@SomeException{} -> remove u >> throwM x ] restore (run u l new) `onException` cancel new where run u l new = do (ok, a, old) <- addJob (Job u new) mapM_ ((`cancelWith` JobReplaced) . jobAsync) old if ok then putMVar l () else cancel new return a remove u = atomicModifyIORef' ref $ \jobs -> let update = Map.update $ \a -> case a of Job u' _ | u == u' -> Nothing _ -> Just a in (update k jobs, ())