-- | Type classes and combinators for combining independent monadic
-- computations.
--
-- 'MonadParallel' supplies 'bindM2', and 'MonadFork' supplies 'forkExec'.
-- The remaining exports share their names with the usual "Control.Monad"
-- combinators, but are defined here in terms of 'bindM2' and therefore carry
-- a 'MonadParallel' constraint.
module Control.Monad.Parallel
(
-- * Classes
MonadParallel(..), MonadFork(..),
-- * Three-argument bind
bindM3,
-- * Combinators built on 'bindM2'
ap, forM, forM_, liftM2, liftM3, mapM, mapM_, replicateM, replicateM_, sequence, sequence_
)
where
import Prelude ()
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar, readMVar)
import Control.Exception (SomeException, throwIO, mask, try)
import Control.Monad (Monad, (>>=), return, liftM)
import Control.Monad.Trans.Identity (IdentityT(IdentityT, runIdentityT))
import Control.Monad.Trans.Maybe (MaybeT(MaybeT, runMaybeT))
import Control.Monad.Trans.Except (ExceptT(ExceptT), runExceptT)
import Control.Monad.Trans.List (ListT(ListT, runListT))
import Control.Monad.Trans.Reader (ReaderT(ReaderT, runReaderT))
import Control.Parallel (par, pseq)
import Data.Either (Either(..), either)
import Data.Function (($), (.), const)
import Data.Functor.Identity (Identity)
import Data.Int (Int)
import Data.List ((++), foldr, map, replicate)
import Data.Maybe (Maybe(Just, Nothing))
import System.IO (IO)
-- | Monads that provide 'bindM2', a two-argument bind whose argument
-- computations do not depend on each other.
class Monad m => MonadParallel m where
  -- | Run two computations and pass both results to a two-argument
  -- continuation.
  --
  -- The default wraps each argument as @x >>= return@, hints with 'par' that
  -- the first may be evaluated in parallel, forces the second with 'pseq',
  -- and then binds the two in order before calling the continuation.
  bindM2 :: (a -> b -> m c) -> m a -> m b -> m c
  bindM2 f ma mb = left `par` (right `pseq` combined)
    where
      left = ma >>= return
      right = mb >>= return
      -- Bind order is first argument, then second, then the continuation.
      combined = left >>= \a -> right >>= \b -> f a b
-- | 'MonadParallel' monads that can additionally start a computation and hand
-- back a second computation that yields its result.
class MonadParallel m => MonadFork m where
  -- | Start the given computation; the inner computation of the result yields
  -- its value.
  --
  -- The default wraps the argument as @e >>= return@, hints with 'par' that
  -- it may be evaluated in parallel, and returns that same wrapped
  -- computation.
  forkExec :: m a -> m (m a)
  forkExec e = par started (return started)
    where
      started = e >>= return
-- | Three-argument counterpart of 'bindM2'.
--
-- The first two computations are combined with 'liftM2' into a partially
-- applied continuation, which is then combined with the third computation
-- through 'bindM2'.
bindM3 :: MonadParallel m => (a -> b -> c -> m d) -> m a -> m b -> m c -> m d
bindM3 f ma mb mc = bindM2 applyTo partial mc
  where
    -- Yields the continuation still waiting for the third result.
    partial = liftM2 f ma mb
    applyTo g c = g c
-- | Lift a pure two-argument function over two computations, combining them
-- with 'bindM2'.
liftM2 :: MonadParallel m => (a -> b -> c) -> m a -> m b -> m c
liftM2 f m1 m2 = bindM2 combine m1 m2
  where
    combine a b = return (f a b)
-- | Lift a pure three-argument function over three computations, combining
-- them with 'bindM3'.
liftM3 :: (MonadParallel m) => (a1 -> a2 -> a3 -> r) -> m a1 -> m a2 -> m a3 -> m r
liftM3 f m1 m2 m3 = bindM3 combine m1 m2 m3
  where
    combine a b c = return (f a b c)
-- | Apply a function produced by one computation to the value produced by
-- another, combining the two with 'bindM2'.
ap :: MonadParallel m => m (a -> b) -> m a -> m b
ap mf ma = bindM2 apply mf ma
  where
    apply f a = return (f a)
