{-
    Copyright 2010-2013 Mario Blazevic

    This file is part of the Streaming Component Combinators (SCC) project.

    The SCC project is free software: you can redistribute it and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
    version.

    SCC is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along with SCC.  If not, see
    <http://www.gnu.org/licenses/>.
-}

-- | This module defines 'Source' and 'Sink' types and 'pipe' functions that create them. The method 'get' on 'Source'
-- abstracts away 'Control.Monad.Coroutine.SuspensionFunctors.await', and the method 'put' on 'Sink' is a higher-level
-- abstraction of 'Control.Monad.Coroutine.SuspensionFunctors.yield'. With this arrangement, a single coroutine can
-- yield values to multiple sinks and await values from multiple sources with no need to change the
-- 'Control.Monad.Coroutine.Coroutine' functor. The only requirement is that each functor of the sources and sinks the
-- coroutine uses must be an 'Control.Monad.Coroutine.Nested.AncestorFunctor' of the coroutine's own functor. For
-- example, a coroutine that takes two sources and one sink might be declared like this:
-- 
-- @
-- zip :: forall m a1 a2 a3 d x y. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
--        => Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [(x, y)] -> Coroutine d m ()
-- @
-- 
-- Sources, sinks, and coroutines communicating through them are all created using the 'pipe' function or one of its
-- variants. They effectively split the current coroutine into a producer-consumer coroutine pair. The producer gets a
-- new 'Sink' to write to and the consumer a new 'Source' to read from, in addition to all the streams they inherit from
-- the current coroutine. The following function, for example, uses the /zip/ coroutine declared above to add together
-- the pairs of values from two Integer sources:
--
-- @
-- add :: forall m a1 a2 a3 d. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
--        => Source m a1 [Integer] -> Source m a2 [Integer] -> Sink m a3 [Integer] -> Coroutine d m ()
-- add source1 source2 sink = do pipe
--                                  (\pairSink-> zip source1 source2 pairSink)                         -- producer
--                                  (\pairSource-> mapStream (List.map $ uncurry (+)) pairSource sink) -- consumer
--                               return ()
-- @

