module Pipes.Interleave ( interleave
, combine
, combineM
, merge
, mergeM
, groupBy
) where
import Control.Monad (liftM)
import Data.Either (rights)
import qualified Data.Heap as H
import qualified Data.Sequence as Seq
import Data.Foldable (toList)
import Pipes
-- | Merge several producers into one by always emitting the smallest of
-- their current head elements.
--
-- Every producer is first advanced by one step; producers that are already
-- exhausted at that point are dropped. The remaining heads are stored in a
-- min-heap, so if each input is in ascending order the output is as well.
interleave :: forall a m. (Monad m, Ord a)
           => [Producer a m ()]
           -> Producer a m ()
interleave producers = do
    firsts <- lift $ mapM next producers
    drain . H.fromList . map (uncurry H.Entry) $ rights firsts
  where
    -- Emit the minimum entry, then advance the producer it came from and put
    -- its next head (if there is one) back into the heap before continuing.
    drain :: H.Heap (H.Entry a (Producer a m ())) -> Producer a m ()
    drain heap = case H.viewMin heap of
        Nothing -> return ()
        Just (H.Entry smallest source, remaining) -> do
            yield smallest
            step <- lift $ next source
            case step of
                Left _ ->
                    drain remaining
                Right (x, source') ->
                    drain $ H.insert (H.Entry x source') remaining
-- | Collapse runs of adjacent equal elements using a pure combining function.
--
-- This is 'combineM' with the combining function lifted into the base monad.
combine :: (Monad m, Eq a)
        => (a -> a -> a)
        -> Producer a m r -> Producer a m r
combine append = combineM appendM
  where
    -- The pure combining function, wrapped so that 'combineM' can run it.
    appendM x y = return (append x y)
-- | Collapse runs of adjacent equal elements using a monadic combining
-- function.
--
-- A pending value is held back while the upstream producer is advanced. If
-- the next element equals the pending value, the two are merged with
-- @append pending next@ and the result becomes the new pending value (so
-- later comparisons are made against the merged value). Otherwise the
-- pending value is yielded and the new element takes its place. When the
-- upstream producer finishes, the pending value is yielded and the upstream
-- return value is passed through. An empty producer yields nothing.
combineM :: (Monad m, Eq a)
         => (a -> a -> m a)
         -> Producer a m r -> Producer a m r
combineM append producer = do
    start <- lift $ next producer
    case start of
        Left r              -> return r
        Right (first, rest) -> accumulate first rest
  where
    -- Loop carrying the pending value and the remainder of the upstream.
    accumulate pending source = do
        step <- lift $ next source
        case step of
            Left r -> do
                yield pending
                return r
            Right (x, source')
                | pending == x -> do
                    merged <- lift $ append pending x
                    accumulate merged source'
                | otherwise -> do
                    yield pending
                    accumulate x source'
-- | Interleave several producers and collapse adjacent equal elements with a
-- pure combining function.
--
-- This is 'mergeM' with the combining function lifted into the base monad.
merge :: (Monad m, Ord a)
      => (a -> a -> a)
      -> [Producer a m ()]
      -> Producer a m ()
merge append = mergeM $ \x y -> return (append x y)
-- | Interleave several producers and collapse adjacent equal elements with a
-- monadic combining function.
--
-- The producers are first merged with 'interleave'; the resulting stream is
-- then passed through 'combineM' with the given combining function.
mergeM :: (Monad m, Ord a)
       => (a -> a -> m a)
       -> [Producer a m ()]
       -> Producer a m ()
mergeM append producers = combineM append (interleave producers)
-- | Group runs of adjacent equal elements into lists.
--
-- Each incoming element is compared with '==' against the /first/ element of
-- the run currently being collected. Equal elements are appended to the run;
-- the first unequal element causes the run to be yielded (in arrival order)
-- and starts a new run. When the upstream producer finishes, the final run
-- is yielded and the upstream return value is passed through. An empty
-- producer yields nothing.
--
-- Only equality is needed, so the constraint is 'Eq' rather than 'Ord'.
groupBy :: forall a r m. (Monad m, Eq a)
        => Producer a m r -> Producer [a] m r
groupBy producer = do
    start <- lift $ next producer
    case start of
        Left r          -> return r
        Right (x, rest) -> go x (Seq.singleton x) rest
  where
    -- The first element of the run is carried explicitly so that it never has
    -- to be recovered from the sequence with a partial pattern match.
    -- Arguments: run head, elements collected so far (including the head),
    -- and the remainder of the upstream producer.
    go :: a -> Seq.Seq a -> Producer a m r -> Producer [a] m r
    go x0 xs source = do
        step <- lift $ next source
        case step of
            Left r -> do
                yield (toList xs)
                return r
            Right (x, source')
                | x == x0 -> go x0 (xs Seq.|> x) source'
                | otherwise -> do
                    yield (toList xs)
                    go x (Seq.singleton x) source'