{-# LANGUAGE DeriveDataTypeable, MagicHash, UnboxedTuples #-}
module Control.Concurrent.Async (
async, withAsync, wait, asyncThreadId, cancel, concurrently
) where
import Control.Concurrent.STM
import Control.Exception
import Control.Concurrent
import Control.Monad
import Data.IORef
import Data.Typeable
import GHC.Conc
import GHC.Exts
import GHC.IO hiding (onException)
-- | Handle to an asynchronous computation that eventually yields either an
-- exception or a value of type @a@.
data Async a = Async
  { asyncThreadId :: {-# UNPACK #-} !ThreadId
    -- ^ The 'ThreadId' stored in this handle (the thread running the action).
  , _asyncWait    :: STM (Either SomeException a)
    -- ^ STM transaction that produces the outcome of the computation.
  }
-- | Run the given 'IO' action in a newly forked thread and return an 'Async'
-- handle for it. This is 'asyncUsing' specialised to 'rawForkIO'.
async :: IO a -> IO (Async a)
async = inline asyncUsing rawForkIO
-- | Start an action asynchronously using the supplied fork function.
--
-- The child thread runs the action under 'try' and stores the resulting
-- @Either SomeException a@ in a 'TMVar'; the returned 'Async' reads that
-- 'TMVar' without emptying it.
asyncUsing :: (IO () -> IO ThreadId)
           -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
  var <- newEmptyTMVarIO
  -- Fork with asynchronous exceptions masked so the child is already inside
  -- 'try' before one can be delivered; 'restore' re-establishes the caller's
  -- masking state for the user action itself.
  t <- mask $ \restore ->
         doFork $ try (restore action) >>= atomically . putTMVar var
  return (Async t (readTMVar var))
-- | Run an action asynchronously for the duration of the supplied
-- continuation. This is 'withAsyncUsing' specialised to 'rawForkIO'; see
-- that function for the cancellation behaviour.
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO
-- | Fork @action@ with the given fork function, pass its 'Async' handle to
-- @inner@, and cancel the forked thread once @inner@ is done.
--
-- The child is cancelled with 'uninterruptibleCancel' both when @inner@
-- returns normally and when it throws; in the latter case the exception is
-- rethrown after the cancellation.
withAsyncUsing :: (IO () -> IO ThreadId)
               -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing doFork = \action inner -> do
  var <- newEmptyTMVarIO
  mask $ \restore -> do
    -- The child records its outcome (exception or value) in 'var'.
    t <- doFork $ try (restore action) >>= atomically . putTMVar var
    let a = Async t (readTMVar var)
    -- Only the user-supplied continuation runs with the original masking
    -- state; the clean-up below stays masked.
    r <- restore (inner a) `catchAll` \e -> do
      uninterruptibleCancel a
      throwIO e
    uninterruptibleCancel a
    return r
-- | Block until the asynchronous action finishes and return its value.
-- If the action ended with an exception, that exception is rethrown here
-- (via 'waitSTM').
{-# INLINE wait #-}
wait :: Async a -> IO a
wait = tryAgain . atomically . waitSTM
  where
    -- If the transaction is aborted with 'BlockedIndefinitelyOnSTM', run it
    -- one more time; a second occurrence is not caught.
    tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnSTM -> f
-- | Block until the asynchronous action finishes and return its outcome as
-- an 'Either': @Left@ for an exception raised by the action, @Right@ for
-- its value. Unlike 'wait', the action's exception is not rethrown.
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch = tryAgain . atomically . waitCatchSTM
  where
    -- If the transaction is aborted with 'BlockedIndefinitelyOnSTM', run it
    -- one more time; a second occurrence is not caught.
    tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnSTM -> f
-- | STM version of 'wait': obtain the outcome through 'waitCatchSTM', then
-- rethrow a @Left@ with 'throwSTM' or return the @Right@ value.
waitSTM :: Async a -> STM a
waitSTM a = do
  r <- waitCatchSTM a
  either throwSTM return r
-- | STM version of 'waitCatch': simply the wait transaction stored in the
-- 'Async' handle.
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM (Async _ w) = w
-- | Cancel an asynchronous action by throwing 'AsyncCancelled' to its
-- thread, then wait (with 'waitCatch') for the action to finish. The
-- action's outcome is discarded.
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel a@(Async t _) = throwTo t AsyncCancelled >> void (waitCatch a)
-- | The exception thrown to a thread by 'cancel' (and by the clean-up code
-- of @concurrently'@) to make it stop.
data AsyncCancelled = AsyncCancelled
  deriving (Show, Eq, Typeable)
-- | 'AsyncCancelled' is converted to and from 'SomeException' with the
-- asynchronous-exception helpers from "Control.Exception".
--
-- The former @#if __GLASGOW_HASKELL__ >= 708@ guard was removed: this
-- module does not enable the CPP extension, so the directive could not be
-- preprocessed. Both helpers are available from base 4.7 (GHC 7.8) on.
instance Exception AsyncCancelled where
  fromException = asyncExceptionFromException
  toException   = asyncExceptionToException
-- | Run 'cancel' inside 'uninterruptibleMask_', so the cancellation (which
-- includes waiting for the target to finish) cannot itself be interrupted
-- by an asynchronous exception.
{-# INLINE uninterruptibleCancel #-}
uninterruptibleCancel :: Async a -> IO ()
uninterruptibleCancel = uninterruptibleMask_ . cancel
-- | Run two 'IO' actions concurrently and return both results as a pair.
-- If either action reports an exception, it is rethrown in the calling
-- thread with 'throwIO'.
concurrently :: IO a -> IO b -> IO (a,b)
concurrently left right = concurrently' left right (collect [])
  where
    -- Accumulate outcomes (newest first) until one 'Left' (first action)
    -- and one 'Right' (second action) have arrived, in either order.
    collect [Left a, Right b] _ = return (a,b)
    collect [Right b, Left a] _ = return (a,b)
    collect xs m = do
      e <- m
      case e of
        Left ex -> throwIO ex
        Right r -> collect (r:xs) m
-- | Worker behind 'concurrently'.
--
-- Forks one thread for @left@ and one for @right@. Each child reports its
-- outcome on a shared 'MVar': @Right (Left a)@ from the left action,
-- @Right (Right b)@ from the right action, or @Left e@ if the action threw.
-- The @collect@ callback receives an 'IO' action that takes the next
-- outcome; when @collect@ returns or throws, any children still running
-- are sent 'AsyncCancelled' and their outcomes are drained.
concurrently' :: IO a -> IO b
              -> (IO (Either SomeException (Either a b)) -> IO r)
              -> IO r
concurrently' left right collect = do
  done <- newEmptyMVar
  mask $ \restore -> do
    -- Only the user action runs under 'restore'; the exception handler and
    -- its 'putMVar' run under 'uninterruptibleMask_', so reporting a failure
    -- cannot be interrupted by an asynchronous exception.
    lid <- forkIO $ uninterruptibleMask_ $
      restore (left >>= putMVar done . Right . Left)
        `catchAll` (putMVar done . Left)
    rid <- forkIO $ uninterruptibleMask_ $
      restore (right >>= putMVar done . Right . Right)
        `catchAll` (putMVar done . Left)

    -- Number of outcomes that have not yet been taken from 'done'.
    count <- newIORef (2 :: Int)
    let takeDone = do
          r <- takeMVar done
          -- Decrement only after the take has succeeded, so an interrupted
          -- 'takeMVar' leaves the counter untouched. The strict variant
          -- avoids leaving a thunk in the reference.
          modifyIORef' count (subtract 1)
          return r

    let tryAgain f = f `Control.Exception.catch` \BlockedIndefinitelyOnMVar -> f

        stop = do
          uninterruptibleMask_ $ do
            count' <- readIORef count
            -- The cancellations are delivered from a helper thread rather
            -- than from this (uninterruptibly masked) one; the right child
            -- is signalled before the left one.
            when (count' > 0) $
              void $ forkIO $ do
                throwTo rid AsyncCancelled
                throwTo lid AsyncCancelled
            -- Take every outstanding outcome so that both children have
            -- finished reporting before we return.
            replicateM_ count' (tryAgain $ takeMVar done)

    r <- collect (tryAgain $ takeDone) `onException` stop
    stop
    return r
-- | 'Control.Exception.catch' specialised to 'SomeException', i.e. a
-- handler that receives every exception raised by the action.
catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = Control.Exception.catch
-- | Fork a thread by calling the @fork#@ primitive directly and wrapping
-- the resulting @ThreadId#@ in a 'ThreadId'.
--
-- NOTE(review): this passes the 'IO' action itself to @fork#@, which is the
-- primop signature the file was written against. Newer GHC releases
-- reportedly expect the unwrapped state-passing function instead; confirm
-- against the targeted compiler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
  case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)