-- | Run every computation in the list and collect the results in list order.
--
-- Each element is combined with the result for the rest of the list via
-- 'liftM2'; an empty list yields @return []@.
sequence :: MonadParallel m => [m a] -> m [a]
sequence [] = return []
sequence (m : rest) = liftM2 (:) m (sequence rest)
-- | Run every computation in the list, discarding all results.
--
-- Elements are folded from the right with 'liftM2'; an empty list yields
-- @return ()@.
sequence_ :: MonadParallel m => [m a] -> m ()
sequence_ ms = foldr both (return ()) ms
  where
    both m rest = liftM2 discard m rest
    discard _ _ = ()
-- | Apply the function to every list element and run the resulting
-- computations with 'sequence', collecting their results.
mapM :: MonadParallel m => (a -> m b) -> [a] -> m [b]
mapM f list = sequence actions
  where
    actions = map f list
-- | Apply the function to every list element and run the resulting
-- computations with 'sequence_', discarding their results.
mapM_ :: MonadParallel m => (a -> m b) -> [a] -> m ()
mapM_ f list = sequence_ actions
  where
    actions = map f list
-- | Same as mapping the function over the list and running the results with
-- 'sequence', but taking the list first and the function second.
forM :: MonadParallel m => [a] -> (a -> m b) -> m [b]
forM list f = sequence $ map f list
-- | Same as mapping the function over the list and running the results with
-- 'sequence_', but taking the list first and the function second.
forM_ :: MonadParallel m => [a] -> (a -> m b) -> m ()
forM_ list f = sequence_ $ map f list
-- | Run @n@ copies of the computation with 'sequence' and collect the
-- results.
replicateM :: MonadParallel m => Int -> m a -> m [a]
replicateM n action = sequence copies
  where
    copies = replicate n action
-- | Run @n@ copies of the computation with 'sequence_', discarding the
-- results.
replicateM_ :: MonadParallel m => Int -> m a -> m ()
replicateM_ n action = sequence_ copies
  where
    copies = replicate n action
-- The three instances below define no methods, so each one uses the class's
-- default 'bindM2'.

-- | Uses the default 'bindM2'.
instance MonadParallel Identity
-- | Uses the default 'bindM2'.
instance MonadParallel Maybe
-- | Uses the default 'bindM2'.
instance MonadParallel []
-- | For functions, both arguments are applied to the shared input; the first
-- result is hinted for parallel evaluation with 'par' and the second is
-- forced with 'pseq' before the continuation is applied.
instance MonadParallel ((->) r) where
  bindM2 f ma mb r = x `par` (y `pseq` f x y r)
    where
      x = ma r
      y = mb r
-- | In 'IO' the second action is started with 'forkExec' before the first
-- action is run; once both results are available they are passed to the
-- continuation.
instance MonadParallel IO where
  bindM2 f ma mb =
    forkExec mb >>= \awaitB ->
      ma >>= \a ->
        awaitB >>= \b ->
          f a b
-- | Unwraps both arguments, delegates to the underlying monad's 'bindM2',
-- and rewraps the result.
instance MonadParallel m => MonadParallel (IdentityT m) where
  bindM2 f ma mb = IdentityT $ bindM2 inner (runIdentityT ma) (runIdentityT mb)
    where
      inner a b = runIdentityT (f a b)
-- | Runs both underlying computations through the base monad's 'bindM2';
-- the continuation is only invoked when both produced a 'Just', otherwise
-- the result is 'Nothing'.
instance MonadParallel m => MonadParallel (MaybeT m) where
  bindM2 f ma mb = MaybeT (bindM2 combine (runMaybeT ma) (runMaybeT mb))
    where
      combine mx my = case (mx, my) of
        (Just a, Just b) -> runMaybeT (f a b)
        _ -> return Nothing
