module Control.Distributed.Process.Async
(
AsyncRef
, AsyncTask(..)
, Async
, AsyncResult(..)
, async
, asyncLinked
, task
, remoteTask
, monitorAsync
, asyncWorker
, cancel
, cancelWait
, cancelWith
, cancelKill
, poll
, check
, wait
, waitAny
, waitAnyTimeout
, waitTimeout
, waitCancelTimeout
, waitCheckTimeout
, pollSTM
, waitSTM
, waitAnySTM
, waitAnyCancel
, waitEither
, waitEither_
, waitBoth
) where
import Control.Applicative
import Control.Concurrent.STM hiding (check)
import Control.Distributed.Process hiding (catch, finally)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Async.Internal.Types
import Control.Monad
import Control.Monad.Catch (finally)
import Data.Maybe
( fromMaybe
)
import System.Timeout (timeout)
import Prelude
-- | Wrap a 'Process' action as an 'AsyncTask' by applying the 'AsyncTask'
-- constructor to it.
task :: Process a -> AsyncTask a
task action = AsyncTask action
-- | Build an 'AsyncTask' from a serialization dictionary, a target node and
-- a closure, using the 'AsyncRemoteTask' constructor.
remoteTask :: Static (SerializableDict a)
           -> NodeId
           -> Closure (Process a)
           -> AsyncTask a
remoteTask dict node closure = AsyncRemoteTask dict node closure
-- | Start monitoring the worker process of the given 'Async' handle and
-- return the resulting 'MonitorRef'.
monitorAsync :: Async a -> Process MonitorRef
monitorAsync hAsync = monitor (_asyncWorker hAsync)
-- | Start an asynchronous task without linking it to the caller
-- (delegates to 'asyncDo' with the link flag set to 'False').
async :: (Serializable a) => AsyncTask a -> Process (Async a)
async asyncTask = asyncDo False asyncTask
-- | The 'ProcessId' stored in the handle's @_asyncWorker@ field.
asyncWorker :: Async a -> ProcessId
asyncWorker hAsync = _asyncWorker hAsync
-- | Start an asynchronous task that is linked to the caller
-- (delegates to 'asyncDo' with the link flag set to 'True').
asyncLinked :: (Serializable a) => AsyncTask a -> Process (Async a)
asyncLinked asyncTask = asyncDo True asyncTask
-- | Shared implementation behind 'async' and 'asyncLinked'.
--
-- A remote task is rewritten into a local task that performs 'call' with the
-- supplied dictionary, node and closure. For a local task two processes are
-- spawned:
--
-- * a /worker/ that waits for a start signal, runs the task and stores
--   'AsyncDone' in a shared 'TMVar';
--
-- * an /insulator/ that monitors the worker (and, when @shouldLink@ is
--   'True', the calling process too) and stores the failure or cancellation
--   outcome in the same 'TMVar'.
--
-- The returned handle carries the worker pid, the insulator pid and an 'STM'
-- action that reads the shared 'TMVar'.
asyncDo :: (Serializable a) => Bool -> AsyncTask a -> Process (Async a)
asyncDo shouldLink (AsyncRemoteTask d n c) =
  asyncDo shouldLink $ AsyncTask $ call d n c
