-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Distributed.Process.Async
-- Copyright   :  (c) Tim Watson 2012
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Tim Watson <watson.timothy@gmail.com>
-- Stability   :  experimental
-- Portability :  non-portable (requires concurrency)
--
-- This API provides a means for spawning asynchronous operations, waiting
-- for their results, cancelling them and various other utilities.
-- Asynchronous operations can be executed on remote nodes.
--
-- [Asynchronous Operations]
--
-- There is an implicit contract for async workers; Workers must exit
-- normally (i.e., should not call the 'exit', 'die' or 'terminate'
-- Cloud Haskell primitives), otherwise the 'AsyncResult' will end up being
-- @AsyncFailed DiedException@ instead of containing the result.
--
-- Portions of this file are derived from the @Control.Concurrent.Async@
-- module, from the @async@ package written by Simon Marlow.
-----------------------------------------------------------------------------

module Control.Distributed.Process.Async
  ( -- * Exported types
    AsyncRef
  , AsyncTask(..)
  , Async
  , AsyncResult(..)
    -- * Spawning asynchronous operations
  , async
  , asyncLinked
  , task
  , remoteTask
  , monitorAsync
  , asyncWorker
    -- * Cancelling asynchronous operations
  , cancel
  , cancelWait
  , cancelWith
  , cancelKill
    -- * Querying for results
  , poll
  , check
  , wait
  , waitAny
    -- * Waiting with timeouts
  , waitAnyTimeout
  , waitTimeout
  , waitCancelTimeout
  , waitCheckTimeout
    -- * STM versions
  , pollSTM
  , waitSTM
  , waitAnySTM
  , waitAnyCancel
  , waitEither
  , waitEither_
  , waitBoth
  ) where

import Control.Applicative
import Control.Concurrent.STM hiding (check)
import Control.Distributed.Process hiding (catch, finally)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Async.Internal.Types
import Control.Monad
import Control.Monad.Catch (finally)
import Data.Maybe
  ( fromMaybe
  )

import System.Timeout (timeout)
import Prelude

-- | Wraps a regular @Process a@ as an 'AsyncTask'.
--
-- This is simply the 'AsyncTask' constructor under a lower-case name.
task :: Process a -> AsyncTask a
task = AsyncTask

-- | Wraps the components required and builds a remote 'AsyncTask'.
--
-- The three arguments are stored in an 'AsyncRemoteTask'; when the task is
-- started they are handed, unchanged, to 'call'.
remoteTask :: Static (SerializableDict a)
              -> NodeId
              -> Closure (Process a)
              -> AsyncTask a
remoteTask = AsyncRemoteTask

-- | Given an 'Async' handle, monitor the worker process.
--
-- The monitor is placed on the worker pid stored in the handle (not on the
-- intermediate monitoring process), and the resulting 'MonitorRef' is
-- returned to the caller.
monitorAsync :: Async a -> Process MonitorRef
monitorAsync = monitor . _asyncWorker

-- | Spawns an asynchronous action and returns a handle to it,
-- which can be used to obtain its status and/or result or interact
-- with it (using the API exposed by this module).
--
-- The worker is /not/ tied to the lifetime of the calling process; use
-- 'asyncLinked' for that behaviour.
--
async :: (Serializable a) => AsyncTask a -> Process (Async a)
async = asyncDo False

-- | Provides the pid of the worker process performing the async operation.
asyncWorker :: Async a -> ProcessId
asyncWorker = _asyncWorker

-- | This is a useful variant of 'async' that ensures an @Async@ task is
-- never left running unintentionally. We ensure that if the caller's process
-- exits, the worker is killed.
--
-- There is currently a contract for async workers, that they should
-- exit normally (i.e., they should not call @exit@ or @kill@ with their own
-- 'ProcessId' nor use the @terminate@ primitive to cease functioning),
-- otherwise the 'AsyncResult' will end up being an 'AsyncFailed' carrying the
-- reason the worker died, instead of containing the desired result.
--
asyncLinked :: (Serializable a) => AsyncTask a -> Process (Async a)
asyncLinked = asyncDo True

-- private API

-- | Shared implementation behind 'async' and 'asyncLinked'.
--
-- Two processes are spawned: a /worker/ that runs the task and stores
-- @AsyncDone r@ in a shared 'TMVar', and an /insulator/ that monitors the
-- worker (and, when @shouldLink@ is 'True', the calling process as well) and
-- records cancellation or failure in the same 'TMVar'.
--
-- The worker does not start running the task until the caller has received
-- the worker's pid: it blocks on @sigStart@, which the caller fills only
-- after reading the pid from the channel.
--
-- A remote task is reduced to a local one whose body is a 'call' to the
-- remote node.
asyncDo :: (Serializable a) => Bool -> AsyncTask a -> Process (Async a)
asyncDo shouldLink (AsyncRemoteTask d n c) =
    asyncDo shouldLink $ AsyncTask $ call d n c
