-- | Pipes that introduce parallelism on different levels.
module Pipes.Parallel where

import Control.Monad.Codensity (lowerCodensity)
import Control.Monad (replicateM)
import Control.Parallel.Strategies (Strategy, parMap)
import Pipes

-- | Evaluates chunks of pipes elements in parallel with a pure function.
--
-- This is 'pipeParBA' with hooks that do nothing: the \"before\" hook passes
-- the chunk through unchanged and the \"after\" hook returns the results
-- unchanged.
pipePar
  :: (Monad m)
  => Int
  -- ^ number of elements to evaluate in parallel (values below 1 are
  -- treated as 1)
  -> Strategy b
  -- ^ with which strategy
  -> (a -> b)
  -- ^ function to be mapped in parallel
  -> Pipe a b m ()
pipePar n strat f =
  pipeParBA n strat f (\as -> return ((), as)) (\() bs -> return bs)

-- | Evaluates chunks of pipes elements in parallel with a pure function.
-- Before and after each parallel step, a monadic function is run. This
-- allows generation of certain statistics or information during runs.
--
-- Each round awaits a full chunk of elements, hands the chunk to the
-- \"before\" hook, maps the pure function over whatever list that hook
-- returns using 'parMap', hands the results to the \"after\" hook, and then
-- yields the list returned by the \"after\" hook, in order. Nothing is
-- yielded until a whole chunk has been awaited.
--
-- A chunk size below 1 is treated as 1. Without this, every round would
-- await nothing and the pipe would loop forever without consuming input.
--
-- NOTE(review): if upstream finishes in the middle of a chunk, the elements
-- already awaited for that chunk are presumably never processed -- confirm
-- against the termination semantics of 'await'.
pipeParBA
  :: (Monad m)
  => Int
  -- ^ number of elements to evaluate in parallel (values below 1 are
  -- treated as 1)
  -> Strategy b
  -- ^ with which strategy
  -> (a -> b)
  -- ^ pure function to run in parallel
  -> ([a] -> m (x, [a]))
  -- ^ function to run before; its first result is passed on to the
  -- \"after\" function, its second result is the list actually processed
  -> (x -> [b] -> m [b])
  -- ^ function to run after; the list it returns is what gets yielded
  -> Pipe a b m ()
pipeParBA n strat f bef aft = go
  where
    -- Guard against non-positive chunk sizes: @replicateM k@ with @k <= 0@
    -- performs no awaits, which would turn 'go' into a busy loop.
    chunkSize = max 1 n

    go = do
      -- Collect one chunk. The repeated awaits are run through Codensity,
      -- presumably to keep the chain of binds cheap for large chunks.
      as' <- lowerCodensity . replicateM chunkSize $ lift await
      (x, as) <- lift $ bef as'
      let bs' = parMap strat f as
      bs <- lift $ aft x bs'
      lowerCodensity $ mapM_ (lift . yield) bs
      go