-- | Stream primitives built on coroutines: the 'Sink' and 'Source' types, the 'pipe' family that connects a
-- producer coroutine to a consumer coroutine, and helpers for mapping, filtering, folding, zipping, teeing and
-- converting streams to and from lists.
module Control.Concurrent.SCC.Streams
(
-- * Sink and Source types
Sink, Source, SinkFunctor, SourceFunctor, AncestorFunctor,
-- * Sink and Source constructors
pipe, pipeP, pipePS, nullSink, nullSource,
-- * Basic operations on sinks and sources
get, put, getWith,
-- * Lifting to a descendant functor
liftSink, liftSource,
-- * Copying and duplicating
pour, tee, teeSink, teeSource,
-- * Mapping
mapStream, mapSource, mapSink, mapMStream, mapMSource, mapMSink, mapMStream_,
mapMaybeStream, mapMaybeSink, mapMaybeSource,
-- * Filtering
filterMStream, filterMSource, filterMSink,
-- * Folding, accumulating and partitioning
foldStream, foldMStream, foldMStream_, mapAccumStream, partitionStream,
-- * Unfolding
unfoldMStream, unmapMStream_,
-- * Zipping
zipWithMStream, parZipWithMStream,
-- * List and queue conversions
getList, putList, putQueue,
-- * Utility
cond
)
where
import qualified Control.Monad
import qualified Data.List
import qualified Data.Maybe
import Control.Monad (liftM, when)
import Data.Foldable (toList)
import Data.Sequence (Seq, viewl)
import Control.Monad.Parallel (MonadParallel(..))
import Control.Monad.Coroutine
import Control.Monad.Coroutine.SuspensionFunctors (Await(Await), Yield(Yield), EitherFunctor(..), await, yield)
import Control.Monad.Coroutine.Nested (AncestorFunctor(..), liftOut, seesawNested)
-- | Suspension functor of a consumer coroutine inside 'pipe': either a suspension of the enclosing functor @a@, or
-- an 'Await' for a @Maybe x@.
type SourceFunctor a x = EitherFunctor a (Await (Maybe x))
-- | Suspension functor of a producer coroutine inside 'pipe': either a suspension of the enclosing functor @a@, or
-- a 'Yield' of an @x@.
type SinkFunctor a x = EitherFunctor a (Yield x)
-- | The writable end of a stream of @x@ values. The sink is tied to the functor @a@, and can be written to from any
-- coroutine whose functor @d@ satisfies @AncestorFunctor a d@.
newtype Sink (m :: * -> *) a x = Sink
  { -- | Write one value into the sink.
    put :: forall d. AncestorFunctor a d => x -> Coroutine d m ()
  }
-- | The readable end of a stream of @x@ values. The source is tied to the functor @a@, and can be read from any
-- coroutine whose functor @d@ satisfies @AncestorFunctor a d@.
newtype Source (m :: * -> *) a x = Source
  { -- | Read the next value from the source; the combinators in this module treat 'Nothing' as end of stream.
    get :: forall d. AncestorFunctor a d => Coroutine d m (Maybe x)
  }
-- | A sink that discards every value put into it.
nullSink :: forall m a x. Monad m => Sink m a x
nullSink = Sink (\_ -> return ())
-- | A source that never produces a value: every 'get' returns 'Nothing'.
nullSource :: forall m a x. Monad m => Source m a x
nullSource = Source (return Nothing)
-- | Re-tie a sink from functor @a@ to a descendant functor @d@. Each 'put' is run at @d@ and then lifted with
-- 'liftOut' into whatever coroutine the caller is in.
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSink s = Sink {put = \x -> liftOut (put s x :: Coroutine d m ())}
-- | Re-tie a source from functor @a@ to a descendant functor @d@. Each 'get' is run at @d@ and then lifted with
-- 'liftOut' into whatever coroutine the caller is in.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
liftSource s = Source {get = liftOut atD}
  where
    -- The underlying 'get', pinned to the functor @d@.
    atD :: Coroutine d m (Maybe x)
    atD = get s