{-# LANGUAGE ScopedTypeVariables, Rank2Types, TypeFamilies, KindSignatures #-}

module Control.Concurrent.SCC.Streams
   (
    -- * Sink and Source types
    Sink, Source, SinkFunctor, SourceFunctor, AncestorFunctor,
    -- * Sink and Source constructors
    pipe, pipeP, pipeG, nullSink,
    -- * Operations on sinks and sources
    -- ** Singleton operations
    get, getWith, getPrime, peek, put, tryPut,
    -- ** Lifting functions
    liftSink, liftSource,
    -- ** Bulk operations
    -- *** Fetching and moving data
    pour, pour_, tee, teeSink,
    getAll, putAll, putChunk,
    getParsed, getRead,
    getWhile, getUntil, 
    pourRead, pourParsed, pourWhile, pourUntil,
    Reader, Reading(..), ReadingResult(..),
    -- *** Stream transformations
    markDown, markUpWith,
    mapSink, mapStream,
    mapMaybeStream, concatMapStream,
    mapStreamChunks, mapAccumStreamChunks, foldStream, mapAccumStream, concatMapAccumStream, partitionStream,
    -- *** Monadic stream transformations
    mapMStream, mapMStream_, mapMStreamChunks_,
    filterMStream, foldMStream, foldMStream_, unfoldMStream, unmapMStream_, unmapMStreamChunks_,
    zipWithMStream, parZipWithMStream,
   )
where

import Prelude hiding (foldl, foldr, map, mapM, mapM_, null, span, takeWhile)

import qualified Control.Monad
import Control.Monad (liftM, when, unless, foldM)
import Data.Functor.Identity (Identity(..))
import Data.Functor.Sum (Sum(InR))
import Data.Monoid (Monoid(mappend, mconcat, mempty), First(First, getFirst))
import Data.Monoid.Factorial (FactorialMonoid)
import qualified Data.Monoid.Factorial as Factorial
import Data.Monoid.Null (MonoidNull(null))
import Data.Maybe (mapMaybe)
import Data.List (mapAccumL)
import qualified Data.List as List (map, span)
import Text.ParserCombinators.Incremental (Parser, feed, feedEof, inspect, completeResults)

import Control.Monad.Parallel (MonadParallel(..))
import Control.Monad.Coroutine
import Control.Monad.Coroutine.SuspensionFunctors (Request, request,
                                                   ReadRequest, requestRead,
                                                   Reader, Reading(..), ReadingResult(..),
                                                   weaveNestedReadWriteRequests)
import Control.Monad.Coroutine.Nested (AncestorFunctor(..), liftAncestor)

-- | The functor /a/ extended with 'ReadRequest's for chunks of type /x/. 'pipe' and its variants run their consumer
-- coroutine, and create its 'Source', over this functor.
type SourceFunctor a x = Sum a (ReadRequest x)
-- | The functor /a/ extended with 'Request's that send a chunk of type /x/ and receive a value of type /x/ back.
-- 'pipe' and its variants run their producer coroutine, and create its 'Sink', over this functor.
type SinkFunctor a x = Sum a (Request x x)

-- | A 'Sink' can be used to yield output from any nested 'Coroutine' computation whose functor provably descends from
-- the functor /a/. It's the write-only end of a communication channel created by 'pipe'.
newtype Sink (m :: * -> *) a x =
   Sink
   {
   -- | This method puts a portion of the producer's output into the 'Sink'. The intervening 'Coroutine' computations
   -- suspend up to the 'pipe' invocation that has created the argument sink. The method returns the suffix of the
   -- argument that could not make it into the sink because of the sibling coroutine's death.
   putChunk :: forall d. AncestorFunctor a d => x -> Coroutine d m x
   }

-- | A 'Source' can be used to read input into any nested 'Coroutine' computation whose functor provably descends from
-- the functor /a/. It's the read-only end of a communication channel created by 'pipe'.
newtype Source (m :: * -> *) a x =
   Source
   {
   -- | This method consumes a portion of input from the 'Source' using the 'Reader' argument and returns the
   -- 'ReadingResult'. Depending on the reader, the producer coroutine may not need to be resumed at all, or it may need
   -- to be resumed many times. The intervening 'Coroutine' computations suspend all the way to the 'pipe' function
   -- invocation that created the source.
   readChunk :: forall d py y. AncestorFunctor a d => Reader x py y -> Coroutine d m (ReadingResult x py y)
   }

-- | A 'Reader' that accepts every chunk it is handed. Each step answers with 'Advance', continuing with 'readAll'
-- itself and passing the chunk just received as both the result and the partial-result argument.
readAll :: Reader x x x
readAll chunk = Advance readAll chunk chunk

-- | A disconnected sink that consumes and ignores all data 'put' into it. Its 'putChunk' never suspends and always
-- returns 'mempty', /i.e./, it reports that no part of the chunk was left unwritten.
nullSink :: forall m a x. (Monad m, Monoid x) => Sink m a x
nullSink = Sink {putChunk = const (return mempty)}

-- | A disconnected source that never delivers any data. Every 'readChunk' on it feeds the reader a single 'mempty'
-- chunk and, without suspending, reports the result the reader carries at that point as the 'FinalResult'.
emptySource :: forall m a x. (Monad m, Monoid x) => Source m a x
emptySource = Source {readChunk = return . finalize . ($ mempty)}
   where -- Whatever the reader answers, take the result value it carries and make it final: any continuation
         -- reader, partial result, or unconsumed input is dropped.
         finalize (Final _ result) = FinalResult result
         finalize (Advance _ result _) = FinalResult result
         finalize (Deferred _ result) = FinalResult result

-- | Converts a 'Sink' on the ancestor functor /a/ into a sink on the descendant functor /d/. Each chunk is written
-- to the original sink at functor /d/, and that computation is then lifted with 'liftAncestor' into whichever
-- descendant of /d/ the caller runs in.
liftSink :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a x -> Sink m d x
liftSink s = Sink {putChunk = liftAncestor . (putChunk s :: x -> Coroutine d m x)}
{-# INLINE liftSink #-}

-- | Converts a 'Source' on the ancestor functor /a/ into a source on the descendant functor /d/. Each read is
-- performed on the original source at functor /d/, and that computation is then lifted with 'liftAncestor' into
-- whichever descendant of /d/ the caller runs in.
liftSource :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a x -> Source m d x
liftSource s = Source {readChunk = liftAncestor . (readChunk s :: Reader x py y -> Coroutine d m (ReadingResult x py y))}
{-# INLINE liftSource #-}

-- | A sink mark-up transformation: every chunk going into the sink is accompanied by the given value. The resulting
-- sink's 'putChunk' returns the unwritten part of the chunk with the mark stripped off again, or 'mempty' if the
-- underlying sink accepted the whole marked chunk.
markUpWith :: forall m a x mark. (Monad m, Monoid x) => mark -> Sink m a [(x, mark)] -> Sink m a x
markUpWith mark sink = Sink putMarkedChunk
   where putMarkedChunk :: forall d. AncestorFunctor a d => x -> Coroutine d m x
         putMarkedChunk chunk =
            do rest <- putChunk sink [(chunk, mark)]
               -- Only a single pair is ever written, so only the head of the returned suffix matters.
               case rest
                  of [] -> return mempty
                     (unwritten, _) : _ -> return unwritten

-- | A sink mark-down transformation: the marks get removed off each chunk. The marked chunks are written to the
-- underlying sink one at a time; as soon as one of them is not accepted in full, writing stops and the unwritten
-- remainder of the list is returned, starting with the rejected suffix paired with its original mark.
markDown :: forall m a x mark. (Monad m, MonoidNull x) => Sink m a x -> Sink m a [(x, mark)]
markDown sink = Sink putUnmarkedChunk
   where putUnmarkedChunk :: forall d. AncestorFunctor a d => [(x, mark)] -> Coroutine d m [(x, mark)]
         putUnmarkedChunk [] = return []
         putUnmarkedChunk ((chunk, mark) : remaining) =
            do rest <- putChunk sink chunk
               if null rest
                  then putUnmarkedChunk remaining
                  else return ((rest, mark) : remaining)

-- | The 'pipe' function splits the computation into two concurrent parts, /producer/ and /consumer/. The /producer/ is
-- given a 'Sink' to put values into, and /consumer/ a 'Source' to get those values from. Once producer and consumer
-- both complete, 'pipe' returns their paired results. This is 'pipeG' specialized to 'sequentialBinder'.
pipe :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
        (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipe = pipeG sequentialBinder

-- | The 'pipeP' function is equivalent to 'pipe', except it runs the /producer/ and the /consumer/ in parallel. This
-- is 'pipeG' specialized to 'bindM2' of the 'MonadParallel' class.
pipeP :: forall m a a1 a2 x r1 r2. 
         (MonadParallel m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)
pipeP = pipeG bindM2

-- | A generic version of 'pipe'. The first argument is used to combine two computation steps. The second argument
-- is the producer, which receives the freshly created 'Sink'; the third is the consumer, which receives the matching
-- 'Source'. The result pairs the producer's result with the consumer's, in that order.
pipeG :: forall m a a1 a2 x r1 r2. (Monad m, Monoid x, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
         PairBinder m -> (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2)
      -> Coroutine a m (r1, r2)
pipeG run2 producer consumer =
   -- The consumer is woven first and the producer second, so the woven result is (r2, r1) and has to be swapped.
   liftM (uncurry (flip (,))) $
   weave run2 weaveNestedReadWriteRequests (consumer source) (producer sink)
   where -- Writing a chunk issues a 'Request' in the right-hand summand of the producer's functor, lifted into
         -- whichever descendant functor the caller is running in.
         sink = Sink {putChunk = \xs-> liftAncestor (mapSuspension InR (request xs) :: Coroutine a1 m x)}
         source = Source {readChunk = fc}
         -- Reading issues a 'ReadRequest' carrying the reader, in the right-hand summand of the consumer's functor.
         fc :: forall d py y. AncestorFunctor a2 d => Reader x py y -> Coroutine d m (ReadingResult x py y)
         fc t = liftAncestor (mapSuspension InR (requestRead t) :: Coroutine a2 m (ReadingResult x py y))

-- | Converts an incremental 'Parser' into a 'Reader'. The first argument is the result to report when the parser
-- yields no result. The reader feeds each input chunk to the parser and 'inspect's the outcome:
--
-- * no result and no continuation: the read is 'Final' with the failure value, and the chunk is handed back unconsumed;
--
-- * no result, a continuation, and no partial result: the read is 'Deferred';
--
-- * no result, a continuation, and a partial result: the read 'Advance's with that partial result;
--
-- * exactly one result and no continuation: the read is 'Final' with that result and its unconsumed input.
--
-- NOTE(review): the @case@ does not cover outcomes with more than one complete result, or with a complete result
-- together with a continuation, and the 'Advance' branch assumes the parser has a result at end-of-input. These
-- partial matches are reproduced as they were; confirm that callers only pass parsers for which they cannot fail.
fromParser :: forall p x y. Monoid x => y -> Parser p x y -> Reader x (y -> y) y
fromParser failure p s = case inspect (feed s p)
                         of ([], Nothing) -> Final s failure
                            ([], Just (Nothing, p')) -> Deferred (fromParser failure p') r'
                               -- The deferred result is what the parser would yield if the input ended here.
                               where (r', _) = case completeResults (feedEof p')
                                               of [] -> (failure, s)
                                                  hd:_ -> hd
                            ([], Just (Just prefix, p')) -> Advance (fromParser failure p') (prefix r') prefix
                               where (r', _):_ = completeResults (feedEof p')
                            ([(r, s')], Nothing) -> Final s' r

-- | Function 'get' tries to get a single value from the given 'Source' argument. The intervening 'Coroutine'
-- computations suspend all the way to the 'pipe' function invocation that created the source. The function returns
-- 'Nothing' if the argument source is empty.
get :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
get source = readChunk source readOne
             -- NOTE(review): only 'FinalResult' is matched; this presumably relies on 'readChunk' producing
             -- 'ResultPart' only for readers that 'Advance', which 'readOne' never does -- confirm.
             >>= \(FinalResult x) -> return x
   where -- An empty chunk defers the decision, with 'Nothing' as the deferred result; otherwise the head is taken
         -- and the rest of the chunk is left unconsumed.
         readOne [] = Deferred readOne Nothing
         readOne (x : rest) = Final rest (Just x)

-- | Tries to get a minimal, /i.e./, prime, prefix from the given 'Source' argument. The intervening 'Coroutine'
-- computations suspend all the way to the 'pipe' function invocation that created the source. The function returns
-- 'mempty' if the argument source is empty.
getPrime :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
getPrime source = readChunk source primeReader
                  -- NOTE(review): only 'FinalResult' is matched; 'primeReader' never 'Advance's, which presumably
                  -- rules out 'ResultPart' -- confirm.
                  >>= \(FinalResult x) -> return x
   where -- A chunk with no prime prefix defers the read, with the chunk itself as the deferred result; otherwise
         -- the prime prefix is the result and the rest of the chunk is left unconsumed.
         primeReader chunk = maybe (Deferred primeReader chunk)
                                   (\(prefix, rest)-> Final rest prefix)
                                   (Factorial.splitPrimePrefix chunk)

-- | Invokes its second argument with the prime prefix it gets from the source given as the first argument, if there
-- is any to get. If the read ends without delivering a prime prefix, the consumer is not invoked and the function
-- returns @()@.
getWith :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) =>
           Source m a x -> (x -> Coroutine d m ()) -> Coroutine d m ()
getWith source consumer = readChunk source primeReader
                          -- The reader's result is itself a computation; running it is what invokes the consumer.
                          -- NOTE(review): only 'FinalResult' is matched; 'primeReader' never 'Advance's, which
                          -- presumably rules out 'ResultPart' -- confirm.
                          >>= \(FinalResult action) -> action
   where -- A chunk with no prime prefix defers the read with a no-op as the deferred result; otherwise the result
         -- is the consumer applied to the prime prefix, and the rest of the chunk is left unconsumed.
         primeReader chunk = maybe (Deferred primeReader (return ()))
                                   (\(prefix, rest)-> Final rest (consumer prefix))
                                   (Factorial.splitPrimePrefix chunk)

-- | Function 'peek' acts the same way as 'get', but doesn't actually consume the value from the source; sequential
-- calls to 'peek' will always return the same value.
peek :: forall m a d x. (Monad m, AncestorFunctor a d) => Source m a [x] -> Coroutine d m (Maybe x)
peek source = readChunk source readOneAhead
              -- NOTE(review): only 'FinalResult' is matched; 'readOneAhead' never 'Advance's, which presumably
              -- rules out 'ResultPart' -- confirm.
              >>= \(FinalResult x) -> return x
   where -- An empty chunk defers the decision, with 'Nothing' as the deferred result; otherwise the head is
         -- reported while the whole chunk, head included, is handed back as unconsumed.
         readOneAhead [] = Deferred readOneAhead Nothing
         readOneAhead chunk@(x : _) = Final chunk (Just x)

-- | 'getAll' consumes and returns all data generated by the source. The chunks are combined with 'mappend' in the
-- order they were read.
getAll :: forall m a d x. (Monad m, Monoid x, AncestorFunctor a d) => Source m a x -> Coroutine d m x
getAll source = readChunk source (accumulate id)
                -- NOTE(review): only 'FinalResult' is matched; 'accumulate' never 'Advance's, which presumably
                -- rules out 'ResultPart' -- confirm.
                >>= \(FinalResult everything)-> return everything
   where -- The first argument is a function that prepends everything read so far. Every chunk defers the read:
         -- the continuation extends the prefix function with the chunk, and the deferred result is the input
         -- accumulated up to and including this chunk.
         accumulate :: (x -> x) -> Reader x () x
         accumulate prefix chunk = Deferred (accumulate (prefix . mappend chunk)) (prefix chunk)

-- | Consumes inputs from the /source/ as long as the /parser/ accepts it. The parser is converted to a 'Reader' by
-- 'fromParser', with 'mempty' as the result to report when the parser yields none, and then run by 'getRead'.
getParsed :: forall m a d p x y. (Monad m, Monoid x, Monoid y, AncestorFunctor a d) => 
             Parser p x y -> Source m a x -> Coroutine d m y
getParsed parser = getRead (fromParser mempty parser)

-- | Consumes input from the /source/ as long as the /reader/ accepts it. Every partial result the reader reports is a
-- function @y -> y@; these are all applied, in the order they were reported, on top of the reader's final result.
getRead :: forall m a d x y. (Monad m, Monoid x, AncestorFunctor a d) =>
           Reader x (y -> y) y -> Source m a x -> Coroutine d m y
getRead reader source = go return reader
  where
    -- Performs one 'readChunk' step. @finish@ is the continuation awaiting the final result,
    -- already composed with every partial-result function seen so far.
    go :: (y -> Coroutine d m y) -> Reader x (y -> y) y -> Coroutine d m y
    go finish current = do
        outcome <- readChunk source current
        case outcome of
            FinalResult final -> finish final
            ResultPart transform next -> go (finish . transform) next

-- | Consumes values from the /source/ as long as each satisfies the /predicate/, then returns them all combined into
-- a single value.
getWhile :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) =>
            (x -> Bool) -> Source m a x -> Coroutine d m x
getWhile predicate source = do
    result <- readChunk source (takeMatching id)
    case result of
        -- 'takeMatching' never answers with 'Advance', so presumably only a 'FinalResult' can
        -- come back. NOTE(review): confirm against 'readChunk'.
        FinalResult accepted -> return accepted
  where
    -- @soFar@ prepends everything accepted from the earlier chunks.
    takeMatching :: (x -> x) -> Reader x () x
    takeMatching soFar chunk
       | null rest = Deferred (takeMatching (soFar . mappend chunk)) (soFar chunk)  -- whole chunk matched, keep reading
       | otherwise = Final rest (soFar matching)                                    -- stop at the first mismatch
       where (matching, rest) = Factorial.span predicate chunk

-- | Consumes values from the /source/ until one of them satisfies the /predicate/ or the source is emptied. The result
-- is the pair of all the preceding values combined and, if one was found, 'Just' the value that satisfied the
-- predicate. The latter is not consumed.
getUntil :: forall m a d x. (Monad m, FactorialMonoid x, AncestorFunctor a d) =>
            (x -> Bool) -> Source m a x -> Coroutine d m (x, Maybe x)
getUntil predicate source = do
    result <- readChunk source (scan id)
    case result of
        -- 'scan' never answers with 'Advance', so presumably only a 'FinalResult' can come back.
        -- NOTE(review): confirm against 'readChunk'.
        FinalResult found -> return found
  where
    -- @soFar@ prepends everything skipped over in the earlier chunks.
    scan :: (x -> x) -> Reader x () (x, Maybe x)
    scan soFar chunk
       | null rest = Deferred (scan (soFar . mappend chunk)) (soFar chunk, Nothing)
       | otherwise = Final rest (soFar before, Just (Factorial.primePrefix rest))
       where (before, rest) = Factorial.span (not . predicate) chunk

-- | Copies all data from the /source/ argument into the /sink/ argument. The result is 'True' if at least one chunk
-- was copied and 'False' otherwise.
pour :: forall m a1 a2 d x . (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
        => Source m a1 x -> Sink m a2 x -> Coroutine d m Bool
pour source sink = copyFrom False
  where
    -- The argument records whether any chunk has been written to the sink so far.
    copyFrom :: Bool -> Coroutine d m Bool
    copyFrom copiedAny = do
        outcome <- readChunk source readAll
        case outcome of
            ResultPart chunk _ -> do
                putChunk sink chunk
                copyFrom True
            FinalResult _ -> return copiedAny  -- the last chunk must be empty

-- | Copies all data from the /source/ argument into the /sink/ argument, exactly like 'pour', but discards the 'Bool'
-- result.
pour_ :: forall m a1 a2 d x . (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
        => Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pour_ source sink = do
    pour source sink
    return ()

-- | Like 'pour', copies data from the /source/ to the /sink/, but under the control of the /reader/: every partial
-- result the reader reports is written to the sink, and so is its final result unless that one is 'null'.
pourRead :: forall m a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => Reader x y y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourRead reader source sink = go reader
  where
    go :: Reader x y y -> Coroutine d m ()
    go current = do
        outcome <- readChunk source current
        case outcome of
            ResultPart chunk next -> do
                putChunk sink chunk
                go next
            -- an empty final result is not worth a write
            FinalResult final -> unless (null final) $ do
                putChunk sink final
                return ()

-- | Parses the data from the /source/ using the given /parser/ and writes the parsed results into the /sink/.
pourParsed :: forall m p a1 a2 d x y. (Monad m, MonoidNull x, MonoidNull y, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => Parser p x y -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
pourParsed parser source sink = go (fromParser mempty parser)
  where
    go :: Reader x (y -> y) y -> Coroutine d m ()
    go current = do
        outcome <- readChunk source current
        case outcome of
            -- a partial result is a function @y -> y@; applying it to 'mempty' gives the chunk to write
            ResultPart extend next -> do
                putChunk sink (extend mempty)
                go next
            -- an empty final result is not worth a write
            FinalResult final -> unless (null final) $ do
                putChunk sink final
                return ()

-- | Like 'pour', copies data from the /source/ to the /sink/, but only as long as it satisfies the predicate. The
-- copying itself is delegated to 'pourRead'.
pourWhile :: forall m a1 a2 d x . (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
pourWhile predicate = pourRead copyMatching
  where
    -- Splits each chunk with 'Factorial.span'. A chunk that matches entirely is reported as a
    -- partial result and reading continues; otherwise the matching part becomes the final result.
    copyMatching :: Reader x x x
    copyMatching chunk
       | null rest = Advance copyMatching matching matching
       | otherwise = Final rest matching
       where (matching, rest) = Factorial.span predicate chunk

-- | Like 'pour', copies data from the /source/ to the /sink/, but only until one value satisfies the predicate. That
-- value is returned rather than copied; the result is 'Nothing' if no value satisfied the predicate.
pourUntil :: forall m a1 a2 d x . (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
             => (x -> Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m (Maybe x)
pourUntil predicate source sink = go untilMatch
  where
    -- Splits each chunk at the first prime factor satisfying the predicate. The part before it
    -- is always what gets copied; the factor itself is only reported, via 'Factorial.primePrefix'.
    untilMatch :: Reader x x (x, Maybe x)
    untilMatch chunk
       | null rest = Advance untilMatch (before, Nothing) before
       | otherwise = Final rest (before, Just (Factorial.primePrefix rest))
       where (before, rest) = Factorial.span (not . predicate) chunk

    go :: Reader x x (x, Maybe x) -> Coroutine d m (Maybe x)
    go current = do
        outcome <- readChunk source current
        case outcome of
            ResultPart chunk next -> do
                putChunk sink chunk
                go next
            FinalResult (chunk, stopper) -> do
                putChunk sink chunk
                return stopper

-- | 'mapStream' is like 'pour', except every chunk read from the /source/ is mapped with 'Factorial.foldMap' /f/
-- before it is written into the /sink/.
mapStream :: forall m a1 a2 d x y . (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d)
           => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStream f source sink = go
  where
    go :: Coroutine d m ()
    go = do
        outcome <- readChunk source readAll
        case outcome of
            ResultPart chunk _ -> do
                putChunk sink (Factorial.foldMap f chunk)
                go
            FinalResult _ -> return ()  -- the last chunk must be empty

-- | An equivalent of 'Data.List.map' that works on a 'Sink' instead of a list. The argument function is applied to
-- every value before it's written to the sink argument.
mapSink :: forall m a x y. Monad m => (x -> y) -> Sink m a [y] -> Sink m a [x]
mapSink f sink = Sink{putChunk = \xs -> putChunk sink (List.map f xs) >>= return . unmapped xs}
  where
    -- The underlying sink answers with a list of mapped values (presumably the ones it did not
    -- accept -- TODO confirm against 'putChunk'); the same number of trailing original values is
    -- handed back to the caller.
    unmapped :: [x] -> [y] -> [x]
    unmapped xs rest = lastN (length rest) xs

    -- @lastN n list@ keeps only the last @n@ items of @list@.
    lastN :: forall z. Int -> [z] -> [z]
    lastN 0 _ = []
    lastN n list = snd (walk list)
      where
        -- returns how many trailing items were kept (at most @n@) together with those items
        walk :: [z] -> (Int, [z])
        walk [] = (0, [])
        walk (item : others)
           | count < n = (succ count, item : suffix)
           | otherwise = kept
           where kept@(count, suffix) = walk others
         

-- | 'mapMaybeStream' is to 'mapStream' like 'Data.Maybe.mapMaybe' is to 'Data.List.map': only the 'Just' results of
-- /f/ are written into the /sink/.
mapMaybeStream :: forall m a1 a2 d x y . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                => (x -> Maybe y) -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m ()
mapMaybeStream f source sink = mapMStreamChunks_ forward source
  where
    -- filters and maps one input chunk, writes it out, and discards what 'putChunk' returns
    forward :: [x] -> Coroutine d m ()
    forward chunk = do
        putChunk sink (mapMaybe f chunk)
        return ()

-- | 'concatMapStream' is to 'mapStream' like 'Data.List.concatMap' is to 'Data.List.map': the results of applying /f/
-- to the individual list items are combined with 'mconcat' before being written into the /sink/.
concatMapStream :: forall m a1 a2 d x y . (Monad m, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => (x -> y) -> Source m a1 [x] -> Sink m a2 y -> Coroutine d m ()
concatMapStream f source sink = mapStream mapItems source sink
  where
    mapItems :: [x] -> y
    mapItems items = mconcat [f item | item <- items]

-- | 'mapAccumStream' is similar to 'mapAccumL' except it reads the values from a 'Source' instead of a list
-- and writes the mapped values into a 'Sink' instead of returning another list. The final accumulator is returned.
mapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (acc -> x -> (acc, y)) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
mapAccumStream f acc source sink = foldMStreamChunks step acc source
  where
    -- runs 'mapAccumL' over one chunk, writes the mapped chunk out, and passes the accumulator on
    step :: acc -> [x] -> Coroutine d m acc
    step a xs = case mapAccumL f a xs of
        (a', ys) -> do
            putChunk sink ys
            return a'

-- | 'concatMapAccumStream' combines 'concatMapStream' and 'mapAccumStream': it threads the accumulator like the
-- latter, but its argument function returns not a single value, but a list of values to write into the sink.
concatMapAccumStream :: forall m a1 a2 d x y acc . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d)
                  => (acc -> x -> (acc, [y])) -> acc -> Source m a1 [x] -> Sink m a2 [y] -> Coroutine d m acc
concatMapAccumStream f acc source sink = foldMStreamChunks step acc source
  where
    -- Threads the accumulator through one chunk with 'mapAccumL', then flattens the per-item
    -- output lists in order and writes them out as a single chunk.
    step :: acc -> [x] -> Coroutine d m acc
    step a xs = case mapAccumL f a xs of
        (a', yss) -> do
            putChunk sink (mconcat yss)
            return a'

-- | Like 'mapStream' except the argument function is applied to each whole chunk read from the /source/.
mapStreamChunks :: forall m a1 a2 d x y . (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => (x -> y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapStreamChunks f source sink = go
  where
    go :: Coroutine d m ()
    go = do
        outcome <- readChunk source readAll
        case outcome of
            ResultPart chunk _ -> do
                putChunk sink (f chunk)
                go
            FinalResult _ -> return ()  -- the last chunk must be empty

-- | Like 'mapAccumStream' except the argument function is applied to each whole chunk read from the /source/. The
-- final accumulator is returned.
mapAccumStreamChunks :: forall m a1 a2 d x y acc. (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
                   => (acc -> x -> (acc, y)) -> acc -> Source m a1 x -> Sink m a2 y -> Coroutine d m acc
mapAccumStreamChunks f acc source sink = go acc
  where
    go :: acc -> Coroutine d m acc
    go current = do
        outcome <- readChunk source readAll
        case outcome of
            ResultPart chunk _ -> do
                let (next, mapped) = f current chunk
                putChunk sink mapped
                go next
            FinalResult _ -> return current  -- the last chunk must be empty

-- | 'mapMStream' is similar to 'Control.Monad.mapM'. It draws the values from a 'Source' instead of a list, writes the
-- mapped values to a 'Sink', and returns a 'Coroutine'. Each chunk read from the /source/ is processed by
-- 'Factorial.mapM' and the result is written to the /sink/ with a single 'putChunk' call, whose result is ignored.
mapMStream :: forall m a1 a2 d x y . (Monad m, FactorialMonoid x, Monoid y, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m y) -> Source m a1 x -> Sink m a2 y -> Coroutine d m ()
mapMStream f source sink = loop
   where loop = readChunk source readAll
                >>= \r-> case r
                         of ResultPart chunk _ -> Factorial.mapM f chunk >>= putChunk sink >> loop
                            FinalResult _ -> return ()  -- the last chunk must be empty

-- | 'mapMStream_' is similar to 'Control.Monad.mapM_' except it draws the values from a 'Source' instead of a list and
-- works with 'Coroutine' instead of an arbitrary monad. It is implemented as 'mapMStreamChunks_' applied to
-- 'Factorial.mapM_' of the argument function.
mapMStream_ :: forall m a d x r. (Monad m, FactorialMonoid x, AncestorFunctor a d)
              => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
mapMStream_ f = mapMStreamChunks_ (Factorial.mapM_ f)

-- | Like 'mapMStream_' except it runs the argument function on whole chunks read from the input. The results of the
-- argument function are discarded; the coroutine finishes when the /source/ is exhausted.
mapMStreamChunks_ :: forall m a d x r. (Monad m, Monoid x, AncestorFunctor a d)
              => (x -> Coroutine d m r) -> Source m a x -> Coroutine d m ()
mapMStreamChunks_ f source = loop
   where loop = readChunk source readAll
                >>= \r-> case r
                         of ResultPart chunk _ -> f chunk >> loop
                            FinalResult _ -> return ()  -- the last chunk must be empty

-- | An equivalent of 'Control.Monad.filterM'. Draws the values from a 'Source' instead of a list, writes the filtered
-- values to a 'Sink', and returns a 'Coroutine'. It is implemented through 'mapMStream': every value for which the
-- predicate yields 'False' is replaced by 'mempty'.
filterMStream :: forall m a1 a2 d x . (Monad m, FactorialMonoid x, AncestorFunctor a1 d, AncestorFunctor a2 d)
              => (x -> Coroutine d m Bool) -> Source m a1 x -> Sink m a2 x -> Coroutine d m ()
filterMStream f = mapMStream (\x-> f x >>= \p-> return $ if p then x else mempty)

-- | Similar to 'Data.List.foldl', but reads the values from a 'Source' instead of a list. Every chunk read from the
-- /source/ is folded into the accumulator with 'Factorial.foldl'; the accumulator is returned when the /source/ is
-- exhausted.
foldStream :: forall m a d x acc . (Monad m, FactorialMonoid x, AncestorFunctor a d)
              => (acc -> x -> acc) -> acc -> Source m a x -> Coroutine d m acc
foldStream f acc source = loop acc
   where loop a = readChunk source readAll
                  >>= \r-> case r
                           of ResultPart chunk _ -> loop (Factorial.foldl f a chunk)
                              FinalResult{} -> return a  -- the last chunk must be empty

-- | 'foldMStream' is similar to 'Control.Monad.foldM' except it draws the values from a 'Source' instead of a list and
-- works with 'Coroutine' instead of an arbitrary monad. Every chunk read from the /source/ is folded into the
-- accumulator with 'foldM'; the accumulator is returned when the /source/ is exhausted.
foldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
              => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m acc
foldMStream f acc source = loop acc
   where loop a = readChunk source readAll
                  >>= \r-> case r
                           of ResultPart chunk _ -> foldM f a chunk >>= loop
                              -- The final result is matched with a wildcard, like in the sibling folds, so the case
                              -- expression is total.
                              FinalResult _ -> return a  -- the last chunk must be empty

-- | A variant of 'foldMStream' that discards the final result value.
foldMStream_ :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                => (acc -> x -> Coroutine d m acc) -> acc -> Source m a [x] -> Coroutine d m ()
foldMStream_ f acc source = foldMStream f acc source >> return ()

-- | Like 'foldMStream' but working on whole chunks from the argument source. The argument function is applied to the
-- current accumulator and each chunk in turn; the accumulator is returned when the /source/ is exhausted.
foldMStreamChunks :: forall m a d x acc . (Monad m, Monoid x, AncestorFunctor a d)
                     => (acc -> x -> Coroutine d m acc) -> acc -> Source m a x -> Coroutine d m acc
foldMStreamChunks f acc source = loop acc
   where loop a = readChunk source readAll
                  >>= \r-> case r
                           of ResultPart chunk _ -> f a chunk >>= loop
                              FinalResult _ -> return a  -- the last chunk must be empty

-- | 'unfoldMStream' is a version of 'Data.List.unfoldr' that writes the generated values into a 'Sink' instead of
-- returning a list. The generator is run repeatedly: on @Just (x, acc')@ the value @x@ is 'put' into the /sink/ and
-- the unfolding continues from @acc'@; on 'Nothing' the current accumulator is returned.
unfoldMStream :: forall m a d x acc . (Monad m, AncestorFunctor a d)
                 => (acc -> Coroutine d m (Maybe (x, acc))) -> acc -> Sink m a [x] -> Coroutine d m acc
unfoldMStream f acc sink = loop acc
   where loop a = f a >>= maybe (return a) (\(x, acc')-> put sink x >> loop acc')

-- | 'unmapMStream_' is the opposite of 'mapMStream_'; it takes a 'Sink' instead of a 'Source' argument and writes the
-- generated values into it. The generator is run repeatedly until it returns 'Nothing'.
unmapMStream_ :: forall m a d x . (Monad m, AncestorFunctor a d)
                 => Coroutine d m (Maybe x) -> Sink m a [x] -> Coroutine d m ()
unmapMStream_ f sink = loop
   where loop = f >>= maybe (return ()) (\x-> put sink x >> loop)

-- | Like 'unmapMStream_' but writing whole chunks of generated data into the argument sink. The generator is run
-- repeatedly and each non-empty chunk is written with 'putChunk'. The loop stops when the generator produces an empty
-- chunk, or when 'putChunk' returns a non-empty remainder.
unmapMStreamChunks_ :: forall m a d x . (Monad m, MonoidNull x, AncestorFunctor a d)
                       => Coroutine d m x -> Sink m a x -> Coroutine d m ()
unmapMStreamChunks_ f sink = loop >> return ()
   where loop = f >>= nullOrElse (return mempty) ((>>= nullOrElse loop return) . putChunk sink)

-- | Equivalent to 'Data.List.partition'. Takes a 'Source' instead of a list argument and partitions its contents into
-- the two 'Sink' arguments: items satisfying the predicate go to the first sink, the others to the second. Within each
-- input chunk, every maximal run of consecutive items with the same predicate result is written as a single chunk.
partitionStream :: forall m a1 a2 a3 d x . (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                   => (x -> Bool) -> Source m a1 [x] -> Sink m a2 [x] -> Sink m a3 [x] -> Coroutine d m ()
partitionStream f source true false = mapMStreamChunks_ partitionChunk source
   where partitionChunk (x:rest) = partitionTo (f x) x rest
         partitionChunk [] = return ()
         -- The Bool argument is the predicate result already computed for the head item x.
         partitionTo False x chunk = let (falses, rest) = break f chunk
                                     in putChunk false (x:falses)
                                        >> case rest of y:ys -> partitionTo True y ys
                                                        [] -> return ()
         partitionTo True x chunk = let (trues, rest) = List.span f chunk
                                    in putChunk true (x:trues)
                                       >> case rest of y:ys -> partitionTo False y ys
                                                       [] -> return ()

-- | 'zipWithMStream' is similar to 'Control.Monad.zipWithM' except it draws the values from two 'Source' arguments
-- instead of two lists, sends the results into a 'Sink', and works with 'Coroutine' instead of an arbitrary monad.
-- One item is taken from the first source and then one from the second; the zipping stops as soon as either 'get'
-- returns 'Nothing'.
zipWithMStream :: forall m a1 a2 a3 d x y z. (Monad m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                  => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z]
                  -> Coroutine d m ()
zipWithMStream f source1 source2 sink = loop
   where loop = do mx <- get source1
                   my <- get source2
                   case (mx, my) of (Just x, Just y) -> f x y >>= put sink >> loop
                                    _ -> return ()

-- | 'parZipWithMStream' is equivalent to 'zipWithMStream', but it consumes the two sources in parallel. The two 'get'
-- operations are combined with 'bindM2'.
parZipWithMStream :: forall m a1 a2 a3 d x y z.
                     (MonadParallel m, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
                     => (x -> y -> Coroutine d m z) -> Source m a1 [x] -> Source m a2 [y] -> Sink m a3 [z]
                     -> Coroutine d m ()
parZipWithMStream f source1 source2 sink = loop
   where loop = bindM2 zipMaybe (get source1) (get source2)
         zipMaybe (Just x) (Just y) = f x y >>= put sink >> loop
         zipMaybe _ _ = return ()

-- | 'tee' is similar to 'pour' except it distributes every input value from its source argument into both of its sink
-- arguments. Each chunk read from the /source/ is written first to the first sink and then to the second; the results
-- of both 'putChunk' calls are ignored.
tee :: forall m a1 a2 a3 d x . (Monad m, Monoid x, AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d)
       => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Coroutine d m ()
tee source sink1 sink2 = distribute
   where distribute = readChunk source readAll
                      >>= \r-> case r
                               of ResultPart chunk _ -> putChunk sink1 chunk >> putChunk sink2 chunk >> distribute
                                  FinalResult _ -> return ()  -- the last chunk must be empty

-- | Every value 'put' into a 'teeSink' result sink goes into both of its argument sinks: @put (teeSink s1 s2) x@ is
-- equivalent to @put s1 x >> put s2 x@. The 'putChunk' method returns the list of values that couldn't fit into the
-- second sink; the result of writing into the first sink is discarded.
teeSink :: forall m a1 a2 a3 x . (Monad m, AncestorFunctor a1 a3, AncestorFunctor a2 a3)
           => Sink m a1 x -> Sink m a2 x -> Sink m a3 x
teeSink s1 s2 = Sink{putChunk= teeChunk}
   where teeChunk :: forall d. AncestorFunctor a3 d => x -> Coroutine d m x
         teeChunk x = putChunk s1' x >> putChunk s2' x
         -- Both argument sinks are lifted to the functor of the result sink.
         s1' :: Sink m a3 x
         s1' = liftSink s1
         s2' :: Sink m a3 x
         s2' = liftSink s2

-- | This function puts a value into the given 'Sink'. The intervening 'Coroutine' computations suspend up
-- to the 'pipe' invocation that has created the argument sink. The value is written as a singleton chunk and the
-- result of 'putChunk' is discarded; use 'tryPut' to learn whether the value has been accepted.
put :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m ()
put sink x = putChunk sink [x] >> return ()

-- | Like 'put', but returns a 'Bool' that determines if the sink is still active. The result is 'True' iff the chunk
-- returned by 'putChunk' is empty.
tryPut :: forall m a d x. (Monad m, AncestorFunctor a d) => Sink m a [x] -> x -> Coroutine d m Bool
tryPut sink x = liftM null $ putChunk sink [x]

-- | 'putAll' puts an entire chunk (a list, for example) into its /sink/ argument. If the coroutine fed by the /sink/
-- dies, the remainder of the argument chunk is returned. An empty argument is returned immediately, without invoking
-- 'putChunk'.
putAll :: forall m a d x. (Monad m, MonoidNull x, AncestorFunctor a d) => x -> Sink m a x -> Coroutine d m x
putAll l sink = if null l then return l else putChunk sink l

-- | An eliminator for 'MonoidNull' values, analogous to 'maybe': returns the first argument if the last argument is
-- 'null', and otherwise applies the function argument to it.
nullOrElse :: MonoidNull x => a -> (x -> a) -> x -> a
nullOrElse nullCase f x | null x = nullCase
                        | otherwise = f x