asyncDo shouldLink (AsyncTask proc) = do
    root <- getSelfPid
    result <- liftIO newEmptyTMVarIO
    sigStart <- liftIO newEmptyTMVarIO
    (sp, rp) <- newChan

    insulator <- spawnLocal $ do
        worker <- spawnLocal $ do
            -- Hold the task back until the caller releases the start signal.
            liftIO $ atomically $ takeTMVar sigStart
            r <- proc
            liftIO $ atomically $ putTMVar result (AsyncDone r)

        -- Tell the calling process which pid the worker got.
        sendChan sp worker

        wref <- monitor worker
        rref <- if shouldLink then fmap Just (monitor root) else return Nothing
        -- The finalizer must /run/ both unmonitor actions. Wrapping the
        -- second one in 'return' would only hand it back as a value, and
        -- 'finally' discards the finalizer's result.
        finally (pollUntilExit worker result)
                (unmonitor wref >> maybe (return ()) unmonitor rref)

    workerPid <- receiveChan rp
    -- The worker pid is known now, so let the worker start the task.
    liftIO $ atomically $ putTMVar sigStart ()

    return Async {
          _asyncWorker  = workerPid
        , _asyncMonitor = insulator
        , _asyncWait    = readTMVar result
        }

  where
    -- Wait for either a cancellation request or a monitor notification and
    -- record the corresponding outcome in the result variable.
    pollUntilExit :: (Serializable a)
                  => ProcessId
                  -> TMVar (AsyncResult a)
                  -> Process ()
    pollUntilExit wpid result' = do
      event <- receiveWait [
          match (\c@CancelWait -> kill wpid "cancel" >> return (Left c))
        , match (\(ProcessMonitorNotification _ pid' reason) ->
                  return (Right (pid', reason)))
        ]
      case event of
          Left CancelWait
            -- Non-blocking put: the worker may already have stored its
            -- result, in which case a blocking 'putTMVar' would never
            -- return and this process would never terminate.
            -> liftIO $ atomically $ void $
                 tryPutTMVar result' AsyncCancelled
          Right (fpid, d)
            | fpid == wpid -> case d of
                -- A normal exit means the worker ran to the end of its
                -- body, where it stores 'AsyncDone' itself.
                DiedNormal -> return ()
                _          -> liftIO $ atomically $ void $
                                tryPutTMVar result' (AsyncFailed d)
            | otherwise -> do
                -- The notification concerns a process other than the
                -- worker: stop the worker, wait for its death notice, then
                -- record the link failure.
                kill wpid "linkFailed"
                receiveWait
                  [ matchIf (\(ProcessMonitorNotification _ pid' _) ->
                             pid' == wpid
                            ) $ \_ -> return ()
                  ]
                liftIO $ atomically $ void $
                  tryPutTMVar result' (AsyncLinkFailed d)
-- | Non-blocking check of an 'Async' handle. Runs 'pollSTM' once and yields
-- 'AsyncPending' when no result is available yet.
poll :: (Serializable a) => Async a -> Process (AsyncResult a)
poll hAsync =
  fmap (fromMaybe AsyncPending) (liftIO (atomically (pollSTM hAsync)))
-- | Like 'poll', but maps 'AsyncPending' to 'Nothing' and wraps every other
-- result in 'Just'.
check :: (Serializable a) => Async a -> Process (Maybe (AsyncResult a))
check hAsync = fmap settled (poll hAsync)
  where
    settled AsyncPending = Nothing
    settled ar           = Just ar
-- | Run 'waitTimeout' with the given delay and substitute 'AsyncPending'
-- when it yields 'Nothing'.
waitCheckTimeout :: (Serializable a) =>
                    Int -> Async a -> Process (AsyncResult a)
waitCheckTimeout t hAsync = do
  outcome <- waitTimeout t hAsync
  return (fromMaybe AsyncPending outcome)
{-# INLINE wait #-}
-- | Block until the 'STM' action returned by 'waitSTM' for this handle
-- completes, and return its result.
wait :: Async a -> Process (AsyncResult a)
wait hAsync = liftIO (atomically (waitSTM hAsync))
-- | Wait for the result of an 'Async' handle under 'timeout'. The delay @t@
-- is passed to 'timeout' unchanged; 'Nothing' means 'timeout' gave up
-- before a result was available.
waitTimeout :: (Serializable a) =>
               Int -> Async a -> Process (Maybe (AsyncResult a))
waitTimeout t hAsync = liftIO (timeout t blockOnResult)
  where
    blockOnResult = atomically (waitSTM hAsync)
-- | Wait for a result with 'waitTimeout'; if none arrives in time, fall back
-- to 'cancelWait' and return whatever that yields.
waitCancelTimeout :: (Serializable a)
                  => Int
                  -> Async a
                  -> Process (AsyncResult a)
waitCancelTimeout t hAsync =
  waitTimeout t hAsync >>= maybe (cancelWait hAsync) return
-- | Lift 'waitAnySTM' into 'Process': returns a handle from the list
-- together with its result.
waitAny :: (Serializable a)
        => [Async a]
        -> Process (Async a, AsyncResult a)
waitAny = liftIO . waitAnySTM
-- | Like 'waitAny', but afterwards sends 'cancel' to every handle in the
-- list; the cancellation runs as a 'finally' finalizer.
waitAnyCancel :: (Serializable a)
              => [Async a] -> Process (Async a, AsyncResult a)
waitAnyCancel asyncs = finally (waitAny asyncs) cancelAll
  where
    cancelAll = mapM_ cancel asyncs
-- | Wait for whichever of two handles has a result, trying the left handle
-- first within a single 'STM' transaction. The result is tagged 'Left' or
-- 'Right' according to the handle it came from.
waitEither :: Async a
           -> Async b
           -> Process (Either (AsyncResult a) (AsyncResult b))
waitEither left right = liftIO (atomically (fromLeft `orElse` fromRight))
  where
    fromLeft  = fmap Left (waitSTM left)
    fromRight = fmap Right (waitSTM right)
-- | Like 'waitEither', but discards the result of whichever handle
-- completes.
waitEither_ :: Async a -> Async b -> Process ()
waitEither_ left right =
  liftIO . atomically $ void (waitSTM left) `orElse` void (waitSTM right)
-- | Wait until both handles have a result and return the pair, all within a
-- single 'STM' transaction.
waitBoth :: Async a
         -> Async b
         -> Process (AsyncResult a, AsyncResult b)
waitBoth left right = liftIO (atomically bothResults)
  where
    -- The second alternative never produces a value: it reads the right
    -- handle and then retries.
    leftResult = waitSTM left `orElse` (waitSTM right >> retry)

    bothResults = do
      a <- leftResult
      b <- waitSTM right
      return (a, b)
-- | Run 'waitAnySTM' under 'timeout' with the given delay, keeping only the
-- result component of the pair it returns. 'Nothing' means 'timeout' gave
-- up first.
waitAnyTimeout :: (Serializable a)
               => Int
               -> [Async a]
               -> Process (Maybe (AsyncResult a))
waitAnyTimeout delay asyncs =
  liftIO (timeout delay (fmap snd (waitAnySTM asyncs)))
-- | Request cancellation by sending a 'CancelWait' message to the process
-- id held in the second field of the handle. Does not wait for the task to
-- stop.
--
-- NOTE(review): the second field is presumably the monitoring process that
-- handles 'CancelWait' in 'asyncDo' -- confirm against the 'Async'
-- declaration.
cancel :: Async a -> Process ()
cancel hAsync =
  case hAsync of
    Async _ target _ -> send target CancelWait
-- | Send a cancellation request with 'cancel', then block in 'wait' and
-- return the result it yields.
cancelWait :: (Serializable a) => Async a -> Process (AsyncResult a)
cancelWait hAsync = do
  cancel hAsync
  wait hAsync
-- | Send an 'exit' signal carrying @reason@ directly to the handle's worker
-- process.
cancelWith :: (Serializable b) => b -> Async a -> Process ()
cancelWith reason hAsync = exit workerPid reason
  where
    workerPid = _asyncWorker hAsync
-- | Send a 'kill' signal carrying @reason@ directly to the handle's worker
-- process.
cancelKill :: String -> Async a -> Process ()
cancelKill reason hAsync = kill workerPid reason
  where
    workerPid = _asyncWorker hAsync
-- | Wait, in a single 'STM' transaction, for a handle in the list that has
-- a result, trying the handles in list order. Returns that handle paired
-- with its result. The alternatives are chained with 'orElse' and end in
-- 'retry', so an empty list leaves only 'retry'.
waitAnySTM :: [Async a] -> IO (Async a, AsyncResult a)
waitAnySTM asyncs = atomically (foldr orElse retry candidates)
  where
    candidates = [ fmap ((,) a) (waitSTM a) | a <- asyncs ]
-- | The 'STM' action stored in the handle that yields the task's result.
waitSTM :: Async a -> STM (AsyncResult a)
waitSTM hAsync = _asyncWait hAsync
{-# INLINE pollSTM #-}
-- | Non-blocking variant of 'waitSTM': yields 'Just' the result when the
-- handle's wait action succeeds, and 'Nothing' when it would retry.
pollSTM :: Async a -> STM (Maybe (AsyncResult a))
pollSTM (Async _ _ waitAction) = ready `orElse` notReady
  where
    ready    = fmap Just waitAction
    notReady = return Nothing