-- | Connect a producer, which is handed a 'Sink', to a consumer, which is handed a 'Source' fed from that sink, and
-- return both results as a pair. The steps of the two coroutines are combined sequentially: the first argument's
-- step runs before the second's.
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
        (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipe = pipeG inOrder
  where
    -- Run the two actions one after the other, then combine their results.
    inOrder :: forall u v r. (u -> v -> m r) -> m u -> m v -> m r
    inOrder combine first second = do
      u <- first
      v <- second
      combine u v
-- | Same as 'pipe', except the steps of the producer and the consumer are combined with 'bindM2' from
-- 'MonadParallel' (presumably allowing them to run in parallel) instead of being sequenced.
pipeP :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP producer consumer = pipeG bindM2 producer consumer
-- | Select between 'pipeP' and 'pipe' at run time: 'True' picks 'pipeP', 'False' picks 'pipe'.
pipePS :: forall m a a1 a2 x r1 r2. (MonadParallel m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
          Bool -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) ->
          Coroutine a m (r1, r2)
pipePS True = pipeP
pipePS False = pipe
-- | Generic pipe shared by 'pipe' and 'pipeP'. The first argument decides how one step of each coroutine is
-- combined. The producer receives a sink whose 'put' yields the value, and the consumer receives a source whose
-- 'get' awaits one. The two are interleaved by 'seesawNested'.
pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         (forall x y r. (x -> y -> m r) -> m x -> m y -> m r)
         -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2)
         -> Coroutine a m (r1, r2)
pipeG run2 producer consumer =
  liftM swapResults (seesawNested run2 resolver (consumer source) (producer sink))
  where
    -- The consumer is passed first to 'seesawNested', so its result comes first and the pair has to be swapped.
    -- The pattern is lazy to stay as lazy as @uncurry (flip (,))@.
    swapResults ~(fromConsumer, fromProducer) = (fromProducer, fromConsumer)

    -- Writing to the sink is a 'yield' in the right branch of the producer's functor.
    sink :: Sink m a1 x
    sink = Sink {put = \x -> liftOut (mapSuspension RightF (yield x) :: Coroutine a1 m ())}

    -- Reading from the source is an 'await' in the right branch of the consumer's functor.
    source :: Source m a2 x
    source = Source {get = liftOut (mapSuspension RightF await :: Coroutine a2 m (Maybe x))}

    resolver = SeesawResolver
      { -- Consumer awaiting on its own (presumably the producer is done): answer with Nothing.
        resumeLeft = \(Await feed) -> feed Nothing
        -- Producer yielding on its own (presumably the consumer is done): drop the value and carry on.
      , resumeRight = \(Yield _ rest) -> rest
        -- Both suspended: hand the yielded value to the awaiting consumer and resume both.
      , resumeAny = \_ _ resumeBoth (Await feed) (Yield x rest) -> resumeBoth (feed (Just x)) rest
      }
-- | Read one value from the source and pass it to the consumer; do nothing if the source returns 'Nothing'.
getWith :: forall m a d x. (Monad m, AncestorFunctor a d) => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
getWith consumer source = do
  item <- get source
  case item of
    Nothing -> return ()
    Just x -> consumer x
-- | Copy every value from the source into the sink until the source is exhausted.
pour :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
        => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour source sink = mapMStream_ (\x -> put sink x) source
-- | Apply a pure function to every value read from the source and put each result into the sink.
mapStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStream f source sink = mapMStream_ (\x -> put sink (f x)) source
-- | Wrap a source so that every value read from it is transformed by the given pure function.
mapSource :: forall m a x y. Monad m => (x -> y) -> Source m a x -> Source m a y
mapSource f source = Source mapped
  where
    mapped :: forall d. AncestorFunctor a d => Coroutine d m (Maybe y)
    mapped = do
      item <- get source
      return (fmap f item)
