module Control.Concurrent.SCC.Streams
(
Sink, Source, SinkFunctor, SourceFunctor, AncestorFunctor,
pipe, pipeP, pipeG, nullSink,
get, getWith, getPrime, peek, put, tryPut,
liftSink, liftSource,
pour, pour_, tee, teeSink,
getAll, putAll, putChunk,
getParsed, getRead,
getWhile, getUntil,
pourRead, pourParsed, pourWhile, pourUntil,
Reader, Reading(..), ReadingResult(..),
markDown, markUpWith,
mapSink, mapStream,
mapMaybeStream, concatMapStream,
mapStreamChunks, mapAccumStreamChunks, foldStream, mapAccumStream, concatMapAccumStream, partitionStream,
mapMStream, mapMStream_, mapMStreamChunks_,
filterMStream, foldMStream, foldMStream_, unfoldMStream, unmapMStream_, unmapMStreamChunks_,
zipWithMStream, parZipWithMStream,
)
where
import Prelude hiding (foldl, foldr, map, mapM, mapM_, null, span, takeWhile)
import qualified Control.Monad
import Control.Monad (liftM, when, unless, foldM)
import Data.Functor.Identity (Identity(..))
import Data.Functor.Sum (Sum(InR))
import Data.Monoid (Monoid(mappend, mconcat, mempty), First(First, getFirst))
import Data.Monoid.Factorial (FactorialMonoid, foldl, foldMap, mapM, mapM_, span, primePrefix, splitPrimePrefix)
import Data.Monoid.Null (MonoidNull(null))
import Data.Maybe (mapMaybe)
import Data.List (mapAccumL)
import qualified Data.List as List (map, span)
import Text.ParserCombinators.Incremental (Parser, feed, feedEof, inspect, completeResults)
import Control.Monad.Parallel (MonadParallel(..))
import Control.Monad.Coroutine
import Control.Monad.Coroutine.SuspensionFunctors (Request, request,
ReadRequest, requestRead,
Reader, Reading(..), ReadingResult(..),
weaveNestedReadWriteRequests)
import Control.Monad.Coroutine.Nested (AncestorFunctor(..), liftAncestor)
-- | Suspension functor of a coroutine that reads chunks of type @x@: the ambient functor @a@ summed with
-- @'ReadRequest' x@. 'pipeG' runs its consumer argument in this functor.
type SourceFunctor a x = Sum a (ReadRequest x)
-- | Suspension functor of a coroutine that writes chunks of type @x@: the ambient functor @a@ summed with
-- @'Request' x x@. 'pipeG' runs its producer argument in this functor.
type SinkFunctor a x = Sum a (Request x x)
-- | A 'Sink' accepts chunks of type @x@. It can be used from any coroutine whose suspension functor @d@
-- is a descendant of the sink's own functor @a@ (the @AncestorFunctor a d@ constraint).
newtype Sink (m :: * -> *) a x =
Sink
{
-- | Writes one chunk to the sink and returns a value of the same chunk type. Callers in this module
-- (e.g. 'markDown', 'tryPut') treat a null result as "everything was accepted" and a non-null result
-- as the part of the chunk that was not consumed.
putChunk :: forall d. AncestorFunctor a d => x -> Coroutine d m x
}
-- | A 'Source' yields chunks of type @x@. It can be used from any coroutine whose suspension functor @d@
-- is a descendant of the source's own functor @a@ (the @AncestorFunctor a d@ constraint).
newtype Source (m :: * -> *) a x =
Source
{
-- | Runs the given 'Reader' against the source and returns the 'ReadingResult' it produces: either a
-- 'ResultPart' together with a reader for the remaining input, or a 'FinalResult'.
readChunk :: forall d py y. AncestorFunctor a d => Reader x py y -> Coroutine d m (ReadingResult x py y)
}
-- | A 'Reader' that accepts every chunk it is handed in full: the chunk is reported both as the
-- end-of-input result and as the partial result, and the reader continues with itself.
readAll :: Reader x x x
readAll chunk = Advance readAll chunk chunk
-- | A sink that ignores every chunk written to it and always returns 'mempty'.
nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x
nullSink = Sink{putChunk= \_ -> return mempty}
-- | A source with no content: every reader is applied to 'mempty' and whatever result value it reports
-- is returned immediately as the 'FinalResult'.
emptySource :: forall m a x. (Monad m, Monoid x) => Source m a x
emptySource = Source{readChunk= \reader -> return (finalize (reader mempty))}
  where
    -- Whichever way the reader answers, only its result value is kept.
    finalize reading = case reading of
      Final _ x -> FinalResult x
      Advance _ x _ -> FinalResult x
      Deferred _ x -> FinalResult x
