{-# LANGUAGE CPP, MagicHash, UnboxedTuples, RankNTypes, GADTs #-}
#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
#endif
{-# OPTIONS -Wall #-}
module Control.Concurrent.Async.Pool.Async
( module Control.Concurrent.Async.Pool.Async
, module Gr
) where
import Control.Concurrent.STM
import Control.Exception
import Control.Concurrent
import Control.Applicative
import Control.Monad hiding (forM, forM_, mapM, mapM_)
import Data.Foldable
import Data.Graph.Inductive.Graph as Gr hiding ((&))
import Data.Graph.Inductive.PatriciaTree as Gr
import Data.Graph.Inductive.Query.BFS as Gr
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.Traversable
import Prelude hiding (mapM_, mapM, foldr, all, any, concatMap, foldl1)
import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc
type Handle = Node
data State = Ready | Starting | Started ThreadId SomeTMVar
data Status = Pending | Completed deriving (Status -> Status -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Status -> Status -> Bool
$c/= :: Status -> Status -> Bool
== :: Status -> Status -> Bool
$c== :: Status -> Status -> Bool
Eq, Int -> Status -> ShowS
[Status] -> ShowS
Status -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Status] -> ShowS
$cshowList :: [Status] -> ShowS
show :: Status -> String
$cshow :: Status -> String
showsPrec :: Int -> Status -> ShowS
$cshowsPrec :: Int -> Status -> ShowS
Show)
type TaskGraph = Gr (TVar State) Status
instance Eq State where
State
Ready == :: State -> State -> Bool
== State
Ready = Bool
True
State
Starting == State
Starting = Bool
True
Started ThreadId
n1 SomeTMVar
_ == Started ThreadId
n2 SomeTMVar
_ = ThreadId
n1 forall a. Eq a => a -> a -> Bool
== ThreadId
n2
State
_ == State
_ = Bool
False
instance Show State where
show :: State -> String
show State
Ready = String
"Ready"
show State
Starting = String
"Starting"
show (Started ThreadId
n SomeTMVar
_) = String
"Started " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> String
show ThreadId
n
data Pool = Pool
{ Pool -> TVar TaskGraph
tasks :: TVar TaskGraph
, Pool -> TVar Int
tokens :: TVar Int
}
waitTMVar :: TMVar a -> STM ()
waitTMVar :: forall a. TMVar a -> STM ()
waitTMVar TMVar a
tv = do
a
_ <- forall a. TMVar a -> STM a
readTMVar TMVar a
tv
forall (m :: * -> *) a. Monad m => a -> m a
return ()
syncPool :: Pool -> STM ()
syncPool :: Pool -> STM ()
syncPool Pool
p = do
TaskGraph
g <- forall a. TVar a -> STM a
readTVar (Pool -> TVar TaskGraph
tasks Pool
p)
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> [LNode a]
labNodes TaskGraph
g) forall a b. (a -> b) -> a -> b
$ \(Int
_h, TVar State
st) -> do
State
x <- forall a. TVar a -> STM a
readTVar TVar State
st
case State
x of
Started ThreadId
_tid SomeTMVar
v -> SomeTMVar -> STM ()
waitSomeTMVar SomeTMVar
v
State
_ -> forall a. STM a
retry
data SomeTMVar where
SomeTMVar :: forall a. TMVar a -> SomeTMVar
waitSomeTMVar :: SomeTMVar -> STM ()
waitSomeTMVar :: SomeTMVar -> STM ()
waitSomeTMVar (SomeTMVar TMVar a
mv) = forall a. TMVar a -> STM ()
waitTMVar TMVar a
mv
data TaskGroup = TaskGroup
{ TaskGroup -> Pool
pool :: Pool
, TaskGroup -> TVar Int
avail :: TVar Int
, TaskGroup -> TVar (IntMap (IO ThreadId, SomeTMVar))
pending :: TVar (IntMap (IO ThreadId, SomeTMVar))
}
data Async a = Async
{ forall a. Async a -> TaskGroup
taskGroup :: TaskGroup
, forall a. Async a -> Int
taskHandle :: {-# UNPACK #-} !Handle
, forall a. Async a -> STM (Either SomeException a)
_asyncWait :: STM (Either SomeException a)
}
getTaskVar :: TaskGraph -> Handle -> TVar State
getTaskVar :: TaskGraph -> Int -> TVar State
getTaskVar TaskGraph
g Int
h = let (Adj Status
_to, Int
_, TVar State
t, Adj Status
_from) = forall (gr :: * -> * -> *) a b.
Graph gr =>
gr a b -> Int -> Context a b
context TaskGraph
g Int
h in TVar State
t
getThreadId :: TaskGraph -> Node -> STM (Maybe ThreadId)
getThreadId :: TaskGraph -> Int -> STM (Maybe ThreadId)
getThreadId TaskGraph
g Int
h = do
State
status <- forall a. TVar a -> STM a
readTVar (TaskGraph -> Int -> TVar State
getTaskVar TaskGraph
g Int
h)
case State
status of
State
Ready -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
State
Starting -> forall a. STM a
retry
Started ThreadId
x SomeTMVar
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just ThreadId
x
instance Eq (Async a) where
Async TaskGroup
_ Int
a STM (Either SomeException a)
_ == :: Async a -> Async a -> Bool
== Async TaskGroup
_ Int
b STM (Either SomeException a)
_ = Int
a forall a. Eq a => a -> a -> Bool
== Int
b
instance Ord (Async a) where
Async TaskGroup
_ Int
a STM (Either SomeException a)
_ compare :: Async a -> Async a -> Ordering
`compare` Async TaskGroup
_ Int
b STM (Either SomeException a)
_ = Int
a forall a. Ord a => a -> a -> Ordering
`compare` Int
b
instance Functor Async where
fmap :: forall a b. (a -> b) -> Async a -> Async b
fmap a -> b
f (Async TaskGroup
p Int
a STM (Either SomeException a)
w) = forall a.
TaskGroup -> Int -> STM (Either SomeException a) -> Async a
Async TaskGroup
p Int
a (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f) STM (Either SomeException a)
w)
async :: TaskGroup -> IO a -> IO (Async a)
async :: forall a. TaskGroup -> IO a -> IO (Async a)
async TaskGroup
p = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> a
inline forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
rawForkIO
asyncBound :: TaskGroup -> IO a -> IO (Async a)
asyncBound :: forall a. TaskGroup -> IO a -> IO (Async a)
asyncBound TaskGroup
p = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
forkOS
asyncOn :: TaskGroup -> Int -> IO a -> IO (Async a)
asyncOn :: forall a. TaskGroup -> Int -> IO a -> IO (Async a)
asyncOn TaskGroup
p = (forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
.) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO () -> IO ThreadId
rawForkOn
asyncWithUnmask :: TaskGroup -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask :: forall a.
TaskGroup -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask TaskGroup
p (forall b. IO b -> IO b) -> IO a
actionWith =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
rawForkIO ((forall b. IO b -> IO b) -> IO a
actionWith forall b. IO b -> IO b
unsafeUnmask)
asyncOnWithUnmask :: TaskGroup -> Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask :: forall a.
TaskGroup
-> Int -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask TaskGroup
p Int
cpu (forall b. IO b -> IO b) -> IO a
actionWith =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p (Int -> IO () -> IO ThreadId
rawForkOn Int
cpu) ((forall b. IO b -> IO b) -> IO a
actionWith forall b. IO b -> IO b
unsafeUnmask)
asyncUsing :: TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing :: forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
doFork IO a
action = do
Int
h <- Pool -> STM Int
nextIdent (TaskGroup -> Pool
pool TaskGroup
p)
TMVar (Either SomeException a)
var <- forall a. STM (TMVar a)
newEmptyTMVar
let start :: IO ThreadId
start = forall b. ((forall b. IO b -> IO b) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore ->
IO () -> IO ThreadId
doFork forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => IO a -> IO (Either e a)
try (forall b. IO b -> IO b
restore (IO a
action forall a b. IO a -> IO b -> IO a
`finally` Int -> IO ()
cleanup Int
h))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Either SomeException a)
var
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (TaskGroup -> TVar (IntMap (IO ThreadId, SomeTMVar))
pending TaskGroup
p) (forall a. Int -> a -> IntMap a -> IntMap a
IntMap.insert Int
h (IO ThreadId
start, forall a. TMVar a -> SomeTMVar
SomeTMVar TMVar (Either SomeException a)
var))
TVar State
tv <- forall a. a -> STM (TVar a)
newTVar State
Ready
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (Pool -> TVar TaskGraph
tasks (TaskGroup -> Pool
pool TaskGroup
p)) (forall (gr :: * -> * -> *) a b.
DynGraph gr =>
LNode a -> gr a b -> gr a b
insNode (Int
h, TVar State
tv))
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a.
TaskGroup -> Int -> STM (Either SomeException a) -> Async a
Async TaskGroup
p Int
h (forall a. TMVar a -> STM a
readTMVar TMVar (Either SomeException a)
var)
where
cleanup :: Int -> IO ()
cleanup Int
h = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (TaskGroup -> TVar Int
avail TaskGroup
p) forall a. Enum a => a -> a
succ
Pool -> Int -> STM ()
cleanupTask (TaskGroup -> Pool
pool TaskGroup
p) Int
h
asyncUsingLazy :: TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsingLazy :: forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsingLazy TaskGroup
p IO () -> IO ThreadId
doFork IO a
action = do
Int
availSlots <- forall a. TVar a -> STM a
readTVar (TaskGroup -> TVar Int
avail TaskGroup
p)
Bool -> STM ()
check (Int
availSlots forall a. Ord a => a -> a -> Bool
> Int
0)
forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
doFork IO a
action
nextIdent :: Pool -> STM Int
nextIdent :: Pool -> STM Int
nextIdent Pool
p = do
Int
tok <- forall a. TVar a -> STM a
readTVar (Pool -> TVar Int
tokens Pool
p)
forall a. TVar a -> a -> STM ()
writeTVar (Pool -> TVar Int
tokens Pool
p) (forall a. Enum a => a -> a
succ Int
tok)
forall (m :: * -> *) a. Monad m => a -> m a
return Int
tok
cleanupTask :: Pool -> Handle -> STM ()
cleanupTask :: Pool -> Int -> STM ()
cleanupTask Pool
p Int
h =
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (Pool -> TVar TaskGraph
tasks Pool
p) forall a b. (a -> b) -> a -> b
$ \TaskGraph
g ->
case forall a b. [a] -> [b] -> [(a, b)]
zip (forall a. a -> [a]
repeat Int
h) (forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> Int -> [Int]
Gr.suc TaskGraph
g Int
h) of
[] -> forall {gr :: * -> * -> *} {a} {b}.
Graph gr =>
Int -> gr a b -> gr a b
dropTask Int
h TaskGraph
g
[(Int, Int)]
es -> forall (gr :: * -> * -> *) b a.
DynGraph gr =>
[LEdge b] -> gr a b -> gr a b
insEdges (forall {a} {b}. [(a, b)] -> [(a, b, Status)]
completeEdges [(Int, Int)]
es) forall a b. (a -> b) -> a -> b
$ forall (gr :: * -> * -> *) a b.
DynGraph gr =>
[(Int, Int)] -> gr a b -> gr a b
delEdges [(Int, Int)]
es TaskGraph
g
where
completeEdges :: [(a, b)] -> [(a, b, Status)]
completeEdges = forall a b. (a -> b) -> [a] -> [b]
map (\(a
f, b
t) -> (a
f, b
t, Status
Completed))
dropTask :: Int -> gr a b -> gr a b
dropTask Int
k gr a b
gr = forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' gr a b -> Int -> gr a b
f (forall {gr :: * -> * -> *} {a} {b}.
Graph gr =>
Int -> gr a b -> gr a b
delNode Int
k gr a b
gr) (forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> Int -> [Int]
Gr.pre gr a b
gr Int
k)
where
f :: gr a b -> Int -> gr a b
f gr a b
g Int
n = if forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> Int -> Int
outdeg gr a b
g Int
n forall a. Eq a => a -> a -> Bool
== Int
0 then Int -> gr a b -> gr a b
dropTask Int
n gr a b
g else gr a b
g
withAsync :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync :: forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p = forall a. a -> a
inline forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p IO () -> IO ThreadId
rawForkIO
withAsyncBound :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsyncBound :: forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsyncBound TaskGroup
p = forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p IO () -> IO ThreadId
forkOS
withAsyncOn :: TaskGroup -> Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn :: forall a b. TaskGroup -> Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn TaskGroup
p = forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> IO () -> IO ThreadId
rawForkOn
withAsyncWithUnmask :: TaskGroup -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask :: forall a b.
TaskGroup
-> ((forall b. IO b -> IO b) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask TaskGroup
p (forall b. IO b -> IO b) -> IO a
actionWith =
forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p IO () -> IO ThreadId
rawForkIO ((forall b. IO b -> IO b) -> IO a
actionWith forall b. IO b -> IO b
unsafeUnmask)
withAsyncOnWithUnmask :: TaskGroup -> Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask :: forall a b.
TaskGroup
-> Int
-> ((forall b. IO b -> IO b) -> IO a)
-> (Async a -> IO b)
-> IO b
withAsyncOnWithUnmask TaskGroup
p Int
cpu (forall b. IO b -> IO b) -> IO a
actionWith = forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p (Int -> IO () -> IO ThreadId
rawForkOn Int
cpu) ((forall b. IO b -> IO b) -> IO a
actionWith forall b. IO b -> IO b
unsafeUnmask)
withAsyncUsing :: TaskGroup -> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b)
-> IO b
withAsyncUsing :: forall a b.
TaskGroup
-> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing TaskGroup
p IO () -> IO ThreadId
doFork = \IO a
action Async a -> IO b
inner -> do
forall b. ((forall b. IO b -> IO b) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore -> do
Async a
a <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a.
TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing TaskGroup
p IO () -> IO ThreadId
doFork forall a b. (a -> b) -> a -> b
$ forall b. IO b -> IO b
restore IO a
action
b
r <- forall b. IO b -> IO b
restore (Async a -> IO b
inner Async a
a) forall a. IO a -> (SomeException -> IO a) -> IO a
`catchAll` \SomeException
e -> do forall a. Async a -> IO ()
cancel Async a
a; forall e a. Exception e => e -> IO a
throwIO SomeException
e
forall a. Async a -> IO ()
cancel Async a
a
forall (m :: * -> *) a. Monad m => a -> m a
return b
r
{-# INLINE wait #-}
wait :: Async a -> IO a
wait :: forall a. Async a -> IO a
wait = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> STM a
waitSTM
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch :: forall a. Async a -> IO (Either SomeException a)
waitCatch = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> STM (Either SomeException a)
waitCatchSTM
{-# INLINE poll #-}
poll :: Async a -> IO (Maybe (Either SomeException a))
poll :: forall a. Async a -> IO (Maybe (Either SomeException a))
poll = forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> STM (Maybe (Either SomeException a))
pollSTM
waitSTM :: Async a -> STM a
waitSTM :: forall a. Async a -> STM a
waitSTM Async a
a = do
Either SomeException a
r <- forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall e a. Exception e => e -> STM a
throwSTM forall (m :: * -> *) a. Monad m => a -> m a
return Either SomeException a
r
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM :: forall a. Async a -> STM (Either SomeException a)
waitCatchSTM (Async TaskGroup
_ Int
_ STM (Either SomeException a)
w) = STM (Either SomeException a)
w
{-# INLINE pollSTM #-}
pollSTM :: Async a -> STM (Maybe (Either SomeException a))
pollSTM :: forall a. Async a -> STM (Maybe (Either SomeException a))
pollSTM (Async TaskGroup
_ Int
_ STM (Either SomeException a)
w) = (forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM (Either SomeException a)
w) forall a. STM a -> STM a -> STM a
`orElse` forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel :: forall a. Async a -> IO ()
cancel = forall a b c. (a -> b -> c) -> b -> a -> c
flip forall e a. Exception e => Async a -> e -> IO ()
cancelWith AsyncException
ThreadKilled
cancelWith' :: Exception e => Pool -> Handle -> e -> IO ()
cancelWith' :: forall e. Exception e => Pool -> Int -> e -> IO ()
cancelWith' Pool
p Int
h e
e =
(forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall e. Exception e => ThreadId -> e -> IO ()
`throwTo` e
e) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<) forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TaskGraph
g <- forall a. TVar a -> STM a
readTVar (Pool -> TVar TaskGraph
tasks Pool
p)
let hs :: [Int]
hs = if forall (gr :: * -> * -> *) a b. Graph gr => Int -> gr a b -> Bool
gelem Int
h TaskGraph
g then TaskGraph -> Int -> [Int]
nodeList TaskGraph
g Int
h else []
[ThreadId]
xs <- forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM (TaskGraph -> [ThreadId] -> Int -> STM [ThreadId]
go TaskGraph
g) [] [Int]
hs
forall a. TVar a -> a -> STM ()
writeTVar (Pool -> TVar TaskGraph
tasks Pool
p) forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (forall a b c. (a -> b -> c) -> b -> a -> c
flip forall {gr :: * -> * -> *} {a} {b}.
Graph gr =>
Int -> gr a b -> gr a b
delNode) TaskGraph
g [Int]
hs
forall (m :: * -> *) a. Monad m => a -> m a
return [ThreadId]
xs
where
go :: TaskGraph -> [ThreadId] -> Int -> STM [ThreadId]
go TaskGraph
g [ThreadId]
acc Int
h' = forall b a. b -> (a -> b) -> Maybe a -> b
maybe [ThreadId]
acc (forall a. a -> [a] -> [a]
:[ThreadId]
acc) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TaskGraph -> Int -> STM (Maybe ThreadId)
getThreadId TaskGraph
g Int
h'
nodeList :: TaskGraph -> Node -> [Node]
nodeList :: TaskGraph -> Int -> [Int]
nodeList TaskGraph
g Int
k = Int
k forall a. a -> [a] -> [a]
: forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (TaskGraph -> Int -> [Int]
nodeList TaskGraph
g) (forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> Int -> [Int]
Gr.suc TaskGraph
g Int
k)
cancelWith :: Exception e => Async a -> e -> IO ()
cancelWith :: forall e a. Exception e => Async a -> e -> IO ()
cancelWith (Async TaskGroup
p Int
h STM (Either SomeException a)
_) = forall e. Exception e => Pool -> Int -> e -> IO ()
cancelWith' (TaskGroup -> Pool
pool TaskGroup
p) Int
h
cancelAll :: TaskGroup -> IO ()
cancelAll :: TaskGroup -> IO ()
cancelAll TaskGroup
p = do
[Int]
hs <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
forall a. TVar a -> a -> STM ()
writeTVar (TaskGroup -> TVar (IntMap (IO ThreadId, SomeTMVar))
pending TaskGroup
p) forall a. IntMap a
IntMap.empty
TaskGraph
g <- forall a. TVar a -> STM a
readTVar (Pool -> TVar TaskGraph
tasks (TaskGroup -> Pool
pool TaskGroup
p))
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (gr :: * -> * -> *) a b. Graph gr => gr a b -> [Int]
nodes TaskGraph
g
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Int
h -> forall e. Exception e => Pool -> Int -> e -> IO ()
cancelWith' (TaskGroup -> Pool
pool TaskGroup
p) Int
h AsyncException
ThreadKilled) [Int]
hs
waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch :: forall a. [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch [Async a]
asyncs =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall a. STM a -> STM a -> STM a
orElse forall a. STM a
retry forall a b. (a -> b) -> a -> b
$
forall a b. (a -> b) -> [a] -> [b]
map (\Async a
a -> do Either SomeException a
r <- forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
a; forall (m :: * -> *) a. Monad m => a -> m a
return (Async a
a, Either SomeException a
r)) [Async a]
asyncs
waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel :: forall a. [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel [Async a]
asyncs =
forall a. [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch [Async a]
asyncs forall a b. IO a -> IO b -> IO a
`finally` forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall a. Async a -> IO ()
cancel [Async a]
asyncs
waitAny :: [Async a] -> IO (Async a, a)
waitAny :: forall a. [Async a] -> IO (Async a, a)
waitAny [Async a]
asyncs =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall a. STM a -> STM a -> STM a
orElse forall a. STM a
retry forall a b. (a -> b) -> a -> b
$
forall a b. (a -> b) -> [a] -> [b]
map (\Async a
a -> do a
r <- forall a. Async a -> STM a
waitSTM Async a
a; forall (m :: * -> *) a. Monad m => a -> m a
return (Async a
a, a
r)) [Async a]
asyncs
waitAnyCancel :: [Async a] -> IO (Async a, a)
waitAnyCancel :: forall a. [Async a] -> IO (Async a, a)
waitAnyCancel [Async a]
asyncs =
forall a. [Async a] -> IO (Async a, a)
waitAny [Async a]
asyncs forall a b. IO a -> IO b -> IO a
`finally` forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall a. Async a -> IO ()
cancel [Async a]
asyncs
waitEitherCatch :: Async a -> Async b
-> IO (Either (Either SomeException a)
(Either SomeException b))
waitEitherCatch :: forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async a
left Async b
right =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
(forall a b. a -> Either a b
Left forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async a
left)
forall a. STM a -> STM a -> STM a
`orElse`
(forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async b
right)
waitEitherCatchCancel :: Async a -> Async b
-> IO (Either (Either SomeException a)
(Either SomeException b))
waitEitherCatchCancel :: forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchCancel Async a
left Async b
right =
forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async a
left Async b
right forall a b. IO a -> IO b -> IO a
`finally` (forall a. Async a -> IO ()
cancel Async a
left forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. Async a -> IO ()
cancel Async b
right)
waitEither :: Async a -> Async b -> IO (Either a b)
waitEither :: forall a b. Async a -> Async b -> IO (Either a b)
waitEither Async a
left Async b
right =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
(forall a b. a -> Either a b
Left forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM a
waitSTM Async a
left)
forall a. STM a -> STM a -> STM a
`orElse`
(forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Async a -> STM a
waitSTM Async b
right)
waitEither_ :: Async a -> Async b -> IO ()
waitEither_ :: forall a b. Async a -> Async b -> IO ()
waitEither_ Async a
left Async b
right =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
(forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Async a -> STM a
waitSTM Async a
left)
forall a. STM a -> STM a -> STM a
`orElse`
(forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Async a -> STM a
waitSTM Async b
right)
waitEitherCancel :: Async a -> Async b -> IO (Either a b)
waitEitherCancel :: forall a b. Async a -> Async b -> IO (Either a b)
waitEitherCancel Async a
left Async b
right =
forall a b. Async a -> Async b -> IO (Either a b)
waitEither Async a
left Async b
right forall a b. IO a -> IO b -> IO a
`finally` (forall a. Async a -> IO ()
cancel Async a
left forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. Async a -> IO ()
cancel Async b
right)
waitBoth :: Async a -> Async b -> IO (a,b)
waitBoth :: forall a b. Async a -> Async b -> IO (a, b)
waitBoth Async a
left Async b
right =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
a
a <- forall a. Async a -> STM a
waitSTM Async a
left
forall a. STM a -> STM a -> STM a
`orElse`
(forall a. Async a -> STM a
waitSTM Async b
right forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. STM a
retry)
b
b <- forall a. Async a -> STM a
waitSTM Async b
right
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a,b
b)
link :: Async a -> IO ()
link :: forall a. Async a -> IO ()
link (Async TaskGroup
_ Int
_ STM (Either SomeException a)
w) = do
ThreadId
me <- IO ThreadId
myThreadId
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO ThreadId
forkRepeat forall a b. (a -> b) -> a -> b
$ do
Either SomeException a
r <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ STM (Either SomeException a)
w
case Either SomeException a
r of
Left SomeException
e -> forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
me SomeException
e
Either SomeException a
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
link2 :: Async a -> Async b -> IO ()
link2 :: forall a b. Async a -> Async b -> IO ()
link2 Async a
left Async b
right =
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO ThreadId
forkRepeat forall a b. (a -> b) -> a -> b
$ do
Either (Either SomeException a) (Either SomeException b)
r <- forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async a
left Async b
right
case Either (Either SomeException a) (Either SomeException b)
r of
Left (Left SomeException
e) -> forall e a. Exception e => Async a -> e -> IO ()
cancelWith Async b
right SomeException
e
Right (Left SomeException
e) -> forall e a. Exception e => Async a -> e -> IO ()
cancelWith Async a
left SomeException
e
Either (Either SomeException a) (Either SomeException b)
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
race :: TaskGroup -> IO a -> IO b -> IO (Either a b)
race_ :: TaskGroup -> IO a -> IO b -> IO ()
concurrently :: TaskGroup -> IO a -> IO b -> IO (a,b)
#define USE_ASYNC_VERSIONS 1
#if USE_ASYNC_VERSIONS
race :: forall a b. TaskGroup -> IO a -> IO b -> IO (Either a b)
race TaskGroup
p IO a
left IO b
right =
forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO a
left forall a b. (a -> b) -> a -> b
$ \Async a
a ->
forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO b
right forall a b. (a -> b) -> a -> b
$ \Async b
b ->
forall a b. Async a -> Async b -> IO (Either a b)
waitEither Async a
a Async b
b
race_ :: forall a b. TaskGroup -> IO a -> IO b -> IO ()
race_ TaskGroup
p IO a
left IO b
right =
forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO a
left forall a b. (a -> b) -> a -> b
$ \Async a
a ->
forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO b
right forall a b. (a -> b) -> a -> b
$ \Async b
b ->
forall a b. Async a -> Async b -> IO ()
waitEither_ Async a
a Async b
b
concurrently :: forall a b. TaskGroup -> IO a -> IO b -> IO (a, b)
concurrently TaskGroup
p IO a
left IO b
right =
forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO a
left forall a b. (a -> b) -> a -> b
$ \Async a
a ->
forall a b. TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync TaskGroup
p IO b
right forall a b. (a -> b) -> a -> b
$ \Async b
b ->
forall a b. Async a -> Async b -> IO (a, b)
waitBoth Async a
a Async b
b
#else
race left right = concurrently' left right collect
where
collect m = do
e <- takeMVar m
case e of
Left ex -> throwIO ex
Right r -> return r
race_ left right = void $ race left right
concurrently left right = concurrently' left right (collect [])
where
collect [Left a, Right b] _ = return (a,b)
collect [Right b, Left a] _ = return (a,b)
collect xs m = do
e <- takeMVar m
case e of
Left ex -> throwIO ex
Right r -> collect (r:xs) m
concurrently' :: IO a -> IO b
-> (MVar (Either SomeException (Either a b)) -> IO r)
-> IO r
concurrently' left right collect = do
done <- newEmptyMVar
mask $ \restore -> do
lid <- forkIO $ restore (left >>= putMVar done . Right . Left)
`catchAll` (putMVar done . Left)
rid <- forkIO $ restore (right >>= putMVar done . Right . Right)
`catchAll` (putMVar done . Left)
let stop = killThread lid >> killThread rid
r <- restore (collect done) `onException` stop
stop
return r
#endif
newtype Concurrently a = Concurrently { forall a. Concurrently a -> TaskGroup -> IO a
runConcurrently :: TaskGroup -> IO a }
instance Functor Concurrently where
fmap :: forall a b. (a -> b) -> Concurrently a -> Concurrently b
fmap a -> b
f (Concurrently TaskGroup -> IO a
a) = forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TaskGroup -> IO a
a
instance Applicative Concurrently where
pure :: forall a. a -> Concurrently a
pure a
x = forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ \TaskGroup
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return a
x
Concurrently TaskGroup -> IO (a -> b)
fs <*> :: forall a b.
Concurrently (a -> b) -> Concurrently a -> Concurrently b
<*> Concurrently TaskGroup -> IO a
as =
forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ \TaskGroup
tg -> (\(a -> b
f, a
a) -> a -> b
f a
a) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b. TaskGroup -> IO a -> IO b -> IO (a, b)
concurrently TaskGroup
tg (TaskGroup -> IO (a -> b)
fs TaskGroup
tg) (TaskGroup -> IO a
as TaskGroup
tg)
instance Alternative Concurrently where
empty :: forall a. Concurrently a
empty = forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ \TaskGroup
_ -> forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Int -> IO ()
threadDelay forall a. Bounded a => a
maxBound)
Concurrently TaskGroup -> IO a
as <|> :: forall a. Concurrently a -> Concurrently a -> Concurrently a
<|> Concurrently TaskGroup -> IO a
bs =
forall a. (TaskGroup -> IO a) -> Concurrently a
Concurrently forall a b. (a -> b) -> a -> b
$ \TaskGroup
tg -> forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. a -> a
id forall a. a -> a
id forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a b. TaskGroup -> IO a -> IO b -> IO (Either a b)
race TaskGroup
tg (TaskGroup -> IO a
as TaskGroup
tg) (TaskGroup -> IO a
bs TaskGroup
tg)
forkRepeat :: IO a -> IO ThreadId
forkRepeat :: forall a. IO a -> IO ThreadId
forkRepeat IO a
action =
forall b. ((forall b. IO b -> IO b) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore ->
let go :: IO ()
go = do Either SomeException a
r <- forall a. IO a -> IO (Either SomeException a)
tryAll (forall b. IO b -> IO b
restore IO a
action)
case Either SomeException a
r of
Left SomeException
_ -> IO ()
go
Either SomeException a
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
in IO () -> IO ThreadId
forkIO IO ()
go
catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll :: forall a. IO a -> (SomeException -> IO a) -> IO a
catchAll = forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catch
tryAll :: IO a -> IO (Either SomeException a)
tryAll :: forall a. IO a -> IO (Either SomeException a)
tryAll = forall e a. Exception e => IO a -> IO (Either e a)
try
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
#if MIN_VERSION_base(4,17,0)
rawForkIO (IO action) = IO $ \ s ->
#else
rawForkIO :: IO () -> IO ThreadId
rawForkIO IO ()
action = forall a. (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
IO forall a b. (a -> b) -> a -> b
$ \ State# RealWorld
s ->
#endif
case (forall a.
a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
fork# IO ()
action State# RealWorld
s) of (# State# RealWorld
s1, ThreadId#
tid #) -> (# State# RealWorld
s1, ThreadId# -> ThreadId
ThreadId ThreadId#
tid #)
{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
#if MIN_VERSION_base(4,17,0)
rawForkOn (I# cpu) (IO action) = IO $ \ s ->
#else
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# Int#
cpu) IO ()
action = forall a. (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
IO forall a b. (a -> b) -> a -> b
$ \ State# RealWorld
s ->
#endif
case (forall a.
Int# -> a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
forkOn# Int#
cpu IO ()
action State# RealWorld
s) of (# State# RealWorld
s1, ThreadId#
tid #) -> (# State# RealWorld
s1, ThreadId# -> ThreadId
ThreadId ThreadId#
tid #)