asyncDo shouldLink (AsyncTask proc) = do
    root <- getSelfPid
    result <- liftIO newEmptyTMVarIO
    sigStart <- liftIO newEmptyTMVarIO
    (sp, rp) <- newChan

    -- listener/response proxy
    insulator <- spawnLocal $ do
        worker <- spawnLocal $ do
            -- wait for the go-ahead from the parent (see sigStart below)
            liftIO $ atomically $ takeTMVar sigStart
            r <- proc
            liftIO $ atomically $ putTMVar result (AsyncDone r)

        sendChan sp worker  -- let the parent process know the worker pid

        wref <- monitor worker
        rref <- if shouldLink then fmap Just (monitor root) else return Nothing
        -- Always drop both monitors on the way out. The optional monitor on
        -- the parent must actually be /run/ here, not merely returned.
        finally (pollUntilExit worker result)
                (unmonitor wref >> maybe (return ()) unmonitor rref)

    workerPid <- receiveChan rp
    liftIO $ atomically $ putTMVar sigStart ()

    return Async { _asyncWorker  = workerPid
                 , _asyncMonitor = insulator
                 , _asyncWait    = readTMVar result
                 }

  where
    -- Waits for either a cancellation request or a monitor notification and
    -- records the corresponding outcome in the result variable.
    pollUntilExit :: (Serializable a)
                  => ProcessId
                  -> TMVar (AsyncResult a)
                  -> Process ()
    pollUntilExit wpid result' = do
      r <- receiveWait [
          match (\c@CancelWait -> kill wpid "cancel" >> return (Left c))
        , match (\(ProcessMonitorNotification _ pid' reason) ->
                  return (Right (pid', reason)))
        ]
      case r of
          Left CancelWait
            -- The worker may already have stored its result by the time the
            -- cancellation is processed; a blocking put would then hang this
            -- process forever, so only write if the variable is still empty.
            -> liftIO $ atomically $ void $
                 tryPutTMVar result' AsyncCancelled
          Right (fpid, d)
            -- the worker itself terminated
            | fpid == wpid -> case d of
                -- normal exit: the worker has already stored its result
                DiedNormal -> return ()
                _          -> liftIO $ atomically $ void $
                                tryPutTMVar result' (AsyncFailed d)
            -- the only other monitored process is the linked parent
            | otherwise -> do
                kill wpid "linkFailed"
                -- wait until the worker is confirmed dead before reporting
                receiveWait
                  [ matchIf (\(ProcessMonitorNotification _ pid' _) ->
                             pid' == wpid
                            ) $ \_ -> return ()
                  ]
                liftIO $ atomically $ void $
                  tryPutTMVar result' (AsyncLinkFailed d)

-- | Check whether an 'Async' has completed yet.
--
-- Never blocks: yields 'AsyncPending' when no result is available yet,
-- otherwise the stored 'AsyncResult'.
poll :: (Serializable a) => Async a -> Process (AsyncResult a)
poll hAsync =
  liftIO $ fmap (fromMaybe AsyncPending) (atomically (pollSTM hAsync))

-- | Like 'poll' but returns 'Nothing' if @(poll hAsync) == AsyncPending@.
--
-- Any other outcome is returned wrapped in 'Just'.
check :: (Serializable a) => Async a -> Process (Maybe (AsyncResult a))
check hAsync = fmap nonPending (poll hAsync)
  where
    nonPending AsyncPending = Nothing
    nonPending ar           = Just ar

-- | Wait for an asynchronous operation to complete or timeout.
--
-- Behaves like 'waitTimeout', except that a timeout is reported as
-- 'AsyncPending' rather than 'Nothing'.
waitCheckTimeout :: (Serializable a) =>
                    Int -> Async a -> Process (AsyncResult a)
waitCheckTimeout t hAsync =
  fromMaybe AsyncPending `fmap` waitTimeout t hAsync

-- | Wait for an asynchronous action to complete, and return its
-- value. The result (which can include failure and/or cancellation) is
-- encoded by the 'AsyncResult' type.
--
-- @wait = liftIO . atomically . waitSTM@
--
{-# INLINE wait #-}
wait :: Async a -> Process (AsyncResult a)
wait = liftIO . atomically . waitSTM

-- | Wait for an asynchronous operation to complete or timeout.
--
-- The delay is passed straight to 'System.Timeout.timeout' (which measures
-- it in microseconds). Returns 'Nothing' if the delay elapses before a
-- result becomes available.
waitTimeout :: (Serializable a) =>
               Int -> Async a -> Process (Maybe (AsyncResult a))
waitTimeout t hAsync =
    liftIO (timeout t (atomically (waitSTM hAsync)))

-- | Wait for an asynchronous operation to complete or timeout.
-- If it times out, then 'cancelWait' the async handle and return whatever
-- result that produces.
--
waitCancelTimeout :: (Serializable a)
                  => Int
                  -> Async a
                  -> Process (AsyncResult a)
waitCancelTimeout t hAsync =
  waitTimeout t hAsync >>= maybe (cancelWait hAsync) return

-- | Wait for any of the supplied @Async@s to complete. If multiple
-- 'Async's complete, then the value returned corresponds to the first
-- completed 'Async' in the list.
--
-- NB: Unlike @AsyncChan@, 'Async' does not discard its 'AsyncResult' once
-- read, therefore the semantics of this function are different to the
-- former. Specifically, if @asyncs = [a1, a2, a3]@ and @(AsyncDone _) = a1@
-- then the remaining @a2, a3@ will never be returned by 'waitAny'.
--
-- NB: with an empty list there is nothing that can ever complete, so the
-- underlying transaction can only retry.
--
waitAny :: (Serializable a)
        => [Async a]
        -> Process (Async a, AsyncResult a)
waitAny = liftIO . waitAnySTM

-- | Like 'waitAny', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
-- A 'cancel' is sent to /every/ handle in the list (including the one that
-- completed), and this happens even if the wait itself is interrupted by an
-- exception.
--
waitAnyCancel :: (Serializable a)
              => [Async a] -> Process (Async a, AsyncResult a)
waitAnyCancel asyncs =
  finally (waitAny asyncs) (mapM_ cancel asyncs)

-- | Wait for the first of two @Async@s to finish.
--
-- The result is tagged 'Left' or 'Right' according to which handle it came
-- from; if both are already complete, the left one is returned.
--
waitEither :: Async a
              -> Async b
              -> Process (Either (AsyncResult a) (AsyncResult b))
waitEither left right =
  liftIO $ atomically $
    fmap Left (waitSTM left) `orElse` fmap Right (waitSTM right)

-- | Like 'waitEither', but the result is ignored.
--
waitEither_ :: Async a -> Async b -> Process ()
waitEither_ left right =
  liftIO $ atomically $
    void (waitSTM left) `orElse` void (waitSTM right)

-- | Waits for both @Async@s to finish.
--
-- Both results are read inside a single STM transaction, so the pair is
-- only returned once both handles have a result available.
--
waitBoth :: Async a
            -> Async b
            -> Process (AsyncResult a, AsyncResult b)
waitBoth left right =
  liftIO $ atomically $ do
    -- If the left result is not available yet, still read the right one
    -- before retrying.
    a <- waitSTM left `orElse` (waitSTM right >> retry)
    b <- waitSTM right
    return (a, b)

-- | Like 'waitAny' but times out after the specified delay.
--
-- The delay is passed straight to 'System.Timeout.timeout' (which measures
-- it in microseconds). Only the 'AsyncResult' is returned, not the handle it
-- belongs to; 'Nothing' signals that the delay elapsed first.
waitAnyTimeout :: (Serializable a)
               => Int
               -> [Async a]
               -> Process (Maybe (AsyncResult a))
waitAnyTimeout delay asyncs =
  liftIO $ timeout delay $ fmap snd (waitAnySTM asyncs)

-- | Cancel an asynchronous operation.
--
-- This only /sends/ a 'CancelWait' request to the second process stored in
-- the handle; it does not wait for the cancellation to take effect (see
-- 'cancelWait' for that).
cancel :: Async a -> Process ()
cancel (Async _ g _) = send g CancelWait

-- | Cancel an asynchronous operation and wait for the cancellation to complete.
--
-- Equivalent to 'cancel' followed by 'wait'; the returned value is whatever
-- result the handle ends up holding.
cancelWait :: (Serializable a) => Async a -> Process (AsyncResult a)
cancelWait hAsync = do
  cancel hAsync
  wait hAsync

-- | Cancel an asynchronous operation immediately.
--
-- Sends an @exit@ signal carrying @reason@ directly to the worker process.
cancelWith :: (Serializable b) => b -> Async a -> Process ()
cancelWith reason hAsync = exit (_asyncWorker hAsync) reason

-- | Like 'cancelWith' but sends a @kill@ instruction instead of an exit.
--
-- The signal goes directly to the worker process, with @reason@ as its
-- message.
cancelKill :: String -> Async a -> Process ()
cancelKill reason hAsync = kill (_asyncWorker hAsync) reason

--------------------------------------------------------------------------------
-- STM Specific API                                                           --
--------------------------------------------------------------------------------

-- | STM version of 'waitAny'.
--
-- Tries each handle from left to right within one transaction and returns
-- the first one that has a result, paired with that result. With an empty
-- list the transaction can only retry.
waitAnySTM :: [Async a] -> IO (Async a, AsyncResult a)
waitAnySTM asyncs =
  atomically $ foldr (orElse . tagged) retry asyncs
  where
    -- pair a handle with its (eventual) result
    tagged a = fmap ((,) a) (waitSTM a)

-- | A version of 'wait' that can be used inside an STM transaction.
--
-- This is just the STM action stored in the handle.
--
waitSTM :: Async a -> STM (AsyncResult a)
waitSTM (Async _ _ w) = w

-- | A version of 'poll' that can be used inside an STM transaction.
--
-- Returns 'Nothing' instead of retrying when the handle's wait action is
-- not yet able to produce a result.
--
{-# INLINE pollSTM #-}
pollSTM :: Async a -> STM (Maybe (AsyncResult a))
pollSTM (Async _ _ w) = fmap Just w `orElse` return Nothing