-- | Converts a sink bound to functor @a@ into a sink bound to a descendant functor @d@. Each 'putChunk'
-- on the result runs the original sink's 'putChunk' at type @Coroutine d m x@ and passes it through
-- 'liftAncestor'.
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSink s = Sink {putChunk= liftAncestor . (putChunk s :: x -> Coroutine d m x)}
-- | Converts a source bound to functor @a@ into a source bound to a descendant functor @d@. Each
-- 'readChunk' on the result runs the original source's 'readChunk' at functor @d@ and passes it through
-- 'liftAncestor'.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
liftSource s = Source {readChunk= liftAncestor . (readChunk s :: Reader x py y -> Coroutine d m (ReadingResult x py y))}
-- | Adapts a sink of marked chunk lists into a sink of plain chunks: every chunk written is paired with
-- the given @mark@ and forwarded to the underlying sink as a singleton list.
--
-- The value returned by each write is the remainder reported by the underlying sink with the marks
-- stripped off: 'mempty' when nothing is left, otherwise the concatenation of the leftover pieces.
markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x
markUpWith mark sink = Sink putMarkedChunk
  where
    putMarkedChunk :: forall d. AncestorFunctor a d => x -> Coroutine d m x
    putMarkedChunk x = do
      rest <- putChunk sink [(x, mark)]
      -- Total on every shape of remainder: [] gives mempty and a singleton gives its chunk; a longer
      -- remainder is concatenated instead of failing a pattern match.
      return (mconcat (List.map fst rest))
-- | Adapts a sink of plain chunks into a sink of marked chunk lists by dropping the marks. The chunks
-- are written one at a time; as soon as the underlying sink hands back a non-null remainder, that
-- remainder (re-paired with its mark) and all unwritten items are returned.
markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)]
markDown sink = Sink putUnmarkedChunk
  where
    putUnmarkedChunk :: forall d. AncestorFunctor a d => [(x, mark)] -> Coroutine d m [(x, mark)]
    putUnmarkedChunk chunks = case chunks of
      [] -> return mempty
      (x, mark) : remaining -> do
        rest <- putChunk sink x
        if null rest
          then putUnmarkedChunk remaining
          else return ((rest, mark) : remaining)