-- | Wrap a sink so that every value put into the result is first transformed by the given pure function.
mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a y -> Sink m a x
mapSink f sink = Sink (\x -> put sink (f x))
-- | Run every value from the source through the function; put the 'Just' results into the sink and skip the
-- 'Nothing' ones.
mapMaybeStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (x -> Maybe y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMaybeStream f source sink = mapMStream_ forward source
  where
    forward x = case f x of
      Nothing -> return ()
      Just y -> put sink y
-- | Wrap a sink so that a value put into the result reaches the underlying sink only when the function maps it to
-- a 'Just'; values mapped to 'Nothing' are dropped.
mapMaybeSink :: forall m a x y . Monad m => (x -> Maybe y) -> Sink m a y -> Sink m a x
mapMaybeSink f sink = Sink {put = forward}
  where
    forward :: forall d. AncestorFunctor a d => x -> Coroutine d m ()
    forward x = case f x of
      Nothing -> return ()
      Just y -> put sink y
-- | Wrap a source so that each 'get' keeps reading the underlying source until the function maps a value to a
-- 'Just', which is returned, or the underlying source is exhausted.
mapMaybeSource :: forall m a x y . Monad m => (x -> Maybe y) -> Source m a x -> Source m a y
mapMaybeSource f source = Source {get = advance}
  where
    advance :: forall d. AncestorFunctor a d => Coroutine d m (Maybe y)
    advance = do
      item <- get source
      case item of
        Nothing -> return Nothing
        Just x -> case f x of
          Nothing -> advance            -- rejected value: try the next one
          Just y -> return (Just y)
-- | Apply a monadic function to every value read from the source and put each result into the sink, until the
-- source is exhausted.
mapMStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMStream f source sink = go
  where
    go = do
      item <- get source
      case item of
        Nothing -> return ()
        Just x -> f x >>= put sink >> go
-- | Wrap a source so that every value read from it is transformed by the given monadic function.
mapMSource :: forall m a x y. Monad m
              => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Source m a x -> Source m a y
mapMSource f source = Source {get = step}
  where
    step :: forall d. AncestorFunctor a d => Coroutine d m (Maybe y)
    step = do
      item <- get source
      case item of
        Nothing -> return Nothing
        Just x -> liftM Just (f x)
-- | Wrap a sink so that every value put into the result is first transformed by the given monadic function.
mapMSink :: forall m a x y. Monad m
            => (forall d. AncestorFunctor a d => x -> Coroutine d m y) -> Sink m a y -> Sink m a x
mapMSink f sink = Sink {put = relay}
  where
    relay :: forall d. AncestorFunctor a d => x -> Coroutine d m ()
    relay x = f x >>= put sink
-- | Run the given action on every value read from the source, until the source is exhausted.
mapMStream_ :: forall m a d x . (Monad m, AncestorFunctor a d)
              => (x -> Coroutine d m ()) -> Source m a x -> Coroutine d m ()
mapMStream_ f source = go
  where
    go = get source >>= maybe (return ()) (\x -> f x >> go)
-- | Copy from the source into the sink only those values for which the monadic predicate returns 'True'.
filterMStream :: forall m a1 a2 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
filterMStream f source sink = mapMStream_ forwardIfAccepted source
  where
    forwardIfAccepted x = do
      accepted <- f x
      if accepted then put sink x else return ()
-- | Wrap a source so that each 'get' keeps reading the underlying source until it finds a value accepted by the
-- monadic predicate, or the underlying source is exhausted.
filterMSource :: forall m a x y . Monad m
                 => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Source m a x -> Source m a x
filterMSource f source = Source {get = search}
  where
    search :: forall d. AncestorFunctor a d => Coroutine d m (Maybe x)
    search = do
      item <- get source
      case item of
        Nothing -> return Nothing
        Just x -> do
          accepted <- f x
          if accepted then return (Just x) else search
-- | Wrap a sink so that a value put into the result reaches the underlying sink only when the monadic predicate
-- returns 'True' for it.
filterMSink :: forall m a x y . Monad m
               => (forall d. AncestorFunctor a d => x -> Coroutine d m Bool) -> Sink m a x -> Sink m a x
filterMSink f sink = Sink {put = guarded}
  where
    guarded :: forall d. AncestorFunctor a d => x -> Coroutine d m ()
    guarded x = do
      accepted <- f x
      if accepted then put sink x else return ()
-- | Left fold over all values of the source with a pure step function, returning the final accumulator. The
-- accumulator is not forced between steps.
foldStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
              => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
foldStream f s source = go s
  where
    go acc = do
      item <- get source
      case item of
        Nothing -> return acc
        Just x -> go (f acc x)
-- | Left fold over all values of the source with a monadic step function, returning the final accumulator.
foldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
              => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStream f acc source = go acc
  where
    go current = do
      item <- get source
      case item of
        Nothing -> return current
        Just x -> f current x >>= go
-- | Like 'foldMStream', but the final accumulator is discarded and @()@ is returned.
foldMStream_ :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m ()
foldMStream_ f acc source = go acc
  where
    go current = do
      item <- get source
      case item of
        Nothing -> return ()
        Just x -> f current x >>= go
-- | Repeatedly run the generator on the current seed. A @Just (x, seed')@ result puts @x@ into the sink and
-- continues with @seed'@; a 'Nothing' result stops and returns the seed that produced it.
unfoldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                 => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a x -> Coroutine d m acc
unfoldMStream f acc sink = go acc
  where
    go seed = do
      step <- f seed
      case step of
        Nothing -> return seed
        Just (x, seed') -> put sink x >> go seed'
-- | Repeatedly run the given action and put each 'Just' result into the sink; stop at the first 'Nothing'.
unmapMStream_ :: forall m a d x . (Monad m, AncestorFunctor a d)
                 => Coroutine d m (Maybe x) -> Sink m a x -> Coroutine d m ()
unmapMStream_ f sink = go
  where
    go = do
      produced <- f
      case produced of
        Nothing -> return ()
        Just x -> put sink x >> go
-- | Thread an accumulator through the stream: each value read from the source is combined with the current
-- accumulator to give a new accumulator and an output, which is put into the sink. The final accumulator is
-- returned once the source is exhausted.
mapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
mapAccumStream f acc source sink = go acc
  where
    go current = do
      item <- get source
      case item of
        Nothing -> return current
        Just x -> do
          -- The let pattern is lazy, so the pair from f is only forced when a component is demanded.
          let (next, y) = f current x
          put sink y
          go next
-- | Route every value from the source into the first sink when the predicate holds, and into the second sink
-- otherwise.
partitionStream :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                   => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
partitionStream f source true false = mapMStream_ route source
  where
    route x
      | f x = put true x
      | otherwise = put false x
-- | Read one value from each source per round, combine the pair with the monadic function and put the result into
-- the sink. Both sources are read before the check, and the loop stops as soon as either returns 'Nothing'; a
-- value already read from the other source in that round is discarded.
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                  => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
zipWithMStream f source1 source2 sink = go
  where
    go =
      get source1 >>= \mx ->
      get source2 >>= \my ->
      combine mx my
    combine (Just x) (Just y) = f x y >>= put sink >> go
    combine _ _ = return ()
-- | Like 'zipWithMStream', except the two reads of each round are combined with 'bindM2' from 'MonadParallel'
-- rather than sequenced.
parZipWithMStream :: forall m a1 a2 a3 d x y z.
                     (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                     => (x -> y -> Coroutine d m z) -> Source m a1 x -> Source m a2 y -> Sink m a3 z -> Coroutine d m ()
parZipWithMStream f source1 source2 sink = go
  where
    go = bindM2 combine (get source1) (get source2)
    combine mx my = case (mx, my) of
      (Just x, Just y) -> f x y >>= put sink >> go
      _ -> return ()
-- | Copy every value from the source into both sinks, first into @sink1@ and then into @sink2@, until the source
-- is exhausted.
tee :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee source sink1 sink2 = go
  where
    go = do
      item <- get source
      case item of
        Nothing -> return ()
        Just x -> put sink1 x >> put sink2 x >> go
-- | Combine two sinks into one: every value put into the result is put into the first sink and then into the
-- second. Both argument sinks are lifted to the common functor @a3@.
teeSink :: forall m a1 a2 a3 x . (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3)
           => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
teeSink s1 s2 = Sink {put = broadcast}
  where
    broadcast :: forall d. AncestorFunctor a3 d => x -> Coroutine d m ()
    broadcast x = do
      put lifted1 x
      put lifted2 x
    lifted1, lifted2 :: Sink m a3 x
    lifted1 = liftSink s1
    lifted2 = liftSink s2
-- | Wrap a source so that every value read through the result is also put into the given sink before it is
-- returned; a 'Nothing' from the underlying source is returned without touching the sink. Both arguments are
-- lifted to the common functor @a3@.
teeSource :: forall m a1 a2 a3 x . (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3)
             => Sink m a1 x -> Source m a2 x -> Source m a3 x
teeSource sink source = Source {get = relay}
  where
    relay :: forall d. AncestorFunctor a3 d => Coroutine d m (Maybe x)
    relay =
      get liftedSource >>= \mx ->
      maybe (return ()) (put liftedSink) mx >> return mx
    liftedSink :: Sink m a3 x
    liftedSink = liftSink sink
    liftedSource :: Source m a3 x
    liftedSource = liftSource source
-- | Put every element of the list into the sink, in list order.
--
-- Uses the Prelude's 'mapM_' rather than hand-written recursion; this also removes the unused as-pattern and the
-- unused @sink@ binding of the empty-list clause.
putList :: forall m a d x. (Monad m, AncestorFunctor a d) => [x] -> Sink m a x -> Coroutine d m ()
putList l sink = mapM_ (put sink) l
-- | Read the source until it is exhausted and return all values, in the order they were read.
getList :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Coroutine d m [x]
getList source = collect return
  where
    -- @finish@ is a continuation that prepends everything read so far to the remainder of the list.
    collect finish = do
      item <- get source
      case item of
        Nothing -> finish []
        Just x -> collect (\rest -> finish (x : rest))
-- | An @if@ with the test as the last argument: returns the first value when the test is 'True', and the second
-- otherwise.
cond :: a -> a -> Bool -> a
cond x _ True = x
cond _ y False = y
-- | Put every element of the queue into the sink, front to back, by converting it to a list and delegating to
-- 'putList'.
putQueue :: forall m a d x. (Monad m, AncestorFunctor a d) => Seq x -> Sink m a x -> Coroutine d m ()
putQueue q sink = putList items sink
  where
    items = toList (viewl q)