module Control.Concurrent.Async.Extra where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent
import Control.Concurrent.MSem (new, with)
import Data.Traversable
import Control.Applicative
import Control.Monad
sequencePool :: Traversable t => Int -> t (IO a) -> IO (t a)
sequencePool max xs = do
sem <- new max
runConcurrently $ traverse (Concurrently . with sem) xs
mapPool :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapPool max f xs = do
sem <- new max
mapConcurrently (with sem . f) xs
sequenceConcurrently :: Traversable t => t (IO a) -> IO (t a)
sequenceConcurrently = runConcurrently . traverse Concurrently
fixAsync :: (Async a -> IO a) -> IO (Async a)
fixAsync f = mdo
this <- async $ f this
return this
withParent :: Async a -> IO b -> IO (Async b)
withParent parent act = async $ link parent >> act
newtype Promise a = Promise { unPromise :: IO a }
deriving (Functor)
instance Applicative Promise where
pure = Promise . return
Promise f <*> Promise x = Promise $ uncurry ($) <$> concurrently f x
instance Alternative Promise where
empty = Promise $ forever (threadDelay maxBound)
Promise x <|> Promise y = Promise $ either id id <$> race x y
instance Monad Promise where
return = pure
Promise m >>= f = Promise $ async m >>= wait >>= unPromise . f
instance MonadPlus Promise where
mzero = empty
mplus = (<|>)