module Control.Distributed.Process.Async
(
AsyncRef
, AsyncTask(..)
, Async
, AsyncResult(..)
, async
, asyncLinked
, task
, remoteTask
, monitorAsync
, cancel
, cancelWait
, cancelWith
, cancelKill
, poll
, check
, wait
, waitAny
, waitAnyTimeout
, waitTimeout
, waitCancelTimeout
, waitCheckTimeout
, pollSTM
, waitTimeoutSTM
, waitAnyCancel
, waitEither
, waitEither_
, waitBoth
) where
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Control.Applicative
import Control.Concurrent.STM hiding (check)
import Control.Distributed.Process
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Async.Internal.Types
import Control.Distributed.Process.Extras
( CancelWait(..)
, Channel
, Resolvable(..)
)
import Control.Distributed.Process.Extras.Time
( asTimeout
, TimeInterval
)
import Control.Monad
import Data.Maybe
( fromMaybe
)
import System.Timeout (timeout)
-- | Wrap a local 'Process' action in an 'AsyncTask' so that it can be handed
-- to 'async' or 'asyncLinked'.
task :: Process a -> AsyncTask a
task proc = AsyncTask { asyncTask = proc }
-- | Describe a task that is given as a 'Closure' together with the 'NodeId'
-- it targets and the static 'SerializableDict' for its result type.
-- This is a plain wrapper around the 'AsyncRemoteTask' constructor.
remoteTask :: Static (SerializableDict a)
           -> NodeId
           -> Closure (Process a)
           -> AsyncTask a
remoteTask dict node action = AsyncRemoteTask dict node action
-- | Start monitoring the process that the handle resolves to and return the
-- resulting 'MonitorRef'.
--
-- If 'resolve' yields 'Nothing' for the handle, the calling process dies with
-- the reason @"Invalid Async Handle"@.
monitorAsync :: Async a -> Process MonitorRef
monitorAsync hAsync =
  resolve hAsync >>= maybe (die "Invalid Async Handle") monitor
-- | Start the given task asynchronously and return a handle to it.
-- The root (calling) process is /not/ monitored; see 'asyncLinked' for the
-- variant that is.
async :: (Serializable a) => AsyncTask a -> Process (Async a)
async t = asyncDo False t
-- | Like 'async', but the supervising process additionally monitors the
-- calling process and kills the worker if a monitor notification for the
-- caller arrives.
asyncLinked :: (Serializable a) => AsyncTask a -> Process (Async a)
asyncLinked t = asyncDo True t
-- | Shared implementation behind 'async' and 'asyncLinked'.
--
-- The 'Bool' selects whether the calling (root) process is monitored by the
-- supervising \"insulator\" process. A remote task is first turned into a
-- local one by running its closure through 'call'.
--
-- Two processes are spawned: the /worker/, which runs the task and stores
-- @AsyncDone@ in a shared 'TMVar', and the /insulator/, which monitors the
-- worker (and optionally the root), reacts to 'CancelWait' requests and
-- records cancellation or failure. The returned handle exposes the worker
-- pid, the insulator pid and an 'STM' action that reads the result.
asyncDo :: (Serializable a) => Bool -> AsyncTask a -> Process (Async a)
asyncDo shouldLink (AsyncRemoteTask d n c) =
  let proc = call d n c in asyncDo shouldLink AsyncTask { asyncTask = proc }