-- | Runs both underlying computations through the base monad's 'bindM2'.
-- The continuation is only invoked when both produced a 'Right'; if the
-- first produced a 'Left' that error is returned, otherwise the second's.
instance MonadParallel m => MonadParallel (ExceptT e m) where
  bindM2 f ma mb = ExceptT (bindM2 combine (runExceptT ma) (runExceptT mb))
    where
      -- The first result is inspected before the second, so an error from
      -- the first argument takes precedence.
      combine ea eb = case ea of
        Left e -> return (Left e)
        Right a -> case eb of
          Left e -> return (Left e)
          Right b -> runExceptT (f a b)
-- | Runs both underlying computations through the base monad's 'bindM2',
-- then applies the continuation to every pairing of their results and
-- concatenates the resulting lists in order.
instance MonadParallel m => MonadParallel (ListT m) where
  bindM2 f ma mb = ListT (bindM2 crossAll (runListT ma) (runListT mb))
    where
      -- The pairings are run sequentially, folded from the right.
      crossAll xs ys =
        foldr append (return []) [runListT (f x y) | x <- xs, y <- ys]
      append front back = front >>= \hd -> back >>= \tl -> return (hd ++ tl)
-- | Supplies the same environment to both arguments and to the continuation,
-- delegating the combination to the underlying monad's 'bindM2'.
instance MonadParallel m => MonadParallel (ReaderT r m) where
  bindM2 f ma mb = ReaderT runBoth
    where
      runBoth env = bindM2 (continue env) (runReaderT ma env) (runReaderT mb env)
      continue env a b = runReaderT (f a b) env
-- The two instances below define no methods, so each one uses the class's
-- default 'forkExec'.

-- | Uses the default 'forkExec'.
instance MonadFork Maybe
-- | Uses the default 'forkExec'.
instance MonadFork []
-- | For functions, the argument is applied to the input, the result is
-- hinted for parallel evaluation with 'par', and a function returning that
-- result is handed back.
instance MonadFork ((->) r) where
  forkExec e r = par result (return result)
    where
      result = e r
-- | Runs the action on a new thread created with 'forkIO' and returns an
-- action that waits for its outcome.
--
-- The outcome, whether a value or a caught exception, is stored in an
-- 'MVar'. The returned action reads it with 'readMVar' and either returns
-- the value or rethrows the exception with 'throwIO'.
instance MonadFork IO where
  forkExec ma = do
    slot <- newEmptyMVar
    -- The thread is forked inside 'mask'; only the user action itself runs
    -- under @restore@, and its outcome is captured with 'try'.
    _ <- mask (\restore -> forkIO (try (restore ma) >>= putMVar slot))
    return (readMVar slot >>= either rethrow return)
    where
      rethrow e = throwIO (e :: SomeException)
-- | Forks the underlying computation and rewraps both the outer and the
-- inner (result-yielding) computation in 'IdentityT'.
instance MonadFork m => MonadFork (IdentityT m) where
  forkExec ma = IdentityT (liftM IdentityT forked)
    where
      forked = forkExec (runIdentityT ma)
-- | Forks the underlying computation; the outer computation always yields
-- 'Just' the inner one, which is rewrapped in 'MaybeT'.
instance MonadFork m => MonadFork (MaybeT m) where
  forkExec ma = MaybeT (liftM wrap (forkExec (runMaybeT ma)))
    where
      wrap waiter = Just (MaybeT waiter)
-- | Forks the underlying computation; the outer computation always yields
-- 'Right' the inner one, which is rewrapped in 'ExceptT'.
instance MonadFork m => MonadFork (ExceptT e m) where
  forkExec ma = ExceptT (liftM wrap (forkExec (runExceptT ma)))
    where
      wrap waiter = Right (ExceptT waiter)
-- | Forks the underlying computation; the outer computation yields a
-- singleton list holding the inner one, which is rewrapped in 'ListT'.
instance MonadFork m => MonadFork (ListT m) where
  forkExec ma = ListT (liftM wrap (forkExec (runListT ma)))
    where
      wrap waiter = [ListT waiter]
-- | Forks the underlying computation using the current environment; the
-- inner computation it hands back ignores whatever environment it is later
-- run with.
instance MonadFork m => MonadFork (ReaderT r m) where
  forkExec ma = ReaderT $ \env -> liftM detach (forkExec (runReaderT ma env))
    where
      detach waiter = ReaderT (const waiter)