-- | Connects a producer coroutine to a consumer coroutine through a fresh sink/source pair, using
-- 'sequentialBinder' to combine the two sides. Returns both coroutines' results, producer first.
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
        (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipe producer consumer = pipeG sequentialBinder producer consumer
-- | Like 'pipe', but the two sides are combined with 'bindM2' from 'MonadParallel' instead of
-- 'sequentialBinder'.
pipeP :: forall m a a1 a2 x r1 r2.
         (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP producer consumer = pipeG bindM2 producer consumer
-- | General pipe combinator, parameterized by the 'PairBinder' used to combine the two coroutines.
-- The producer is given a fresh 'Sink', the consumer a fresh 'Source', and the two are woven together
-- with 'weave' and 'weaveNestedReadWriteRequests'. The result pairs the producer's result with the
-- consumer's.
pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2)
-> Coroutine a m (r1, r2)
pipeG run2 producer consumer =
-- The weave is given the consumer first, so the resulting pair is swapped back into (r1, r2) order.
liftM (uncurry (flip (,))) $
weave run2 weaveNestedReadWriteRequests (consumer source) (producer sink)
-- Writing to the sink issues a 'request' wrapped in the 'InR' summand, lifted to the caller's functor.
where sink = Sink {putChunk= \xs-> liftAncestor (mapSuspension InR (request xs) :: Coroutine a1 m x)}
source = Source {readChunk= fc}
-- Reading from the source issues a 'requestRead' wrapped in the 'InR' summand, lifted likewise.
fc :: forall d py y. AncestorFunctor a2 d => Reader x py y -> Coroutine d m (ReadingResult x py y)
fc t = liftAncestor (mapSuspension InR (requestRead t) :: Coroutine a2 m (ReadingResult x py y))
-- | Turns an incremental 'Parser' into a 'Reader'. The input chunk is fed to the parser and the
-- outcome of 'inspect' decides the reading:
--
-- * no results and no continuation: 'Final' with the whole chunk unconsumed and the @failure@ value;
-- * a continuation without a partial result: 'Deferred';
-- * a continuation with a partial result prefix: 'Advance', reporting that prefix;
-- * exactly one complete result: 'Final' with the parser's leftover input and that result.
--
-- The value to use if input ends is the first complete result of the continuation after 'feedEof',
-- falling back to @failure@ when there is none. Both the 'Deferred' and the 'Advance' branch use this
-- same total fallback.
--
-- NOTE(review): other shapes of 'inspect' output (several results, or results together with a
-- continuation) are still not matched, as before.
fromParser :: forall p x y. Monoid x => y -> Parser p x y -> Reader x (y -> y) y
fromParser failure p s =
  case inspect (feed s p) of
    ([], Nothing) -> Final s failure
    ([], Just (Nothing, p')) -> Deferred (fromParser failure p') (eofResult p')
    ([], Just (Just prefix, p')) -> Advance (fromParser failure p') (prefix (eofResult p')) prefix
    ([(r, s')], Nothing) -> Final s' r
  where
    -- Result of the continuation parser if the input were to end right now.
    eofResult p' = case completeResults (feedEof p') of
      [] -> failure
      (r, _) : _ -> r
-- | Reads a single item from a list source. Yields @Just@ the first item, leaving the rest of the chunk
-- unconsumed, or 'Nothing' when the reader's deferred end-of-input result is what comes back.
get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
get source = do
  result <- readChunk source readOne
  case result of
    FinalResult mx -> return mx
  where
    readOne chunk = case chunk of
      [] -> Deferred readOne Nothing
      x : rest -> Final rest (Just x)
-- | Reads the prime prefix of the source's content, as split off by 'splitPrimePrefix'. While the
-- incoming chunk cannot be split, the read is deferred with that chunk as the end-of-input result.
getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
getPrime source = do
  result <- readChunk source primeReader
  case result of
    FinalResult x -> return x
  where
    primeReader chunk = case splitPrimePrefix chunk of
      Nothing -> Deferred primeReader chunk
      Just (prefix, rest) -> Final rest prefix
-- | Reads the prime prefix of the source's content and runs @consumer@ on it. If the read ends without
-- a prefix having been split off, nothing is run.
getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) =>
           Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m ()
getWith source consumer = do
  result <- readChunk source primeReader
  case result of
    FinalResult action -> action
  where
    -- The reader's result is itself the action to perform afterwards.
    primeReader chunk = case splitPrimePrefix chunk of
      Nothing -> Deferred primeReader (return ())
      Just (prefix, rest) -> Final rest (consumer prefix)
-- | Looks at the first item of a list source without consuming it: the whole chunk, first item
-- included, is handed back as unconsumed input. Yields 'Nothing' when the deferred end-of-input result
-- is what comes back.
peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
peek source = do
  result <- readChunk source readOneAhead
  case result of
    FinalResult mx -> return mx
  where
    readOneAhead chunk = case chunk of
      [] -> Deferred readOneAhead Nothing
      x : _ -> Final chunk (Just x)
-- | Reads the entire remaining content of the source and returns it as a single value, built by
-- 'mappend'ing the chunks in arrival order.
getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
getAll source = do
  result <- readChunk source (accumulate id)
  case result of
    FinalResult everything -> return everything
  where
    -- @prefix@ prepends everything seen so far; every chunk is deferred so only the final total is reported.
    accumulate :: (x -> x) -> Reader x () x
    accumulate prefix chunk = Deferred (accumulate (prefix . mappend chunk)) (prefix chunk)
-- | Runs an incremental parser over the source and returns its result; 'mempty' is the value used by
-- 'fromParser' when the parser fails.
getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) =>
             Parser p x y -> Source m a x -> Coroutine d m y
getParsed parser source = getRead (fromParser mempty parser) source
-- | Runs a 'Reader' whose partial results are functions @y -> y@. Each 'ResultPart' is composed onto
-- the continuation that is finally applied to the 'FinalResult' value.
getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) =>
           Reader x (y -> y) y -> Source m a x -> Coroutine d m y
getRead reader source = go return reader
  where
    go finish r = do
      result <- readChunk source r
      case result of
        FinalResult chunk -> finish chunk
        ResultPart part r' -> go (finish . part) r'
-- | Reads from the source the longest prefix that 'span' accepts under @predicate@ and returns it as
-- one value. Input from the first rejected part onward is left unconsumed.
getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) =>
            (x -> Bool) -> Source m a x -> Coroutine d m x
getWhile predicate source = do
  result <- readChunk source (readWhile id)
  case result of
    FinalResult x -> return x
  where
    -- @seen@ prepends everything accepted in earlier chunks.
    readWhile :: (x -> x) -> Reader x () x
    readWhile seen chunk
      | null rest = Deferred (readWhile (seen . mappend chunk)) (seen chunk)
      | otherwise = Final rest (seen accepted)
      where
        (accepted, rest) = span predicate chunk
-- | Reads from the source up to the first part that satisfies @predicate@. Returns the content before
-- that point together with @Just@ the prime prefix of what follows, or 'Nothing' if the input ran out
-- first. The satisfying part itself is left unconsumed.
getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) =>
            (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x)
getUntil predicate source = do
  result <- readChunk source (readUntil id)
  case result of
    FinalResult r -> return r
  where
    -- @seen@ prepends everything skipped in earlier chunks.
    readUntil :: (x -> x) -> Reader x () (x, Maybe x)
    readUntil seen chunk
      | null rest = Deferred (readUntil (seen . mappend chunk)) (seen chunk, Nothing)
      | otherwise = Final rest (seen skipped, Just (primePrefix rest))
      where
        (skipped, rest) = span (not . predicate) chunk
-- | Copies every chunk from the source to the sink. Returns 'True' if at least one chunk was
-- transferred and 'False' otherwise.
pour :: forall m a1 a2 d x . (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
        => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool
pour source sink = loop False
  where
    loop transferred = do
      result <- readChunk source readAll
      case result of
        FinalResult _ -> return transferred
        ResultPart chunk _ -> putChunk sink chunk >> loop True
-- | Same as 'pour', but discards the flag saying whether anything was transferred.
pour_ :: forall m a1 a2 d x . (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
         => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour_ source sink = do
  _ <- pour source sink
  return ()
-- | Runs the reader over the source and writes every result part to the sink. The final result is
-- written too, unless it is null.
pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d)
            => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourRead reader source sink = loop reader
  where
    loop r = do
      result <- readChunk source r
      case result of
        FinalResult final -> unless (null final) (putChunk sink final >> return ())
        ResultPart chunk r' -> putChunk sink chunk >> loop r'
-- | Runs an incremental parser over the source and writes its output to the sink. Each partial result
-- is a function, which is applied to 'mempty' before being written; the final result is written unless
-- it is null.
pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourParsed parser source sink = loop (fromParser mempty parser)
  where
    loop r = do
      result <- readChunk source r
      case result of
        FinalResult final -> unless (null final) (putChunk sink final >> return ())
        ResultPart part r' -> putChunk sink (part mempty) >> loop r'
-- | Copies content from the source to the sink for as long as 'span' accepts it under the predicate.
-- Input from the first rejected part onward is left unconsumed in the source.
pourWhile :: forall m a1 a2 d x . (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pourWhile predicate source sink = pourRead (readWhile predicate) source sink
  where
    readWhile :: FactorialMonoid x => (x -> Bool) -> Reader x x x
    readWhile p = scan
      where
        scan chunk
          | null suffix = Advance scan prefix prefix
          | otherwise = Final suffix prefix
          where
            (prefix, suffix) = span p chunk
-- | Copies content from the source to the sink up to the first part that satisfies @predicate@.
-- Returns @Just@ the prime prefix of the remaining input at that point, or 'Nothing' if the input ran
-- out first. The satisfying part is left unconsumed in the source.
pourUntil :: forall m a1 a2 d x . (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)
pourUntil predicate source sink = loop (readUntil (not . predicate))
  where
    readUntil :: FactorialMonoid x => (x -> Bool) -> Reader x x (x, Maybe x)
    readUntil p = scan
      where
        scan chunk
          | null suffix = Advance scan (prefix, Nothing) prefix
          | otherwise = Final suffix (prefix, Just (primePrefix suffix))
          where
            (prefix, suffix) = span p chunk
    loop rd = do
      result <- readChunk source rd
      case result of
        FinalResult (chunk, mx) -> putChunk sink chunk >> return mx
        ResultPart chunk rd' -> putChunk sink chunk >> loop rd'
-- | Reads chunks from the source until it is exhausted, maps @f@ over each chunk with the factorial
-- 'foldMap', and writes the combined result to the sink.
mapStream :: forall m a1 a2 d x y . (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d)
           => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStream f source sink = loop
  where
    loop = do
      result <- readChunk source readAll
      case result of
        FinalResult _ -> return ()
        ResultPart chunk _ -> do
          _ <- putChunk sink (foldMap f chunk)
          loop
-- | Adapts a sink of @y@ items into a sink of @x@ items by mapping @f@ over every written chunk. If the
-- underlying sink hands back @n@ unconsumed items, the last @n@ items of the original chunk are
-- returned as the remainder.
mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a [y] -> Sink m a [x]
mapSink f sink = Sink{putChunk= forward}
  where
    forward :: forall d. AncestorFunctor a d => [x] -> Coroutine d m [x]
    forward xs = do
      rest <- putChunk sink (List.map f xs)
      return (keepLast (length rest) xs)
    -- Keeps only the final @n@ elements of the list (the whole list if it is shorter than @n@).
    keepLast :: forall z. Int -> [z] -> [z]
    keepLast 0 _ = []
    keepLast n list = drop (length list - n) list
-- | Applies @f@ to every item of the source and writes the @Just@ results to the sink, chunk by chunk;
-- items mapped to 'Nothing' are dropped.
mapMaybeStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m ()
mapMaybeStream f source sink = mapMStreamChunks_ forward source
  where
    forward chunk = putChunk sink (mapMaybe f chunk) >> return ()
-- | Maps every item of a list source to a monoidal value and writes the concatenated results to the
-- sink. Implemented with 'mapStream'.
concatMapStream :: forall m a1 a2 d x y . (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m ()
concatMapStream f source sink = mapStream (\items -> mconcat (List.map f items)) source sink
-- | Stream analogue of 'mapAccumL': threads an accumulator through every item of the source, writes
-- the mapped items to the sink chunk by chunk, and returns the final accumulator.
mapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
mapAccumStream f acc source sink = foldMStreamChunks step acc source
  where
    step a xs = case mapAccumL f a xs of
      (a', ys) -> putChunk sink ys >> return a'
-- | Like 'mapAccumStream', except that every input item maps to a list of output items; the lists
-- produced for one input chunk are concatenated and written to the sink together.
concatMapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
concatMapAccumStream f acc source sink = foldMStreamChunks step acc source
  where
    step a xs = case concatMapAccumL a xs of
      (a', ys) -> putChunk sink ys >> return a'
    -- Threads the accumulator left to right and flattens the per-item outputs.
    concatMapAccumL s items = (s', concat yss)
      where
        (s', yss) = mapAccumL f s items
-- | Applies @f@ to each whole chunk read from the source and writes the result to the sink, until the
-- source is exhausted.
mapStreamChunks :: forall m a1 a2 d x y . (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStreamChunks f source sink = loop
  where
    loop = do
      result <- readChunk source readAll
      case result of
        FinalResult _ -> return ()
        ResultPart chunk _ -> do
          _ <- putChunk sink (f chunk)
          loop
-- | Chunk-level accumulating map: @f@ receives the accumulator and a whole chunk, and returns the new
-- accumulator plus the chunk to write to the sink. Returns the accumulator left once the source is
-- exhausted.
mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
                        => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
mapAccumStreamChunks f acc source sink = loop acc
  where
    loop state = do
      result <- readChunk source readAll
      case result of
        FinalResult _ -> return state
        ResultPart chunk _ -> do
          let (state', chunk') = f state chunk
          _ <- putChunk sink chunk'
          loop state'
-- | Monadic variant of 'mapStream': runs @f@ over each chunk with the factorial 'mapM' and writes the
-- combined result to the sink.
mapMStream :: forall m a1 a2 d x y . (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMStream f source sink = loop
  where
    loop = do
      result <- readChunk source readAll
      case result of
        FinalResult _ -> return ()
        ResultPart chunk _ -> do
          mapped <- mapM f chunk
          _ <- putChunk sink mapped
          loop
-- | Runs @f@ over the content of every chunk of the source with the factorial 'mapM_', discarding the
-- results.
mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d)
              => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
mapMStream_ f source = mapMStreamChunks_ (\chunk -> mapM_ f chunk) source
-- | Runs @f@ on each whole chunk read from the source, discarding the results, until the source is
-- exhausted.
mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d)
              => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
mapMStreamChunks_ f source = loop
  where
    loop = do
      result <- readChunk source readAll
      case result of
        FinalResult _ -> return ()
        ResultPart chunk _ -> f chunk >> loop
-- | Copies to the sink only those parts of the source for which the monadic predicate @f@ returns
-- 'True'; rejected parts are replaced by 'mempty'. Implemented with 'mapMStream'.
filterMStream :: forall m a1 a2 d x . (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
filterMStream f source sink = mapMStream keepIf source sink
  where
    keepIf x = do
      keep <- f x
      return (if keep then x else mempty)
-- | Left fold over the content of the source: each chunk is folded into the accumulator with the
-- factorial 'foldl', and the final accumulator is returned once the source is exhausted.
foldStream :: forall m a d x acc . (Monad m, FactorialMonoid x, AncestorFunctor a d)
              => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
foldStream f acc source = loop acc
  where
    loop a = do
      result <- readChunk source readAll
      case result of
        FinalResult{} -> return a
        ResultPart chunk _ -> loop (foldl f a chunk)
-- | Monadic left fold over the items of a list source: every chunk is folded into the accumulator with
-- 'foldM', and the final accumulator is returned once the source is exhausted.
foldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
              => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc
foldMStream f acc source = loop acc
  where
    loop a = do
      result <- readChunk source readAll
      case result of
        ResultPart chunk _ -> foldM f a chunk >>= loop
        -- Any final result ends the fold, as in 'foldStream' and 'foldMStreamChunks'; matching only on
        -- an empty list would fail at run time if a source ever reported a non-empty final chunk.
        FinalResult _ -> return a
-- | Same as 'foldMStream', but discards the final accumulator.
foldMStream_ :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m ()
foldMStream_ f acc source = do
  _ <- foldMStream f acc source
  return ()
-- | Monadic left fold over the whole chunks of the source: @f@ receives the accumulator and one chunk
-- at a time. Returns the accumulator left once the source is exhausted.
foldMStreamChunks :: forall m a d x acc . (Monad m, Monoid x, AncestorFunctor a d)
                     => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStreamChunks f acc source = loop acc
  where
    loop a = do
      result <- readChunk source readAll
      case result of
        FinalResult _ -> return a
        ResultPart chunk _ -> f a chunk >>= loop
-- | Monadic unfold into a sink: repeatedly runs @f@ on the seed; every @Just (x, seed')@ writes @x@ to
-- the sink and continues with @seed'@, while 'Nothing' stops and returns the current seed.
unfoldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                 => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc
unfoldMStream f acc sink = loop acc
  where
    loop a = do
      next <- f a
      case next of
        Nothing -> return a
        Just (x, a') -> put sink x >> loop a'
-- | Repeatedly runs the generator @f@ and writes each @Just@ item to the sink; stops at the first
-- 'Nothing'.
unmapMStream_ :: forall m a d x . (Monad m, AncestorFunctor a d)
                 => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m ()
unmapMStream_ f sink = loop
  where
    loop = do
      next <- f
      case next of
        Nothing -> return ()
        Just x -> put sink x >> loop
-- | Repeatedly runs the chunk generator @f@ and writes each chunk to the sink. Stops when the generator
-- yields a null chunk, or when the sink hands back a non-null remainder.
unmapMStreamChunks_ :: forall m a d x . (Monad m, MonoidNull x, AncestorFunctor a d)
                       => Coroutine d m x -> Sink m a x -> Coroutine d m ()
unmapMStreamChunks_ f sink = loop >> return ()
  where
    -- Yields the remainder that ended the loop (mempty if the generator ran dry).
    loop = do
      chunk <- f
      if null chunk
        then return mempty
        else do
          rest <- putChunk sink chunk
          if null rest then loop else return rest
-- | Splits a list source between two sinks: items satisfying @f@ go to the @true@ sink and the others
-- to the @false@ sink. Within each chunk, consecutive items with the same verdict are written together.
partitionStream :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                   => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m ()
partitionStream f source true false = mapMStreamChunks_ partitionChunk source
  where
    partitionChunk [] = return ()
    partitionChunk (x : rest) = partitionTo (f x) x rest

    -- @x@ is already known to have the given verdict; extend the run with like-minded neighbours.
    partitionTo verdict x chunk
      | verdict = do
          let (trues, rest) = List.span f chunk
          _ <- putChunk true (x : trues)
          continueWith False rest
      | otherwise = do
          let (falses, rest) = break f chunk
          _ <- putChunk false (x : falses)
          continueWith True rest

    -- The item following a run necessarily has the opposite verdict.
    continueWith _ [] = return ()
    continueWith verdict (y : ys) = partitionTo verdict y ys
-- | Zips two list sources item by item: reads one item from each (first source first), combines them
-- with @f@, and writes the result to the sink. Stops as soon as either read yields 'Nothing'.
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                  => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z]
                  -> Coroutine d m ()
zipWithMStream f source1 source2 sink = loop
  where
    loop =
      get source1 >>= \mx ->
      get source2 >>= \my ->
      step mx my
    step (Just x) (Just y) = f x y >>= put sink >> loop
    step _ _ = return ()
-- | Like 'zipWithMStream', except that the two reads of each step are combined with 'bindM2' from
-- 'MonadParallel'.
parZipWithMStream :: forall m a1 a2 a3 d x y z.
                     (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                     => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z]
                     -> Coroutine d m ()
parZipWithMStream f source1 source2 sink = loop
  where
    loop = bindM2 combine (get source1) (get source2)
    combine mx my = case (mx, my) of
      (Just x, Just y) -> f x y >>= put sink >> loop
      _ -> return ()
-- | Copies every chunk of the source to both sinks, writing to the first sink before the second.
tee :: forall m a1 a2 a3 d x . (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee source sink1 sink2 = distribute
  where
    distribute = do
      result <- readChunk source readAll
      case result of
        FinalResult _ -> return ()
        ResultPart chunk _ -> do
          _ <- putChunk sink1 chunk
          _ <- putChunk sink2 chunk
          distribute
-- | Combines two sinks into one: every chunk written to the result is written to the first sink and
-- then to the second. Both sinks are first lifted to the common functor @a3@ with 'liftSink'.
teeSink :: forall m a1 a2 a3 x . (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3)
=> Sink m a1 x -> Sink m a2 x -> Sink m a3 x
teeSink s1 s2 = Sink{putChunk= teeChunk}
where teeChunk :: forall d. AncestorFunctor a3 d => x -> Coroutine d m x
-- The two writes are sequenced with '>>', so the value returned is that of the second sink only.
teeChunk x = putChunk s1' x >> putChunk s2' x
s1' :: Sink m a3 x
s1' = liftSink s1
s2' :: Sink m a3 x
s2' = liftSink s2
-- | Writes a single item to a list sink as a one-element chunk, ignoring whatever the sink returns.
put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m ()
put sink x = do
  _ <- putChunk sink [x]
  return ()
-- | Writes a single item to a list sink as a one-element chunk and reports whether the sink returned
-- a null remainder.
tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool
tryPut sink x = do
  rest <- putChunk sink [x]
  return (null rest)
-- | Writes the whole chunk to the sink and returns what the sink returns. A null chunk is returned
-- as is, without touching the sink.
putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x
putAll l sink
  | null l = return l
  | otherwise = putChunk sink l
-- | Case analysis on nullness: yields @nullCase@ when @x@ is null, otherwise applies @f@ to @x@.
nullOrElse :: MonoidNull x => a -> (x -> a) -> x -> a
nullOrElse nullCase f x = if null x then nullCase else f x