asyncDo shouldLink (AsyncTask proc) = do
    root     <- getSelfPid
    result   <- liftIO newEmptyTMVarIO
    sigStart <- liftIO newEmptyTMVarIO
    (sp, rp) <- newChan

    insulator <- spawnLocal $ do
        worker <- spawnLocal $ do
            -- Hold the task back until the caller has received our pid.
            liftIO $ atomically $ takeTMVar sigStart
            r <- proc
            liftIO $ atomically $ putTMVar result (AsyncDone r)

        sendChan sp worker
        wref <- monitor worker
        rref <- if shouldLink
                  then Just <$> monitor root
                  else return Nothing
        -- Always drop both monitors on the way out. The optional root
        -- monitor must actually be *run* here, not merely returned.
        finally (pollUntilExit worker result)
                (unmonitor wref >> maybe (return ()) unmonitor rref)

    workerPid <- receiveChan rp
    liftIO $ atomically $ putTMVar sigStart ()

    return Async {
        _asyncWorker  = workerPid
      , _asyncMonitor = insulator
      , _asyncWait    = readTMVar result
      }
  where
    -- Wait for either a cancellation request or a monitor notification and
    -- record the corresponding outcome for the worker @wpid@.
    pollUntilExit :: (Serializable a)
                  => ProcessId
                  -> TMVar (AsyncResult a)
                  -> Process ()
    pollUntilExit wpid result' = do
        r <- receiveWait [
            match (\c@(CancelWait) -> kill wpid "cancel" >> return (Left c))
          , match (\(ProcessMonitorNotification _ pid' r) ->
                    return (Right (pid', r)))
          ]
        case r of
            Left CancelWait
              -> storeResult AsyncCancelled
            Right (fpid, d)
              | fpid == wpid
                -> case d of
                     -- The worker stores its own result before exiting.
                     DiedNormal -> return ()
                     _          -> storeResult (AsyncFailed d)
              -- NOTE(review): no result is stored on this path, so callers
              -- blocked on the handle will keep waiting - confirm intended.
              | otherwise -> kill wpid "linkFailed"
      where
        -- Non-blocking store: if the worker already published a result
        -- (e.g. a cancel racing with completion) keep the first value
        -- instead of blocking this process forever on a full TMVar.
        storeResult v = void $ liftIO $ atomically $ tryPutTMVar result' v
-- | Inspect the handle without blocking. Yields 'AsyncPending' when no result
-- is available yet, otherwise the stored result.
poll :: (Serializable a) => Async a -> Process (AsyncResult a)
poll hAsync =
  fromMaybe AsyncPending <$> liftIO (atomically (pollSTM hAsync))
-- | Like 'poll', but maps 'AsyncPending' to 'Nothing' and wraps every other
-- result in 'Just'.
check :: (Serializable a) => Async a -> Process (Maybe (AsyncResult a))
check hAsync = do
    outcome <- poll hAsync
    return $ case outcome of
        AsyncPending -> Nothing
        finished     -> Just finished
-- | Wait for a result for at most the given interval. If 'waitTimeout'
-- produces nothing in that time, 'AsyncPending' is returned instead.
waitCheckTimeout :: (Serializable a) =>
                    TimeInterval -> Async a -> Process (AsyncResult a)
waitCheckTimeout t hAsync =
  fromMaybe AsyncPending <$> waitTimeout t hAsync
-- | Block until the handle's result is available and return it.
-- Runs 'waitSTM' in a single atomic transaction.
wait :: Async a -> Process (AsyncResult a)
wait hAsync = liftIO (atomically (waitSTM hAsync))
-- | Wait for a result for at most the given interval.
--
-- A helper process is spawned that blocks in 'wait' and forwards the result
-- over a typed channel; the caller reads that channel with a timeout.
-- The helper is always killed afterwards (reason @"timeout"@), whether or
-- not a result arrived.
waitTimeout :: (Serializable a) =>
               TimeInterval -> Async a -> Process (Maybe (AsyncResult a))
waitTimeout t hAsync = do
    (sendEnd, recvEnd) <- newChan
    waiter <- spawnLocal (wait hAsync >>= sendChan sendEnd)
    finally (receiveChanTimeout (asTimeout t) recvEnd)
            (kill waiter "timeout")
-- | Wait for a result for at most the given interval; if none arrives in
-- time, cancel the task via 'cancelWait' and return whatever that yields.
waitCancelTimeout :: (Serializable a)
                  => TimeInterval
                  -> Async a
                  -> Process (AsyncResult a)
waitCancelTimeout t hAsync =
  waitTimeout t hAsync >>= maybe (cancelWait hAsync) return
-- | Variant of 'waitTimeout' that does not spawn a helper process: the
-- blocking 'waitSTM' transaction is wrapped in 'System.Timeout.timeout'.
-- Yields 'Nothing' when the timeout fires first.
waitTimeoutSTM :: (Serializable a)
                 => TimeInterval
                 -> Async a
                 -> Process (Maybe (AsyncResult a))
waitTimeoutSTM t hAsync =
  liftIO (timeout (asTimeout t) (atomically (waitSTM hAsync)))
-- | Block until one of the given handles has a result, returning that handle
-- together with its result. Delegates to 'waitAnySTM'.
waitAny :: (Serializable a)
        => [Async a]
        -> Process (Async a, AsyncResult a)
waitAny = liftIO . waitAnySTM
-- | Like 'waitAny', but afterwards sends a cancellation request to /every/
-- handle in the list, regardless of how the wait ended.
waitAnyCancel :: (Serializable a)
              => [Async a] -> Process (Async a, AsyncResult a)
waitAnyCancel asyncs =
  finally (waitAny asyncs) (mapM_ cancel asyncs)
-- | Wait for whichever of two handles has a result first. The left handle is
-- tried first; its result is tagged 'Left', the right one's 'Right'.
waitEither :: Async a
              -> Async b
              -> Process (Either (AsyncResult a) (AsyncResult b))
waitEither left right = liftIO (atomically (fromLeft `orElse` fromRight))
  where
    fromLeft  = fmap Left  (waitSTM left)
    fromRight = fmap Right (waitSTM right)
-- | Like 'waitEither', but discards the result: returns as soon as either
-- handle has one.
waitEither_ :: Async a -> Async b -> Process ()
waitEither_ left right =
  liftIO . atomically $
    void (waitSTM left) `orElse` void (waitSTM right)
-- | Block until both handles have a result and return the pair.
waitBoth :: Async a
            -> Async b
            -> Process ((AsyncResult a), (AsyncResult b))
waitBoth left right = liftIO . atomically $ do
    -- If the left result is not ready, the alternative waits on the right
    -- handle and then retries the whole transaction.
    resL <- waitSTM left `orElse` (waitSTM right >> retry)
    resR <- waitSTM right
    return (resL, resR)
-- | Wait for any of the given handles for at most the given interval.
-- Yields only the result (not the handle it came from), or 'Nothing' if the
-- timeout fires first.
waitAnyTimeout :: (Serializable a)
               => TimeInterval
               -> [Async a]
               -> Process (Maybe (AsyncResult a))
waitAnyTimeout delay asyncs =
  liftIO $ timeout (asTimeout delay) (snd <$> waitAnySTM asyncs)
-- | Request cancellation by sending 'CancelWait' to the process stored in the
-- handle's second field. Does not wait for the request to be handled.
cancel :: Async a -> Process ()
cancel (Async _ supervisor _) = supervisor `send` CancelWait
-- | Request cancellation (see 'cancel') and then block in 'wait' until a
-- result is available.
cancelWait :: (Serializable a) => Async a -> Process (AsyncResult a)
cancelWait hAsync = do
    cancel hAsync
    wait hAsync
-- | Send an exit signal carrying @reason@ to the process the handle resolves
-- to. If the handle resolves to nothing, the caller dies with
-- @"Invalid Async Handle"@.
cancelWith :: (Serializable b) => b -> Async a -> Process ()
cancelWith reason hAsync =
  resolve hAsync >>= maybe (die "Invalid Async Handle") (`exit` reason)
-- | Like 'cancelWith', but uses 'kill' with the given reason string instead
-- of 'exit'. If the handle resolves to nothing, the caller dies with
-- @"Invalid Async Handle"@.
cancelKill :: String -> Async a -> Process ()
cancelKill reason hAsync =
  resolve hAsync >>= maybe (die "Invalid Async Handle") (`kill` reason)
-- | Atomically wait for the first handle in the list that has a result,
-- pairing the result with its handle. Handles are tried in list order; if
-- none is ready (or the list is empty) the transaction retries.
waitAnySTM :: [Async a] -> IO (Async a, AsyncResult a)
waitAnySTM asyncs = atomically (foldr (orElse . tagged) retry asyncs)
  where
    -- Wait on one handle and tag the result with the handle it came from.
    tagged h = fmap (\res -> (h, res)) (waitSTM h)
-- | The 'STM' action stored in the handle's third field; running it yields
-- the handle's result.
waitSTM :: Async a -> STM (AsyncResult a)
waitSTM hAsync = case hAsync of
    Async _ _ waitAction -> waitAction
-- | Non-blocking variant of 'waitSTM': yields 'Just' the result if the
-- handle's wait action succeeds, and 'Nothing' if it would retry.
pollSTM :: Async a -> STM (Maybe (AsyncResult a))
pollSTM (Async _ _ waitAction) =
  fmap Just waitAction `orElse` return Nothing