{-# LANGUAGE CPP, MagicHash, UnboxedTuples, RankNTypes, GADTs #-}
#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
#endif
{-# OPTIONS -Wall #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Async
-- Copyright   :  (c) Simon Marlow 2012
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Simon Marlow <marlowsd@gmail.com>
-- Stability   :  provisional
-- Portability :  non-portable (requires concurrency)
--
-- This module provides a set of operations for running IO operations
-- asynchronously and waiting for their results.  It is a thin layer
-- over the basic concurrency operations provided by
-- "Control.Concurrent".  The main additional functionality it
-- provides is the ability to wait for the return value of a thread,
-- but the interface also provides some additional safety and
-- robustness over using threads and @MVar@ directly.
--
-- The basic type is @'Async' a@, which represents an asynchronous
-- @IO@ action that will return a value of type @a@, or die with an
-- exception.  An @Async@ corresponds to a thread, and its 'ThreadId'
-- can be obtained with 'asyncThreadId', although that should rarely
-- be necessary.
--
-- For example, to fetch two web pages at the same time, we could do
-- this (assuming a suitable @getURL@ function):
--
-- >    do a1 <- async (getURL url1)
-- >       a2 <- async (getURL url2)
-- >       page1 <- wait a1
-- >       page2 <- wait a2
-- >       ...
--
-- where 'async' starts the operation in a separate thread, and
-- 'wait' waits for and returns the result.  If the operation
-- throws an exception, then that exception is re-thrown by
-- 'wait'.  This is one of the ways in which this library
-- provides some additional safety: it is harder to accidentally
-- forget about exceptions thrown in child threads.
--
-- A slight improvement over the previous example is this:
--
-- >       withAsync (getURL url1) $ \a1 -> do
-- >       withAsync (getURL url2) $ \a2 -> do
-- >       page1 <- wait a1
-- >       page2 <- wait a2
-- >       ...
--
-- 'withAsync' is like 'async', except that the 'Async' is
-- automatically killed (using 'cancel') if the enclosing IO operation
-- returns before it has completed.  Consider the case when the first
-- 'wait' throws an exception; then the second 'Async' will be
-- automatically killed rather than being left to run in the
-- background, possibly indefinitely.  This is the second way that the
-- library provides additional safety: using 'withAsync' means we can
-- avoid accidentally leaving threads running.  Furthermore,
-- 'withAsync' allows a tree of threads to be built, such that
-- children are automatically killed if their parents die for any
-- reason.
--
-- The pattern of performing two IO actions concurrently and waiting
-- for their results is packaged up in a combinator 'concurrently', so
-- we can further shorten the above example to:
--
-- >       (page1, page2) <- concurrently (getURL url1) (getURL url2)
-- >       ...
--
-- The 'Functor' instance can be used to change the result of an
-- 'Async'.  For example:
--
-- > ghci> a <- async (return 3)
-- > ghci> wait a
-- > 3
-- > ghci> wait (fmap (+1) a)
-- > 4

-----------------------------------------------------------------------------

module Control.Concurrent.Async.Pool.Async
    ( module Control.Concurrent.Async.Pool.Async
    , module Gr
    ) where

import Control.Concurrent.STM
import Control.Exception
import Control.Concurrent
import Control.Applicative
import Control.Monad hiding (forM, forM_, mapM, mapM_)
import Data.Foldable
import Data.Graph.Inductive.Graph as Gr hiding ((&))
import Data.Graph.Inductive.PatriciaTree as Gr
import Data.Graph.Inductive.Query.BFS as Gr
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.Traversable
import Prelude hiding (mapM_, mapM, foldr, all, any, concatMap, foldl1)

import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc

-- | A 'Handle' is a unique identifier for a task submitted to a 'Pool'.
type Handle    = Node
data State     = Ready | Starting | forall a. Started ThreadId (TMVar a)
data Status    = Pending | Completed deriving (Eq, Show)
type TaskGraph = Gr (TVar State) Status

instance Eq State where
    Ready        == Ready        = True
    Starting     == Starting     = True
    Started n1 _ == Started n2 _ = n1 == n2
    _            == _            = False

instance Show State where
    show Ready         = "Ready"
    show Starting      = "Starting"
    show (Started n _) = "Started " ++ show n

-- | A 'Pool' manages a collection of possibly interdependent tasks, such that
--   tasks await execution until the tasks they depend on have finished (and
--   tasks may depend on an arbitrary number of other tasks), while
--   independent tasks execute concurrently up to the number of available
--   resource slots in the pool.
--
--   Results from each task are available until the status of the task is
--   polled or waited on.  Further, the results are kept until that occurs, so
--   failing to ever wait will result in a memory leak.
--
--   Tasks may be cancelled, in which case all dependent tasks are
--   unscheduled.
data Pool = Pool
    { tasks :: TVar TaskGraph
      -- ^ The task graph represents a partially ordered set P with subset S
      --   such that for every x ∈ S and y ∈ P, either x ≤ y or x is unrelated
      --   to y.  Stated more simply, S is the set of least elements of all
      --   maximal chains in P.  In our case, ≤ relates two uncompleted tasks
      --   by dependency.  Therefore, S is equal to the set of tasks which may
      --   execute concurrently, as none of them have incomplete dependencies.
      --
      --   We use a graph representation to make determination of S more
      --   efficient (where S is just the set of roots in P expressed as a
      --   graph).  Completion status is recorded on the edges, and nodes are
      --   removed from the graph once no other incomplete node depends on
      --   them.
    , tokens :: TVar Int
      -- ^ Tokens identify tasks, and are provisioned monotonically.
    }

waitTMVar :: TMVar a -> STM ()
waitTMVar tv = do
    _ <- readTMVar tv
    return ()

syncPool :: Pool -> STM ()
syncPool p = do
    g <- readTVar (tasks p)
    forM_ (labNodes g) $ \(_h, st) -> do
        x <- readTVar st
        case x of
            Started _tid v -> waitTMVar v
            _ -> retry

data TaskGroup = TaskGroup
    { pool    :: Pool
    , avail   :: TVar Int
      -- ^ The number of available execution slots in the pool.
    , pending :: forall a. TVar (IntMap (IO ThreadId, TMVar a))
      -- ^ Nodes in the task graph that are waiting to start.
    }

-- -----------------------------------------------------------------------------
-- STM Async API


-- | An asynchronous action spawned by 'async' or 'withAsync'.
-- Asynchronous actions are executed in a separate thread, and
-- operations are provided for waiting for asynchronous actions to
-- complete and obtaining their results (see e.g. 'wait').
--
data Async a = Async
    { taskGroup  :: TaskGroup
    , taskHandle :: {-# UNPACK #-} !Handle
    , _asyncWait :: STM (Either SomeException a)
    }

getTaskVar :: TaskGraph -> Handle -> TVar State
getTaskVar g h = let (_to, _, t, _from) = context g h in t

getThreadId :: TaskGraph -> Node -> STM (Maybe ThreadId)
getThreadId g h = do
    status <- readTVar (getTaskVar g h)
    case status of
        Ready       -> return Nothing
        Starting    -> retry
        Started x _ -> return $ Just x

instance Eq (Async a) where
  Async _ a _ == Async _ b _  =  a == b

instance Ord (Async a) where
  Async _ a _ `compare` Async _ b _  =  a `compare` b

instance Functor Async where
  fmap f (Async p a w) = Async p a (fmap (fmap f) w)


-- | Spawn an asynchronous action in a separate thread.
async :: TaskGroup -> IO a -> IO (Async a)
async p = atomically . inline asyncUsing p rawForkIO

-- | Like 'async' but using 'forkOS' internally.
asyncBound :: TaskGroup -> IO a -> IO (Async a)
asyncBound p = atomically . asyncUsing p forkOS

-- | Like 'async' but using 'forkOn' internally.
asyncOn :: TaskGroup -> Int -> IO a -> IO (Async a)
asyncOn p = (atomically .) . asyncUsing p . rawForkOn

-- | Like 'async' but using 'forkIOWithUnmask' internally.
-- The child thread is passed a function that can be used to unmask asynchronous exceptions.
asyncWithUnmask :: TaskGroup -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask p actionWith =
    atomically $ asyncUsing p rawForkIO (actionWith unsafeUnmask)

-- | Like 'asyncOn' but using 'forkOnWithUnmask' internally.
-- The child thread is passed a function that can be used to unmask asynchronous exceptions.
asyncOnWithUnmask :: TaskGroup -> Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask p cpu actionWith =
    atomically $ asyncUsing p (rawForkOn cpu) (actionWith unsafeUnmask)

asyncUsing :: TaskGroup -> (IO () -> IO ThreadId) -> IO a -> STM (Async a)
asyncUsing p doFork action = do
    h <- nextIdent (pool p)

    var <- newEmptyTMVar
    let start = mask $ \restore ->
            doFork $ try (restore (action `finally` cleanup h))
                >>= atomically . putTMVar var

    modifyTVar (pending p) (IntMap.insert h (start, var))
    tv <- newTVar Ready
    modifyTVar (tasks (pool p)) (insNode (h, tv))

    return $ Async p h (readTMVar var)
  where
    cleanup h = atomically $ do
        modifyTVar (avail p) succ
        cleanupTask (pool p) h

-- | Return the next available thread identifier from the pool.  These are
--   monotonically increasing integers.
nextIdent :: Pool -> STM Int
nextIdent p = do
    tok <- readTVar (tokens p)
    writeTVar (tokens p) (succ tok)
    return tok

cleanupTask :: Pool -> Handle -> STM ()
cleanupTask p h =
    -- Once the task is done executing, we must alter the graph so any
    -- dependent children will know their parent has completed.
    modifyTVar (tasks p) $ \g ->
        case zip (repeat h) (Gr.suc g h) of
            -- If nothing dependend on this task and if the final result value
            -- has been observed, prune it from the graph, as well as any
            -- parents which now have no dependents.  Otherwise mark the edges
            -- as Completed so dependent children can execute.
            [] -> dropTask h g
            es -> insEdges (completeEdges es) $ delEdges es g
  where
    completeEdges = map (\(f, t) -> (f, t, Completed))

    dropTask k gr = foldl' f (delNode k gr) (Gr.pre gr k)
      where
        f g n = if outdeg g n == 0 then dropTask n g else g

-- | Spawn an asynchronous action in a separate thread, and pass its
-- @Async@ handle to the supplied function.  When the function returns
-- or throws an exception, 'cancel' is called on the @Async@.
--
-- > withAsync action inner = bracket (async action) cancel inner
--
-- This is a useful variant of 'async' that ensures an @Async@ is
-- never left running unintentionally.
--
-- Since 'cancel' may block, 'withAsync' may also block; see 'cancel'
-- for details.
--
withAsync :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsync p = inline withAsyncUsing p rawForkIO

-- | Like 'withAsync' but uses 'forkOS' internally.
withAsyncBound :: TaskGroup -> IO a -> (Async a -> IO b) -> IO b
withAsyncBound p = withAsyncUsing p forkOS

-- | Like 'withAsync' but uses 'forkOn' internally.
withAsyncOn :: TaskGroup -> Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn p = withAsyncUsing p . rawForkOn

-- | Like 'withAsync' but uses 'forkIOWithUnmask' internally.
-- The child thread is passed a function that can be used to unmask asynchronous exceptions.
withAsyncWithUnmask :: TaskGroup -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask p actionWith =
    withAsyncUsing p rawForkIO (actionWith unsafeUnmask)

-- | Like 'withAsyncOn' but uses 'forkOnWithUnmask' internally.
-- The child thread is passed a function that can be used to unmask asynchronous exceptions
withAsyncOnWithUnmask :: TaskGroup -> Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask p cpu actionWith = withAsyncUsing p (rawForkOn cpu) (actionWith unsafeUnmask)

withAsyncUsing :: TaskGroup -> (IO () -> IO ThreadId) -> IO a -> (Async a -> IO b)
               -> IO b
-- The bracket version works, but is slow.  We can do better by
-- hand-coding it:
withAsyncUsing p doFork = \action inner -> do
  mask $ \restore -> do
    a <- atomically $ asyncUsing p doFork $ restore action
    r <- restore (inner a) `catchAll` \e -> do cancel a; throwIO e
    cancel a
    return r

-- | Wait for an asynchronous action to complete, and return its
-- value.  If the asynchronous action threw an exception, then the
-- exception is re-thrown by 'wait'.
--
-- > wait = atomically . waitSTM
--
{-# INLINE wait #-}
wait :: Async a -> IO a
wait = atomically . waitSTM

-- | Wait for an asynchronous action to complete, and return either
-- @Left e@ if the action raised an exception @e@, or @Right a@ if it
-- returned a value @a@.
--
-- > waitCatch = atomically . waitCatchSTM
--
{-# INLINE waitCatch #-}
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch = atomically . waitCatchSTM

-- | Check whether an 'Async' has completed yet.  If it has not
-- completed yet, then the result is @Nothing@, otherwise the result
-- is @Just e@ where @e@ is @Left x@ if the @Async@ raised an
-- exception @x@, or @Right a@ if it returned a value @a@.
--
-- > poll = atomically . pollSTM
--
{-# INLINE poll #-}
poll :: Async a -> IO (Maybe (Either SomeException a))
poll = atomically . pollSTM

-- | A version of 'wait' that can be used inside an STM transaction.
--
waitSTM :: Async a -> STM a
waitSTM a = do
   r <- waitCatchSTM a
   either throwSTM return r

-- | A version of 'waitCatch' that can be used inside an STM transaction.
--
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM (Async _ _ w) = w

-- | A version of 'poll' that can be used inside an STM transaction.
--
{-# INLINE pollSTM #-}
pollSTM :: Async a -> STM (Maybe (Either SomeException a))
pollSTM (Async _ _ w) = (Just <$> w) `orElse` return Nothing

-- | Cancel an asynchronous action by throwing the @ThreadKilled@
-- exception to it.  Has no effect if the 'Async' has already
-- completed.
--
-- > cancel a = throwTo (asyncThreadId a) ThreadKilled
--
-- Note that 'cancel' is synchronous in the same sense as 'throwTo'.
-- It does not return until the exception has been thrown in the
-- target thread, or the target thread has completed.  In particular,
-- if the target thread is making a foreign call, the exception will
-- not be thrown until the foreign call returns, and in this case
-- 'cancel' may block indefinitely.  An asynchronous 'cancel' can
-- of course be obtained by wrapping 'cancel' itself in 'async'.
--
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel = flip cancelWith ThreadKilled

-- | Cancel an asynchronous action by throwing the supplied exception
-- to it.
--
-- > cancelWith a x = throwTo (asyncThreadId a) x
--
-- The notes about the synchronous nature of 'cancel' also apply to
-- 'cancelWith'.
cancelWith' :: Exception e => Pool -> Handle -> e -> IO ()
cancelWith' p h e =
    (mapM_ (`throwTo` e) =<<) $ atomically $ do
        g <- readTVar (tasks p)
        let hs = if gelem h g then nodeList g h else []
        xs <- foldM (go g) [] hs
        writeTVar (tasks p) $ foldl' (flip delNode) g hs
        return xs
  where
    go g acc h' = maybe acc (:acc) <$> getThreadId g h'

    nodeList :: TaskGraph -> Node -> [Node]
    nodeList g k = k : concatMap (nodeList g) (Gr.suc g k)

cancelWith :: Exception e => Async a -> e -> IO ()
cancelWith (Async p h _) = cancelWith' (pool p) h

-- | Cancel an asynchronous action by throwing the @ThreadKilled@ exception to
--   it, or unregistering it from the task pool if it had not started yet.  Has
--   no effect if the 'Async' has already completed.
--
-- Note that 'cancel' is synchronous in the same sense as 'throwTo'.  It does
-- not return until the exception has been thrown in the target thread, or the
-- target thread has completed.  In particular, if the target thread is making
-- a foreign call, the exception will not be thrown until the foreign call
-- returns, and in this case 'cancel' may block indefinitely.  An asynchronous
-- 'cancel' can of course be obtained by wrapping 'cancel' itself in 'async'.
cancelAll :: TaskGroup -> IO ()
cancelAll p = do
    hs <- atomically $ do
        writeTVar (pending p) IntMap.empty
        g <- readTVar (tasks (pool p))
        return $ nodes g
    mapM_ (\h -> cancelWith' (pool p) h ThreadKilled) hs

-- | Wait for any of the supplied asynchronous operations to complete.
-- The value returned is a pair of the 'Async' that completed, and the
-- result that would be returned by 'wait' on that 'Async'.
--
-- If multiple 'Async's complete or have completed, then the value
-- returned corresponds to the first completed 'Async' in the list.
--
waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch asyncs =
  atomically $
    foldr orElse retry $
      map (\a -> do r <- waitCatchSTM a; return (a, r)) asyncs

-- | Like 'waitAnyCatch', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel asyncs =
  waitAnyCatch asyncs `finally` mapM_ cancel asyncs

-- | Wait for any of the supplied @Async@s to complete.  If the first
-- to complete throws an exception, then that exception is re-thrown
-- by 'waitAny'.
--
-- If multiple 'Async's complete or have completed, then the value
-- returned corresponds to the first completed 'Async' in the list.
--
waitAny :: [Async a] -> IO (Async a, a)
waitAny asyncs =
  atomically $
    foldr orElse retry $
      map (\a -> do r <- waitSTM a; return (a, r)) asyncs

-- | Like 'waitAny', but also cancels the other asynchronous
-- operations as soon as one has completed.
--
waitAnyCancel :: [Async a] -> IO (Async a, a)
waitAnyCancel asyncs =
  waitAny asyncs `finally` mapM_ cancel asyncs

-- | Wait for the first of two @Async@s to finish.
waitEitherCatch :: Async a -> Async b
                -> IO (Either (Either SomeException a)
                              (Either SomeException b))
waitEitherCatch left right =
  atomically $
    (Left  <$> waitCatchSTM left)
      `orElse`
    (Right <$> waitCatchSTM right)

-- | Like 'waitEitherCatch', but also 'cancel's both @Async@s before
-- returning.
--
waitEitherCatchCancel :: Async a -> Async b
                      -> IO (Either (Either SomeException a)
                                    (Either SomeException b))
waitEitherCatchCancel left right =
  waitEitherCatch left right `finally` (cancel left >> cancel right)

-- | Wait for the first of two @Async@s to finish.  If the @Async@
-- that finished first raised an exception, then the exception is
-- re-thrown by 'waitEither'.
--
waitEither :: Async a -> Async b -> IO (Either a b)
waitEither left right =
  atomically $
    (Left  <$> waitSTM left)
      `orElse`
    (Right <$> waitSTM right)

-- | Like 'waitEither', but the result is ignored.
--
waitEither_ :: Async a -> Async b -> IO ()
waitEither_ left right =
  atomically $
    (void $ waitSTM left)
      `orElse`
    (void $ waitSTM right)

-- | Like 'waitEither', but also 'cancel's both @Async@s before
-- returning.
--
waitEitherCancel :: Async a -> Async b -> IO (Either a b)
waitEitherCancel left right =
  waitEither left right `finally` (cancel left >> cancel right)

-- | Waits for both @Async@s to finish, but if either of them throws
-- an exception before they have both finished, then the exception is
-- re-thrown by 'waitBoth'.
--
waitBoth :: Async a -> Async b -> IO (a,b)
waitBoth left right =
  atomically $ do
    a <- waitSTM left
           `orElse`
         (waitSTM right >> retry)
    b <- waitSTM right
    return (a,b)


-- | Link the given @Async@ to the current thread, such that if the
-- @Async@ raises an exception, that exception will be re-thrown in
-- the current thread.
--
link :: Async a -> IO ()
link (Async _ _ w) = do
  me <- myThreadId
  void $ forkRepeat $ do
     r <- atomically $ w
     case r of
       Left e -> throwTo me e
       _ -> return ()

-- | Link two @Async@s together, such that if either raises an
-- exception, the same exception is re-thrown in the other @Async@.
--
link2 :: Async a -> Async b -> IO ()
link2 left right =
  void $ forkRepeat $ do
    r <- waitEitherCatch left right
    case r of
      Left  (Left e) -> cancelWith right e
      Right (Left e) -> cancelWith left e
      _ -> return ()


-- -----------------------------------------------------------------------------

-- | Run two @IO@ actions concurrently, and return the first to
-- finish.  The loser of the race is 'cancel'led.
--
-- > race left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitEither a b
--
race :: TaskGroup -> IO a -> IO b -> IO (Either a b)

-- | Like 'race', but the result is ignored.
--
race_ :: TaskGroup -> IO a -> IO b -> IO ()

-- | Run two @IO@ actions concurrently, and return both results.  If
-- either action throws an exception at any time, then the other
-- action is 'cancel'led, and the exception is re-thrown by
-- 'concurrently'.
--
-- > concurrently left right =
-- >   withAsync left $ \a ->
-- >   withAsync right $ \b ->
-- >   waitBoth a b
concurrently :: TaskGroup -> IO a -> IO b -> IO (a,b)

#define USE_ASYNC_VERSIONS 1

#if USE_ASYNC_VERSIONS

race p left right =
  withAsync p left $ \a ->
  withAsync p right $ \b ->
  waitEither a b

race_ p left right =
  withAsync p left $ \a ->
  withAsync p right $ \b ->
  waitEither_ a b

concurrently p left right =
  withAsync p left $ \a ->
  withAsync p right $ \b ->
  waitBoth a b

#else

-- MVar versions of race/concurrently
-- More ugly than the Async versions, but quite a bit faster.

-- race :: IO a -> IO b -> IO (Either a b)
race left right = concurrently' left right collect
  where
    collect m = do
        e <- takeMVar m
        case e of
            Left ex -> throwIO ex
            Right r -> return r

-- race_ :: IO a -> IO b -> IO ()
race_ left right = void $ race left right

-- concurrently :: IO a -> IO b -> IO (a,b)
concurrently left right = concurrently' left right (collect [])
  where
    collect [Left a, Right b] _ = return (a,b)
    collect [Right b, Left a] _ = return (a,b)
    collect xs m = do
        e <- takeMVar m
        case e of
            Left ex -> throwIO ex
            Right r -> collect (r:xs) m

concurrently' :: IO a -> IO b
             -> (MVar (Either SomeException (Either a b)) -> IO r)
             -> IO r
concurrently' left right collect = do
    done <- newEmptyMVar
    mask $ \restore -> do
        lid <- forkIO $ restore (left >>= putMVar done . Right . Left)
                             `catchAll` (putMVar done . Left)
        rid <- forkIO $ restore (right >>= putMVar done . Right . Right)
                             `catchAll` (putMVar done . Left)
        let stop = killThread lid >> killThread rid
        r <- restore (collect done) `onException` stop
        stop
        return r

#endif

-- -----------------------------------------------------------------------------

-- | A value of type @Concurrently a@ is an @IO@ operation that can be
-- composed with other @Concurrently@ values, using the @Applicative@
-- and @Alternative@ instances.
--
-- Calling @runConcurrently@ on a value of type @Concurrently a@ will
-- execute the @IO@ operations it contains concurrently, before
-- delivering the result of type @a@.
--
-- For example
--
-- > (page1, page2, page3)
-- >     <- runConcurrently $ (,,)
-- >     <$> Concurrently (getURL "url1")
-- >     <*> Concurrently (getURL "url2")
-- >     <*> Concurrently (getURL "url3")
--
newtype Concurrently a = Concurrently { runConcurrently :: TaskGroup -> IO a }

instance Functor Concurrently where
  fmap f (Concurrently a) = Concurrently $ fmap f <$> a

instance Applicative Concurrently where
  pure x = Concurrently $ \_ -> return x
  Concurrently fs <*> Concurrently as =
    Concurrently $ \tg -> (\(f, a) -> f a) <$> concurrently tg (fs tg) (as tg)

instance Alternative Concurrently where
  empty = Concurrently $ \_ -> forever (threadDelay maxBound)
  Concurrently as <|> Concurrently bs =
    Concurrently $ \tg -> either id id <$> race tg (as tg) (bs tg)

-- ----------------------------------------------------------------------------

-- | Fork a thread that runs the supplied action, and if it raises an
-- exception, re-runs the action.  The thread terminates only when the
-- action runs to completion without raising an exception.
forkRepeat :: IO a -> IO ThreadId
forkRepeat action =
  mask $ \restore ->
    let go = do r <- tryAll (restore action)
                case r of
                  Left _ -> go
                  _      -> return ()
    in forkIO go

catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll = catch

tryAll :: IO a -> IO (Either SomeException a)
tryAll = try

-- A version of forkIO that does not include the outer exception
-- handler: saves a bit of time when we will be installing our own
-- exception handler.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
   case (fork# action s) of (# s1, tid #) -> (# s1, ThreadId tid #)

{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) action = IO $ \ s ->
   case (forkOn# cpu action s) of (# s1, tid #) -> (# s1, ThreadId tid #)