----------------------------------------------------------------------------- -- | -- Module : Control.Distributed.Process.Async -- Copyright : (c) Tim Watson 2012 -- License : BSD3 (see the file LICENSE) -- -- Maintainer : Tim Watson -- Stability : experimental -- Portability : non-portable (requires concurrency) -- -- This API provides a means for spawning asynchronous operations, waiting -- for their results, cancelling them and various other utilities. -- Asynchronous operations can be executed on remote nodes. -- -- [Asynchronous Operations] -- -- There is an implicit contract for async workers; Workers must exit -- normally (i.e., should not call the 'exit', 'die' or 'terminate' -- Cloud Haskell primitives), otherwise the 'AsyncResult' will end up being -- @AsyncFailed DiedException@ instead of containing the result. -- -- Portions of this file are derived from the @Control.Concurrent.Async@ -- module, from the @async@ package written by Simon Marlow. ----------------------------------------------------------------------------- module Control.Distributed.Process.Async ( -- * Exported types AsyncRef , AsyncTask(..) , Async , AsyncResult(..) -- * Spawning asynchronous operations , async , asyncLinked , task , remoteTask , monitorAsync , asyncWorker -- * Cancelling asynchronous operations , cancel , cancelWait , cancelWith , cancelKill -- * Querying for results , poll , check , wait , waitAny -- * Waiting with timeouts , waitAnyTimeout , waitTimeout , waitCancelTimeout , waitCheckTimeout -- * STM versions , pollSTM , waitAnyCancel , waitEither , waitEither_ , waitBoth ) where import Control.Applicative import Control.Concurrent.STM hiding (check) import Control.Distributed.Process hiding (catch, finally) import Control.Distributed.Process.Serializable import Control.Distributed.Process.Async.Internal.Types import Control.Monad import Control.Monad.Catch (finally) import Data.Maybe ( fromMaybe ) import System.Timeout (timeout) import Prelude -- | Wraps a regular @Process a@ as an 'AsyncTask'. task :: Process a -> AsyncTask a task = AsyncTask -- | Wraps the components required and builds a remote 'AsyncTask'. remoteTask :: Static (SerializableDict a) -> NodeId -> Closure (Process a) -> AsyncTask a remoteTask = AsyncRemoteTask -- | Given an 'Async' handle, monitor the worker process. monitorAsync :: Async a -> Process MonitorRef monitorAsync = monitor . _asyncWorker -- | Spawns an asynchronous action and returns a handle to it, -- which can be used to obtain its status and/or result or interact -- with it (using the API exposed by this module). -- async :: (Serializable a) => AsyncTask a -> Process (Async a) async = asyncDo False -- | Provides the pid of the worker process performing the async operation. asyncWorker :: Async a -> ProcessId asyncWorker = _asyncWorker -- | This is a useful variant of 'async' that ensures an @Async@ task is -- never left running unintentionally. We ensure that if the caller's process -- exits, that the worker is killed. -- -- There is currently a contract for async workers, that they should -- exit normally (i.e., they should not call the @exit@ or @kill@ with their own -- 'ProcessId' nor use the @terminate@ primitive to cease functining), otherwise -- the 'AsyncResult' will end up being @AsyncFailed DiedException@ instead of -- containing the desired result. -- asyncLinked :: (Serializable a) => AsyncTask a -> Process (Async a) asyncLinked = asyncDo True -- private API asyncDo :: (Serializable a) => Bool -> AsyncTask a -> Process (Async a) asyncDo shouldLink (AsyncRemoteTask d n c) = asyncDo shouldLink $ AsyncTask $ call d n c asyncDo shouldLink (AsyncTask proc) = do root <- getSelfPid result <- liftIO newEmptyTMVarIO sigStart <- liftIO newEmptyTMVarIO (sp, rp) <- newChan -- listener/response proxy insulator <- spawnLocal $ do worker <- spawnLocal $ do liftIO $ atomically $ takeTMVar sigStart r <- proc void $ liftIO $ atomically $ putTMVar result (AsyncDone r) sendChan sp worker -- let the parent process know the worker pid wref <- monitor worker rref <- if shouldLink then fmap Just (monitor root) else return Nothing finally (pollUntilExit worker result) (unmonitor wref >> return (maybe (return ()) unmonitor rref)) workerPid <- receiveChan rp liftIO $ atomically $ putTMVar sigStart () return Async { _asyncWorker = workerPid , _asyncMonitor = insulator , _asyncWait = readTMVar result } where pollUntilExit :: (Serializable a) => ProcessId -> TMVar (AsyncResult a) -> Process () pollUntilExit wpid result' = do r <- receiveWait [ match (\c@CancelWait -> kill wpid "cancel" >> return (Left c)) , match (\(ProcessMonitorNotification _ pid' r) -> return (Right (pid', r))) ] case r of Left CancelWait -> liftIO $ atomically $ putTMVar result' AsyncCancelled Right (fpid, d) | fpid == wpid -> case d of DiedNormal -> return () _ -> liftIO $ atomically $ void $ tryPutTMVar result' (AsyncFailed d) | otherwise -> do kill wpid "linkFailed" receiveWait [ matchIf (\(ProcessMonitorNotification _ pid' _) -> pid' == wpid ) $ \_ -> return () ] liftIO $ atomically $ void $ tryPutTMVar result' (AsyncLinkFailed d) -- | Check whether an 'Async' has completed yet. poll :: (Serializable a) => Async a -> Process (AsyncResult a) poll hAsync = do r <- liftIO $ atomically $ pollSTM hAsync return $ fromMaybe AsyncPending r -- | Like 'poll' but returns 'Nothing' if @(poll hAsync) == AsyncPending@. check :: (Serializable a) => Async a -> Process (Maybe (AsyncResult a)) check hAsync = poll hAsync >>= \r -> case r of AsyncPending -> return Nothing ar -> return (Just ar) -- | Wait for an asynchronous operation to complete or timeout. waitCheckTimeout :: (Serializable a) => Int -> Async a -> Process (AsyncResult a) waitCheckTimeout t hAsync = fmap (fromMaybe AsyncPending) (waitTimeout t hAsync) -- | Wait for an asynchronous action to complete, and return its -- value. The result (which can include failure and/or cancellation) is -- encoded by the 'AsyncResult' type. -- -- @wait = liftIO . atomically . waitSTM@ -- {-# INLINE wait #-} wait :: Async a -> Process (AsyncResult a) wait = liftIO . atomically . waitSTM -- | Wait for an asynchronous operation to complete or timeout. waitTimeout :: (Serializable a) => Int -> Async a -> Process (Maybe (AsyncResult a)) waitTimeout t hAsync = liftIO $ timeout t $ atomically $ waitSTM hAsync -- | Wait for an asynchronous operation to complete or timeout. -- If it times out, then 'cancelWait' the async handle. -- waitCancelTimeout :: (Serializable a) => Int -> Async a -> Process (AsyncResult a) waitCancelTimeout t hAsync = do r <- waitTimeout t hAsync case r of Nothing -> cancelWait hAsync Just ar -> return ar -- | Wait for any of the supplied @Async@s to complete. If multiple -- 'Async's complete, then the value returned corresponds to the first -- completed 'Async' in the list. -- -- NB: Unlike @AsyncChan@, 'Async' does not discard its 'AsyncResult' once -- read, therefore the semantics of this function are different to the -- former. Specifically, if @asyncs = [a1, a2, a3]@ and @(AsyncDone _) = a1@ -- then the remaining @a2, a3@ will never be returned by 'waitAny'. -- waitAny :: (Serializable a) => [Async a] -> Process (Async a, AsyncResult a) waitAny asyncs = liftIO $ waitAnySTM asyncs -- | Like 'waitAny', but also cancels the other asynchronous -- operations as soon as one has completed. -- waitAnyCancel :: (Serializable a) => [Async a] -> Process (Async a, AsyncResult a) waitAnyCancel asyncs = waitAny asyncs `finally` mapM_ cancel asyncs -- | Wait for the first of two @Async@s to finish. -- waitEither :: Async a -> Async b -> Process (Either (AsyncResult a) (AsyncResult b)) waitEither left right = liftIO $ atomically $ (Left <$> waitSTM left) `orElse` (Right <$> waitSTM right) -- | Like 'waitEither', but the result is ignored. -- waitEither_ :: Async a -> Async b -> Process () waitEither_ left right = liftIO $ atomically $ (void $ waitSTM left) `orElse` (void $ waitSTM right) -- | Waits for both @Async@s to finish. -- waitBoth :: Async a -> Async b -> Process (AsyncResult a, AsyncResult b) waitBoth left right = liftIO $ atomically $ do a <- waitSTM left `orElse` (waitSTM right >> retry) b <- waitSTM right return (a,b) -- | Like 'waitAny' but times out after the specified delay. waitAnyTimeout :: (Serializable a) => Int -> [Async a] -> Process (Maybe (AsyncResult a)) waitAnyTimeout delay asyncs = liftIO $ timeout delay $ do r <- waitAnySTM asyncs return $ snd r -- | Cancel an asynchronous operation. cancel :: Async a -> Process () cancel (Async _ g _) = send g CancelWait -- | Cancel an asynchronous operation and wait for the cancellation to complete. cancelWait :: (Serializable a) => Async a -> Process (AsyncResult a) cancelWait hAsync = cancel hAsync >> wait hAsync -- | Cancel an asynchronous operation immediately. cancelWith :: (Serializable b) => b -> Async a -> Process () cancelWith reason hAsync = exit (_asyncWorker hAsync) reason -- | Like 'cancelWith' but sends a @kill@ instruction instead of an exit. cancelKill :: String -> Async a -> Process () cancelKill reason hAsync = kill (_asyncWorker hAsync) reason -------------------------------------------------------------------------------- -- STM Specific API -- -------------------------------------------------------------------------------- -- | STM version of 'waitAny'. waitAnySTM :: [Async a] -> IO (Async a, AsyncResult a) waitAnySTM asyncs = atomically $ foldr orElse retry $ map (\a -> do r <- waitSTM a; return (a, r)) asyncs -- | A version of 'wait' that can be used inside an STM transaction. -- waitSTM :: Async a -> STM (AsyncResult a) waitSTM (Async _ _ w) = w -- | A version of 'poll' that can be used inside an STM transaction. -- {-# INLINE pollSTM #-} pollSTM :: Async a -> STM (Maybe (AsyncResult a)) pollSTM (Async _ _ w) = (Just <$> w) `orElse` return Nothing