{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.StreamK
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.StreamK
    (
    -- * Setup
    -- | To execute the code examples provided in this module in ghci, please
    -- run the following commands first.
    --
    -- $setup

    -- * The stream type
      module Streamly.Internal.Data.StreamK.Type
    , module Streamly.Internal.Data.StreamK.Transformer

    , StreamK(..)
    , fromList
    , fromStream
    , toStream

    -- ** Specialized Generation
    , repeatM
    , replicate
    , replicateM
    , fromIndices
    , fromIndicesM
    , iterate
    , iterateM

    -- * Elimination
    -- ** General Folds
    , foldr1
    , foldlM'
    , foldlMx'
    , fold
    , foldBreak
    , foldEither
    , foldConcat
    , parseDBreak
    , parseD
    , parseBreakChunks
    , parseChunks
    , parseBreak
    , parse
    , parseBreakChunksGeneric
    , parseChunksGeneric

    -- ** Specialized Folds
    , head
    , elem
    , notElem
    , all
    , any
    , last
    , minimum
    , minimumBy
    , maximum
    , maximumBy
    , findIndices
    , lookup
    , findM
    , find
    , (!!)

    -- ** Map and Fold
    , mapM_

    -- ** Conversions
    , toList
    , hoist

    -- * Transformation
    -- ** By folding (scans)
    , scanl'
    , scanlx'

    -- ** Filtering
    , filter
    , take
    , takeWhile
    , drop
    , dropWhile

    -- ** Mapping
    , mapM
    , sequence

    -- ** Inserting
    , intersperseM
    , intersperse
    , insertBy

    -- ** Deleting
    , deleteBy

    -- ** Reordering
    , sortBy

    -- ** Map and Filter
    , mapMaybe

    -- ** Zipping
    , zipWith
    , zipWithM

    -- ** Merging
    , mergeBy
    , mergeByM

    -- ** Transformation comprehensions
    , the

    -- * Exceptions
    , handle

    -- * Resource Management
    , bracketIO
    )
where

#include "ArrayMacros.h"
#include "inline.hs"
#include "assert.hs"

import Control.Exception (mask_, Exception)
import Control.Monad (void, join)
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Proxy (Proxy(..))
import GHC.Types (SPEC(..))
import Streamly.Internal.Data.Array.Type (Array(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.IOFinalizer (newIOFinalizer, runIOFinalizer)
import Streamly.Internal.Data.ParserK.Type (ParserK)
import Streamly.Internal.Data.Producer.Type (Producer(..))
import Streamly.Internal.Data.SVar.Type (adaptState, defState)
import Streamly.Internal.Data.Unbox (sizeOf, Unbox)

import qualified Control.Monad.Catch as MC
import qualified Streamly.Internal.Data.Array.Type as Array
import qualified Streamly.Internal.Data.Array.Generic as GenArr
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Parser as Parser
import qualified Streamly.Internal.Data.Parser.Type as PR
import qualified Streamly.Internal.Data.ParserK.Type as ParserK
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Prelude

import Prelude
       hiding (Foldable(..), last, map, mapM, mapM_, repeat, sequence,
               take, filter, all, any, takeWhile, drop, dropWhile,
               notElem, head, tail, init, zipWith, lookup,
               (!!), replicate, reverse, concatMap, iterate, splitAt)
import Data.Foldable (sum, length)
import Streamly.Internal.Data.StreamK.Type
import Streamly.Internal.Data.StreamK.Transformer
import Streamly.Internal.Data.Parser (ParseError(..))

#include "DocTestDataStreamK.hs"

-- | Convert a fused 'Stream' to 'StreamK'.
--
-- For example:
--
-- >>> s1 = StreamK.fromStream $ Stream.fromList [1,2]
-- >>> s2 = StreamK.fromStream $ Stream.fromList [3,4]
-- >>> Stream.fold Fold.toList $ StreamK.toStream $ s1 `StreamK.append` s2
-- [1,2,3,4]
--
{-# INLINE fromStream #-}
fromStream :: Monad m => Stream.Stream m a -> StreamK m a
fromStream :: forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
fromStream = forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK

-- | Convert a 'StreamK' to a fused 'Stream'.
--
{-# INLINE toStream #-}
toStream :: Applicative m => StreamK m a -> Stream.Stream m a
toStream :: forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
toStream = forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK

-------------------------------------------------------------------------------
-- Generation
-------------------------------------------------------------------------------

{-
-- Generalization of concurrent streams/SVar via unfoldr.
--
-- Unfold a value into monadic actions and then run the resulting monadic
-- actions to generate a stream. Since the step of generating the monadic
-- action and running them are decoupled we can run the monadic actions
-- cooncurrently. For example, the seed could be a list of monadic actions or a
-- pure stream of monadic actions.
--
-- We can have different flavors of this depending on the stream type t. The
-- concurrent version could be async or ahead etc. Depending on how we queue
-- back the feedback portion b, it could be DFS or BFS style.
--
unfoldrA :: (b -> Maybe (m a, b)) -> b -> StreamK m a
unfoldrA = undefined
-}

-------------------------------------------------------------------------------
-- Special generation
-------------------------------------------------------------------------------

-- |
-- >>> repeatM = StreamK.sequence . StreamK.repeat
-- >>> repeatM = fix . StreamK.consM
-- >>> repeatM = cycle1 . StreamK.fromEffect
--
-- Generate a stream by repeatedly executing a monadic action forever.
--
-- >>> :{
-- repeatAction =
--        StreamK.repeatM (threadDelay 1000000 >> print 1)
--      & StreamK.take 10
--      & StreamK.fold Fold.drain
-- :}
--
repeatM :: Monad m => m a -> StreamK m a
repeatM :: forall (m :: * -> *) a. Monad m => m a -> StreamK m a
repeatM = forall (m :: * -> *) a (t :: (* -> *) -> * -> *).
(m a -> t m a -> t m a) -> m a -> t m a
repeatMWith forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
consM

{-# INLINE replicateM #-}
replicateM :: Monad m => Int -> m a -> StreamK m a
replicateM :: forall (m :: * -> *) a. Monad m => Int -> m a -> StreamK m a
replicateM = forall (m :: * -> *) a.
(m a -> StreamK m a -> StreamK m a) -> Int -> m a -> StreamK m a
replicateMWith forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
consM
{-# INLINE replicate #-}
replicate :: Int -> a -> StreamK m a
replicate :: forall a (m :: * -> *). Int -> a -> StreamK m a
replicate Int
n a
a = forall {t} {m :: * -> *}. (Ord t, Num t) => t -> StreamK m a
go Int
n
    where
    go :: t -> StreamK m a
go t
cnt = if t
cnt forall a. Ord a => a -> a -> Bool
<= t
0 then forall (m :: * -> *) a. StreamK m a
nil else a
a forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` t -> StreamK m a
go (t
cnt forall a. Num a => a -> a -> a
- t
1)

{-# INLINE fromIndicesM #-}
fromIndicesM :: Monad m => (Int -> m a) -> StreamK m a
fromIndicesM :: forall (m :: * -> *) a. Monad m => (Int -> m a) -> StreamK m a
fromIndicesM = forall (m :: * -> *) a.
(m a -> StreamK m a -> StreamK m a) -> (Int -> m a) -> StreamK m a
fromIndicesMWith forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
consM
{-# INLINE fromIndices #-}
fromIndices :: (Int -> a) -> StreamK m a
fromIndices :: forall a (m :: * -> *). (Int -> a) -> StreamK m a
fromIndices Int -> a
gen = forall {m :: * -> *}. Int -> StreamK m a
go Int
0
  where
    go :: Int -> StreamK m a
go Int
n = Int -> a
gen Int
n forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` Int -> StreamK m a
go (Int
n forall a. Num a => a -> a -> a
+ Int
1)

-- |
-- >>> iterate f x = x `StreamK.cons` iterate f x
--
-- Generate an infinite stream with @x@ as the first element and each
-- successive element derived by applying the function @f@ on the previous
-- element.
--
-- >>> StreamK.toList $ StreamK.take 5 $ StreamK.iterate (+1) 1
-- [1,2,3,4,5]
--
{-# INLINE iterate #-}
iterate :: (a -> a) -> a -> StreamK m a
iterate :: forall a (m :: * -> *). (a -> a) -> a -> StreamK m a
iterate a -> a
step = forall {m :: * -> *}. a -> StreamK m a
go
    where
        go :: a -> StreamK m a
go !a
s = forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons a
s (a -> StreamK m a
go (a -> a
step a
s))

-- |
-- >>> iterateM f m = m >>= \a -> return a `StreamK.consM` iterateM f (f a)
--
-- Generate an infinite stream with the first element generated by the action
-- @m@ and each successive element derived by applying the monadic function
-- @f@ on the previous element.
--
-- >>> :{
-- StreamK.iterateM (\x -> print x >> return (x + 1)) (return 0)
--     & StreamK.take 3
--     & StreamK.toList
-- :}
-- 0
-- 1
-- [0,1,2]
--
{-# INLINE iterateM #-}
iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a
iterateM :: forall (m :: * -> *) a. Monad m => (a -> m a) -> m a -> StreamK m a
iterateM = forall (m :: * -> *) a.
Monad m =>
(m a -> StreamK m a -> StreamK m a)
-> (a -> m a) -> m a -> StreamK m a
iterateMWith forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
consM

-------------------------------------------------------------------------------
-- Conversions
-------------------------------------------------------------------------------

{-# INLINE fromList #-}
fromList :: [a] -> StreamK m a
fromList :: forall a (m :: * -> *). [a] -> StreamK m a
fromList = forall (f :: * -> *) a (m :: * -> *).
Foldable f =>
f a -> StreamK m a
fromFoldable

-------------------------------------------------------------------------------
-- Elimination by Folding
-------------------------------------------------------------------------------

{-# INLINE foldr1 #-}
foldr1 :: Monad m => (a -> a -> a) -> StreamK m a -> m (Maybe a)
foldr1 :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> StreamK m a -> m (Maybe a)
foldr1 a -> a -> a
step StreamK m a
m = do
    Maybe (a, StreamK m a)
r <- forall (m :: * -> *) a.
Applicative m =>
StreamK m a -> m (Maybe (a, StreamK m a))
uncons StreamK m a
m
    case Maybe (a, StreamK m a)
r of
        Maybe (a, StreamK m a)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
        Just (a
h, StreamK m a
t) -> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a. a -> Maybe a
Just (forall {m :: * -> *}. Monad m => a -> StreamK m a -> m a
go a
h StreamK m a
t)
    where
    go :: a -> StreamK m a -> m a
go a
p StreamK m a
m1 =
        let stp :: m a
stp = forall (m :: * -> *) a. Monad m => a -> m a
return a
p
            single :: a -> m a
single a
a = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ a -> a -> a
step a
a a
p
            yieldk :: a -> StreamK m a -> m a
yieldk a
a StreamK m a
r = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a -> a -> a
step a
p) (a -> StreamK m a -> m a
go a
a StreamK m a
r)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m a
yieldk forall {m :: * -> *}. Monad m => a -> m a
single m a
stp StreamK m a
m1

-- XXX replace the recursive "go" with explicit continuations.
-- | Like 'foldx', but with a monadic step function.
{-# INLINABLE foldlMx' #-}
foldlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b
foldlMx' :: forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b
foldlMx' x -> a -> m x
step m x
begin x -> m b
done = m x -> StreamK m a -> m b
go m x
begin
    where
    go :: m x -> StreamK m a -> m b
go !m x
acc StreamK m a
m1 =
        let stop :: m b
stop = m x
acc forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= x -> m b
done
            single :: a -> m b
single a
a = m x
acc forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \x
b -> x -> a -> m x
step x
b a
a forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= x -> m b
done
            yieldk :: a -> StreamK m a -> m b
yieldk a
a StreamK m a
r = m x
acc forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \x
b -> x -> a -> m x
step x
b a
a forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \x
x -> m x -> StreamK m a -> m b
go (forall (m :: * -> *) a. Monad m => a -> m a
return x
x) StreamK m a
r
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m b
yieldk a -> m b
single m b
stop StreamK m a
m1

-- | Fold a stream using the supplied left 'Fold' and reducing the resulting
-- expression strictly at each step. The behavior is similar to 'foldl''. A
-- 'Fold' can terminate early without consuming the full stream. See the
-- documentation of individual 'Fold's for termination behavior.
--
-- Definitions:
--
-- >>> fold f = fmap fst . StreamK.foldBreak f
-- >>> fold f = StreamK.parseD (Parser.fromFold f)
--
-- Example:
--
-- >>> StreamK.fold Fold.sum $ StreamK.fromStream $ Stream.enumerateFromTo 1 100
-- 5050
--
{-# INLINABLE fold #-}
fold :: Monad m => FL.Fold m a b -> StreamK m a -> m b
fold :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> StreamK m a -> m b
fold (FL.Fold s -> a -> m (Step s b)
step m (Step s b)
begin s -> m b
_ s -> m b
final) StreamK m a
m = do
    Step s b
res <- m (Step s b)
begin
    case Step s b
res of
        FL.Partial s
fs -> s -> StreamK m a -> m b
go s
fs StreamK m a
m
        FL.Done b
fb -> forall (m :: * -> *) a. Monad m => a -> m a
return b
fb

    where
    go :: s -> StreamK m a -> m b
go !s
acc StreamK m a
m1 =
        let stop :: m b
stop = s -> m b
final s
acc
            single :: a -> m b
single a
a = s -> a -> m (Step s b)
step s
acc a
a
              forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                        FL.Partial s
s -> s -> m b
final s
s
                        FL.Done b
b1 -> forall (m :: * -> *) a. Monad m => a -> m a
return b
b1
            yieldk :: a -> StreamK m a -> m b
yieldk a
a StreamK m a
r = s -> a -> m (Step s b)
step s
acc a
a
              forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                        FL.Partial s
s -> s -> StreamK m a -> m b
go s
s StreamK m a
r
                        FL.Done b
b1 -> forall (m :: * -> *) a. Monad m => a -> m a
return b
b1
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m b
yieldk a -> m b
single m b
stop StreamK m a
m1

-- | Fold resulting in either breaking the stream or continuation of the fold.
-- Instead of supplying the input stream in one go we can run the fold multiple
-- times, each time supplying the next segment of the input stream. If the fold
-- has not yet finished it returns a fold that can be run again otherwise it
-- returns the fold result and the residual stream.
--
-- /Internal/
{-# INLINE foldEither #-}
foldEither :: Monad m =>
    Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
foldEither :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b
-> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
foldEither (FL.Fold s -> a -> m (Step s b)
step m (Step s b)
begin s -> m b
done s -> m b
final) StreamK m a
m = do
    Step s b
res <- m (Step s b)
begin
    case Step s b
res of
        FL.Partial s
fs -> s -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
go s
fs StreamK m a
m
        FL.Done b
fb -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right (b
fb, StreamK m a
m)

    where

    go :: s -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
go !s
acc StreamK m a
m1 =
        let stop :: m (Either (Fold m a b) b)
stop =
                let f :: Fold m a b
f = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Fold m a b
Fold s -> a -> m (Step s b)
step (forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial s
acc) s -> m b
done s -> m b
final
                 in forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left Fold m a b
f
            single :: a -> m (Either (Fold m a b) (b, StreamK m a))
single a
a =
                s -> a -> m (Step s b)
step s
acc a
a
                  forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                    FL.Partial s
s ->
                        let f :: Fold m a b
f = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Fold m a b
Fold s -> a -> m (Step s b)
step (forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial s
s) s -> m b
done s -> m b
final
                         in forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left Fold m a b
f
                    FL.Done b
b1 -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right (b
b1, forall (m :: * -> *) a. StreamK m a
nil)
            yieldk :: a -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
yieldk a
a StreamK m a
r =
                s -> a -> m (Step s b)
step s
acc a
a
                  forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                    FL.Partial s
s -> s -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
go s
s StreamK m a
r
                    FL.Done b
b1 -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right (b
b1, StreamK m a
r)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
yieldk forall {m :: * -> *} {a}.
a -> m (Either (Fold m a b) (b, StreamK m a))
single forall {b}. m (Either (Fold m a b) b)
stop StreamK m a
m1

-- | Like 'fold' but also returns the remaining stream. The resulting stream
-- would be 'StreamK.nil' if the stream finished before the fold.
--
{-# INLINE foldBreak #-}
foldBreak :: Monad m => Fold m a b -> StreamK m a -> m (b, StreamK m a)
foldBreak :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> StreamK m a -> m (b, StreamK m a)
foldBreak Fold m a b
fld StreamK m a
strm = do
    Either (Fold m a b) (b, StreamK m a)
r <- forall (m :: * -> *) a b.
Monad m =>
Fold m a b
-> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
foldEither Fold m a b
fld StreamK m a
strm
    case Either (Fold m a b) (b, StreamK m a)
r of
        Right (b, StreamK m a)
res -> forall (m :: * -> *) a. Monad m => a -> m a
return (b, StreamK m a)
res
        Left (Fold s -> a -> m (Step s b)
_ m (Step s b)
initial s -> m b
_ s -> m b
final) -> do
            Step s b
res <- m (Step s b)
initial
            case Step s b
res of
                FL.Done b
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"foldBreak: unreachable state"
                FL.Partial s
s -> do
                    b
b <- s -> m b
final s
s
                    forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, forall (m :: * -> *) a. StreamK m a
nil)

-- XXX Array folds can be implemented using this.
-- foldContainers? Specialized to foldArrays.

-- | Generate streams from individual elements of a stream and fold the
-- concatenation of those streams using the supplied fold. Return the result of
-- the fold and residual stream.
--
-- For example, this can be used to efficiently fold an Array Word8 stream
-- using Word8 folds.
--
-- /Internal/
{-# INLINE foldConcat #-}
foldConcat :: Monad m =>
    Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a)
foldConcat :: forall (m :: * -> *) a b c.
Monad m =>
Producer m a b -> Fold m b c -> StreamK m a -> m (c, StreamK m a)
foldConcat
    (Producer s -> m (Step s b)
pstep a -> m s
pinject s -> m a
pextract)
    (Fold s -> b -> m (Step s c)
fstep m (Step s c)
begin s -> m c
_ s -> m c
final)
    StreamK m a
stream = do

    Step s c
res <- m (Step s c)
begin
    case Step s c
res of
        FL.Partial s
fs -> s -> StreamK m a -> m (c, StreamK m a)
go s
fs StreamK m a
stream
        FL.Done c
fb -> forall (m :: * -> *) a. Monad m => a -> m a
return (c
fb, StreamK m a
stream)

    where

    go :: s -> StreamK m a -> m (c, StreamK m a)
go !s
acc StreamK m a
m1 = do
        let stop :: m (c, StreamK m a)
stop = do
                c
r <- s -> m c
final s
acc
                forall (m :: * -> *) a. Monad m => a -> m a
return (c
r, forall (m :: * -> *) a. StreamK m a
nil)
            single :: a -> m (c, StreamK m a)
single a
a = do
                s
st <- a -> m s
pinject a
a
                Either s (c, s)
res <- SPEC -> s -> s -> m (Either s (c, s))
go1 SPEC
SPEC s
acc s
st
                case Either s (c, s)
res of
                    Left s
fs -> do
                        c
r <- s -> m c
final s
fs
                        forall (m :: * -> *) a. Monad m => a -> m a
return (c
r, forall (m :: * -> *) a. StreamK m a
nil)
                    Right (c
b, s
s) -> do
                        a
x <- s -> m a
pextract s
s
                        forall (m :: * -> *) a. Monad m => a -> m a
return (c
b, forall a (m :: * -> *). a -> StreamK m a
fromPure a
x)
            yieldk :: a -> StreamK m a -> m (c, StreamK m a)
yieldk a
a StreamK m a
r = do
                s
st <- a -> m s
pinject a
a
                Either s (c, s)
res <- SPEC -> s -> s -> m (Either s (c, s))
go1 SPEC
SPEC s
acc s
st
                case Either s (c, s)
res of
                    Left s
fs -> s -> StreamK m a -> m (c, StreamK m a)
go s
fs StreamK m a
r
                    Right (c
b, s
s) -> do
                        a
x <- s -> m a
pextract s
s
                        forall (m :: * -> *) a. Monad m => a -> m a
return (c
b, a
x forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
r)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (c, StreamK m a)
yieldk forall {m :: * -> *}. a -> m (c, StreamK m a)
single forall {m :: * -> *} {a}. m (c, StreamK m a)
stop StreamK m a
m1

    {-# INLINE go1 #-}
    go1 :: SPEC -> s -> s -> m (Either s (c, s))
go1 !SPEC
_ !s
fs s
st = do
        Step s b
r <- s -> m (Step s b)
pstep s
st
        case Step s b
r of
            Stream.Yield b
x s
s -> do
                Step s c
res <- s -> b -> m (Step s c)
fstep s
fs b
x
                case Step s c
res of
                    FL.Done c
b -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right (c
b, s
s)
                    FL.Partial s
fs1 -> SPEC -> s -> s -> m (Either s (c, s))
go1 SPEC
SPEC s
fs1 s
s
            Stream.Skip s
s -> SPEC -> s -> s -> m (Either s (c, s))
go1 SPEC
SPEC s
fs s
s
            Step s b
Stream.Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left s
fs

-- | Like 'foldl'' but with a monadic step function.
{-# INLINE foldlM' #-}
foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b
foldlM' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> StreamK m a -> m b
foldlM' b -> a -> m b
step m b
begin = forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b
foldlMx' b -> a -> m b
step m b
begin forall (m :: * -> *) a. Monad m => a -> m a
return

------------------------------------------------------------------------------
-- Specialized folds
------------------------------------------------------------------------------

{-# INLINE head #-}
head :: Monad m => StreamK m a -> m (Maybe a)
-- head = foldrM (\x _ -> return $ Just x) (return Nothing)
head :: forall (m :: * -> *) a. Monad m => StreamK m a -> m (Maybe a)
head StreamK m a
m =
    let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
        single :: a -> m (Maybe a)
single a
a  = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
        yieldk :: a -> p -> m (Maybe a)
yieldk a
a p
_ = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
    in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState forall {m :: * -> *} {a} {p}. Monad m => a -> p -> m (Maybe a)
yieldk forall {m :: * -> *} {a}. Monad m => a -> m (Maybe a)
single forall {a}. m (Maybe a)
stop StreamK m a
m

{-# INLINE elem #-}
elem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
elem :: forall (m :: * -> *) a.
(Monad m, Eq a) =>
a -> StreamK m a -> m Bool
elem a
e = forall {m :: * -> *}. Monad m => StreamK m a -> m Bool
go
    where
    go :: StreamK m a -> m Bool
go StreamK m a
m1 =
        let stop :: m Bool
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            single :: a -> m Bool
single a
a  = forall (m :: * -> *) a. Monad m => a -> m a
return (a
a forall a. Eq a => a -> a -> Bool
== a
e)
            yieldk :: a -> StreamK m a -> m Bool
yieldk a
a StreamK m a
r = if a
a forall a. Eq a => a -> a -> Bool
== a
e then forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True else StreamK m a -> m Bool
go StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m Bool
yieldk forall {m :: * -> *}. Monad m => a -> m Bool
single m Bool
stop StreamK m a
m1

{-# INLINE notElem #-}
notElem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
notElem :: forall (m :: * -> *) a.
(Monad m, Eq a) =>
a -> StreamK m a -> m Bool
notElem a
e = forall {m :: * -> *}. Monad m => StreamK m a -> m Bool
go
    where
    go :: StreamK m a -> m Bool
go StreamK m a
m1 =
        let stop :: m Bool
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            single :: a -> m Bool
single a
a  = forall (m :: * -> *) a. Monad m => a -> m a
return (a
a forall a. Eq a => a -> a -> Bool
/= a
e)
            yieldk :: a -> StreamK m a -> m Bool
yieldk a
a StreamK m a
r = if a
a forall a. Eq a => a -> a -> Bool
== a
e then forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False else StreamK m a -> m Bool
go StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m Bool
yieldk forall {m :: * -> *}. Monad m => a -> m Bool
single m Bool
stop StreamK m a
m1

{-# INLINABLE all #-}
all :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
all :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamK m a -> m Bool
all a -> Bool
p = forall {m :: * -> *}. Monad m => StreamK m a -> m Bool
go
    where
    go :: StreamK m a -> m Bool
go StreamK m a
m1 =
        let single :: a -> m Bool
single a
a   | a -> Bool
p a
a       = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
                       | Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            yieldk :: a -> StreamK m a -> m Bool
yieldk a
a StreamK m a
r | a -> Bool
p a
a       = StreamK m a -> m Bool
go StreamK m a
r
                       | Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m Bool
yieldk forall {m :: * -> *}. Monad m => a -> m Bool
single (forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True) StreamK m a
m1

{-# INLINABLE any #-}
any :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
any :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamK m a -> m Bool
any a -> Bool
p = forall {m :: * -> *}. Monad m => StreamK m a -> m Bool
go
    where
    go :: StreamK m a -> m Bool
go StreamK m a
m1 =
        let single :: a -> m Bool
single a
a   | a -> Bool
p a
a       = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
                       | Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
            yieldk :: a -> StreamK m a -> m Bool
yieldk a
a StreamK m a
r | a -> Bool
p a
a       = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
                       | Bool
otherwise = StreamK m a -> m Bool
go StreamK m a
r
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m Bool
yieldk forall {m :: * -> *}. Monad m => a -> m Bool
single (forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False) StreamK m a
m1

-- | Extract the last element of the stream, if any.
{-# INLINE last #-}
last :: Monad m => StreamK m a -> m (Maybe a)
last :: forall (m :: * -> *) a. Monad m => StreamK m a -> m (Maybe a)
last = forall (m :: * -> *) a b x.
Monad m =>
(x -> a -> x) -> x -> (x -> b) -> StreamK m a -> m b
foldlx' (\Maybe a
_ a
y -> forall a. a -> Maybe a
Just a
y) forall a. Maybe a
Nothing forall a. a -> a
id

{-# INLINE minimum #-}
minimum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
minimum :: forall (m :: * -> *) a.
(Monad m, Ord a) =>
StreamK m a -> m (Maybe a)
minimum = forall {m :: * -> *} {a}.
(Ord a, Monad m) =>
Maybe a -> StreamK m a -> m (Maybe a)
go forall a. Maybe a
Nothing
    where
    go :: Maybe a -> StreamK m a -> m (Maybe a)
go Maybe a
Nothing StreamK m a
m1 =
        let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            single :: a -> m (Maybe a)
single a
a  = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r = Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
a) StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *} {a}. Monad m => a -> m (Maybe a)
single forall {a}. m (Maybe a)
stop StreamK m a
m1

    go (Just a
res) StreamK m a
m1 =
        let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
res)
            single :: a -> m (Maybe a)
single a
a  =
                if a
res forall a. Ord a => a -> a -> Bool
<= a
a
                then forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
res)
                else forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r =
                if a
res forall a. Ord a => a -> a -> Bool
<= a
a
                then Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
res) StreamK m a
r
                else Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
a) StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *}. Monad m => a -> m (Maybe a)
single m (Maybe a)
stop StreamK m a
m1

{-# INLINE minimumBy #-}
minimumBy
    :: (Monad m)
    => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
minimumBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
minimumBy a -> a -> Ordering
cmp = forall {m :: * -> *}.
Monad m =>
Maybe a -> StreamK m a -> m (Maybe a)
go forall a. Maybe a
Nothing
    where
    go :: Maybe a -> StreamK m a -> m (Maybe a)
go Maybe a
Nothing StreamK m a
m1 =
        let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            single :: a -> m (Maybe a)
single a
a  = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r = Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
a) StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *} {a}. Monad m => a -> m (Maybe a)
single forall {a}. m (Maybe a)
stop StreamK m a
m1

    go (Just a
res) StreamK m a
m1 =
        let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
res)
            single :: a -> m (Maybe a)
single a
a  = case a -> a -> Ordering
cmp a
res a
a of
                Ordering
GT -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
                Ordering
_  -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
res)
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r = case a -> a -> Ordering
cmp a
res a
a of
                Ordering
GT -> Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
a) StreamK m a
r
                Ordering
_  -> Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
res) StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *}. Monad m => a -> m (Maybe a)
single m (Maybe a)
stop StreamK m a
m1

{-# INLINE maximum #-}
maximum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
maximum :: forall (m :: * -> *) a.
(Monad m, Ord a) =>
StreamK m a -> m (Maybe a)
maximum = forall {m :: * -> *} {a}.
(Ord a, Monad m) =>
Maybe a -> StreamK m a -> m (Maybe a)
go forall a. Maybe a
Nothing
    where
    go :: Maybe a -> StreamK m a -> m (Maybe a)
go Maybe a
Nothing StreamK m a
m1 =
        let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            single :: a -> m (Maybe a)
single a
a  = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r = Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
a) StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *} {a}. Monad m => a -> m (Maybe a)
single forall {a}. m (Maybe a)
stop StreamK m a
m1

    go (Just a
res) StreamK m a
m1 =
        let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
res)
            single :: a -> m (Maybe a)
single a
a  =
                if a
res forall a. Ord a => a -> a -> Bool
<= a
a
                then forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
                else forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
res)
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r =
                if a
res forall a. Ord a => a -> a -> Bool
<= a
a
                then Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
a) StreamK m a
r
                else Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
res) StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *}. Monad m => a -> m (Maybe a)
single m (Maybe a)
stop StreamK m a
m1

{-# INLINE maximumBy #-}
maximumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
maximumBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
maximumBy a -> a -> Ordering
cmp = forall {m :: * -> *}.
Monad m =>
Maybe a -> StreamK m a -> m (Maybe a)
go forall a. Maybe a
Nothing
    where
    go :: Maybe a -> StreamK m a -> m (Maybe a)
go Maybe a
Nothing StreamK m a
m1 =
        let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            single :: a -> m (Maybe a)
single a
a  = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r = Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
a) StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *} {a}. Monad m => a -> m (Maybe a)
single forall {a}. m (Maybe a)
stop StreamK m a
m1

    go (Just a
res) StreamK m a
m1 =
        let stop :: m (Maybe a)
stop      = forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
res)
            single :: a -> m (Maybe a)
single a
a  = case a -> a -> Ordering
cmp a
res a
a of
                Ordering
GT -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
res)
                Ordering
_  -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. a -> Maybe a
Just a
a)
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r = case a -> a -> Ordering
cmp a
res a
a of
                Ordering
GT -> Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
res) StreamK m a
r
                Ordering
_  -> Maybe a -> StreamK m a -> m (Maybe a)
go (forall a. a -> Maybe a
Just a
a) StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *}. Monad m => a -> m (Maybe a)
single m (Maybe a)
stop StreamK m a
m1

{-# INLINE (!!) #-}
(!!) :: Monad m => StreamK m a -> Int -> m (Maybe a)
StreamK m a
m !! :: forall (m :: * -> *) a.
Monad m =>
StreamK m a -> Int -> m (Maybe a)
!! Int
i = forall {m :: * -> *} {t} {a}.
(Ord t, Monad m, Num t) =>
t -> StreamK m a -> m (Maybe a)
go Int
i StreamK m a
m
    where
    go :: t -> StreamK m a -> m (Maybe a)
go t
n StreamK m a
m1 =
      let single :: a -> m (Maybe a)
single a
a | t
n forall a. Eq a => a -> a -> Bool
== t
0 = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
a
                   | Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
          yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
x | t
n forall a. Ord a => a -> a -> Bool
< t
0 = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
                     | t
n forall a. Eq a => a -> a -> Bool
== t
0 = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
a
                     | Bool
otherwise = t -> StreamK m a -> m (Maybe a)
go (t
n forall a. Num a => a -> a -> a
- t
1) StreamK m a
x
      in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *} {a}. Monad m => a -> m (Maybe a)
single (forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing) StreamK m a
m1

{-# INLINE lookup #-}
lookup :: (Monad m, Eq a) => a -> StreamK m (a, b) -> m (Maybe b)
lookup :: forall (m :: * -> *) a b.
(Monad m, Eq a) =>
a -> StreamK m (a, b) -> m (Maybe b)
lookup a
e = forall {m :: * -> *} {a}.
Monad m =>
StreamK m (a, a) -> m (Maybe a)
go
    where
    go :: StreamK m (a, a) -> m (Maybe a)
go StreamK m (a, a)
m1 =
        let single :: (a, a) -> m (Maybe a)
single (a
a, a
b) | a
a forall a. Eq a => a -> a -> Bool
== a
e = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
b
                          | Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            yieldk :: (a, a) -> StreamK m (a, a) -> m (Maybe a)
yieldk (a
a, a
b) StreamK m (a, a)
x | a
a forall a. Eq a => a -> a -> Bool
== a
e = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
b
                            | Bool
otherwise = StreamK m (a, a) -> m (Maybe a)
go StreamK m (a, a)
x
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState (a, a) -> StreamK m (a, a) -> m (Maybe a)
yieldk forall {m :: * -> *} {a}. Monad m => (a, a) -> m (Maybe a)
single (forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing) StreamK m (a, a)
m1

{-# INLINE findM #-}
findM :: Monad m => (a -> m Bool) -> StreamK m a -> m (Maybe a)
findM :: forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> StreamK m a -> m (Maybe a)
findM a -> m Bool
p = StreamK m a -> m (Maybe a)
go
    where
    go :: StreamK m a -> m (Maybe a)
go StreamK m a
m1 =
        let single :: a -> m (Maybe a)
single a
a = do
                Bool
b <- a -> m Bool
p a
a
                if Bool
b then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
a else forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
x = do
                Bool
b <- a -> m Bool
p a
a
                if Bool
b then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
a else StreamK m a -> m (Maybe a)
go StreamK m a
x
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk a -> m (Maybe a)
single (forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing) StreamK m a
m1

{-# INLINE find #-}
find :: Monad m => (a -> Bool) -> StreamK m a -> m (Maybe a)
find :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamK m a -> m (Maybe a)
find a -> Bool
p = forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> StreamK m a -> m (Maybe a)
findM (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
p)

{-# INLINE findIndices #-}
findIndices :: (a -> Bool) -> StreamK m a -> StreamK m Int
findIndices :: forall a (m :: * -> *). (a -> Bool) -> StreamK m a -> StreamK m Int
findIndices a -> Bool
p = forall {t} {m :: * -> *}. Num t => t -> StreamK m a -> StreamK m t
go Int
0
    where
    go :: t -> StreamK m a -> StreamK m t
go t
offset StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m t
st t -> StreamK m t -> m r
yld t -> m r
sng m r
stp ->
        let single :: a -> m r
single a
a | a -> Bool
p a
a = t -> m r
sng t
offset
                     | Bool
otherwise = m r
stp
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
x | a -> Bool
p a
a = t -> StreamK m t -> m r
yld t
offset forall a b. (a -> b) -> a -> b
$ t -> StreamK m a -> StreamK m t
go (t
offset forall a. Num a => a -> a -> a
+ t
1) StreamK m a
x
                       | Bool
otherwise = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m t
st) t -> StreamK m t -> m r
yld t -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$
                            t -> StreamK m a -> StreamK m t
go (t
offset forall a. Num a => a -> a -> a
+ t
1) StreamK m a
x
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m t
st) a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
m1

------------------------------------------------------------------------------
-- Map and Fold
------------------------------------------------------------------------------

-- | Apply a monadic action to each element of the stream and discard the
-- output of the action.
{-# INLINE mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m ()
mapM_ :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> StreamK m a -> m ()
mapM_ a -> m b
f = StreamK m a -> m ()
go
    where
    go :: StreamK m a -> m ()
go StreamK m a
m1 =
        let stop :: m ()
stop = forall (m :: * -> *) a. Monad m => a -> m a
return ()
            single :: a -> m ()
single a
a = forall (f :: * -> *) a. Functor f => f a -> f ()
void (a -> m b
f a
a)
            yieldk :: a -> StreamK m a -> m ()
yieldk a
a StreamK m a
r = a -> m b
f a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StreamK m a -> m ()
go StreamK m a
r
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m ()
yieldk a -> m ()
single m ()
stop StreamK m a
m1

{-# INLINE mapM #-}
mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
mapM :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> StreamK m a -> StreamK m b
mapM = forall (m :: * -> *) b a.
(m b -> StreamK m b -> StreamK m b)
-> (a -> m b) -> StreamK m a -> StreamK m b
mapMWith forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
consM

------------------------------------------------------------------------------
-- Converting folds
------------------------------------------------------------------------------

{-# INLINABLE toList #-}
toList :: Monad m => StreamK m a -> m [a]
toList :: forall (m :: * -> *) a. Monad m => StreamK m a -> m [a]
toList = forall (m :: * -> *) a b.
Monad m =>
(a -> b -> b) -> b -> StreamK m a -> m b
foldr (:) []

-- Based on suggestions by David Feuer and Pranay Sashank
{-# INLINE hoist #-}
hoist :: (Monad m, Monad n)
    => (forall x. m x -> n x) -> StreamK m a -> StreamK n a
hoist :: forall (m :: * -> *) (n :: * -> *) a.
(Monad m, Monad n) =>
(forall x. m x -> n x) -> StreamK m a -> StreamK n a
hoist forall x. m x -> n x
f StreamK m a
str =
    forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK n a
st a -> StreamK n a -> n r
yld a -> n r
sng n r
stp ->
            let single :: a -> m (n r)
single = forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> n r
sng
                yieldk :: a -> StreamK m a -> m (n r)
yieldk a
a StreamK m a
s = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ a -> StreamK n a -> n r
yld a
a (forall (m :: * -> *) (n :: * -> *) a.
(Monad m, Monad n) =>
(forall x. m x -> n x) -> StreamK m a -> StreamK n a
hoist forall x. m x -> n x
f StreamK m a
s)
                stop :: m (n r)
stop = forall (m :: * -> *) a. Monad m => a -> m a
return n r
stp
                state :: State StreamK n b
state = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK n a
st
             in forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall x. m x -> n x
f forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStreamShared forall {n :: * -> *} {b}. State StreamK n b
state forall {m :: * -> *}. Monad m => a -> StreamK m a -> m (n r)
yieldk a -> m (n r)
single m (n r)
stop StreamK m a
str

-------------------------------------------------------------------------------
-- Transformation by folding (Scans)
-------------------------------------------------------------------------------

{-# INLINE scanlx' #-}
scanlx' :: (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b
scanlx' :: forall x a b (m :: * -> *).
(x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b
scanlx' x -> a -> x
step x
begin x -> b
done StreamK m a
m =
    forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons (x -> b
done x
begin) forall a b. (a -> b) -> a -> b
$ forall {m :: * -> *}. StreamK m a -> x -> StreamK m b
go StreamK m a
m x
begin
    where
    go :: StreamK m a -> x -> StreamK m b
go StreamK m a
m1 !x
acc = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m b
st b -> StreamK m b -> m r
yld b -> m r
sng m r
stp ->
        let single :: a -> m r
single a
a = b -> m r
sng (x -> b
done forall a b. (a -> b) -> a -> b
$ x -> a -> x
step x
acc a
a)
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r =
                let s :: x
s = x -> a -> x
step x
acc a
a
                in b -> StreamK m b -> m r
yld (x -> b
done x
s) (StreamK m a -> x -> StreamK m b
go StreamK m a
r x
s)
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m b
st) a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
m1

{-# INLINE scanl' #-}
scanl' :: (b -> a -> b) -> b -> StreamK m a -> StreamK m b
scanl' :: forall b a (m :: * -> *).
(b -> a -> b) -> b -> StreamK m a -> StreamK m b
scanl' b -> a -> b
step b
begin = forall x a b (m :: * -> *).
(x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b
scanlx' b -> a -> b
step b
begin forall a. a -> a
id

-------------------------------------------------------------------------------
-- Filtering
-------------------------------------------------------------------------------

{-# INLINE filter #-}
filter :: (a -> Bool) -> StreamK m a -> StreamK m a
filter :: forall a (m :: * -> *). (a -> Bool) -> StreamK m a -> StreamK m a
filter a -> Bool
p = forall {m :: * -> *}. StreamK m a -> StreamK m a
go
    where
    go :: StreamK m a -> StreamK m a
go StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single a
a   | a -> Bool
p a
a       = a -> m r
sng a
a
                       | Bool
otherwise = m r
stp
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r | a -> Bool
p a
a       = a -> StreamK m a -> m r
yld a
a (StreamK m a -> StreamK m a
go StreamK m a
r)
                       | Bool
otherwise = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
r
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
m1

{-# INLINE take #-}
take :: Int -> StreamK m a -> StreamK m a
take :: forall (m :: * -> *) a. Int -> StreamK m a -> StreamK m a
take = forall {t} {m :: * -> *} {a}.
(Ord t, Num t) =>
t -> StreamK m a -> StreamK m a
go
    where
    go :: t -> StreamK m a -> StreamK m a
go t
n1 StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = a -> StreamK m a -> m r
yld a
a (t -> StreamK m a -> StreamK m a
go (t
n1 forall a. Num a => a -> a -> a
- t
1) StreamK m a
r)
        in if t
n1 forall a. Ord a => a -> a -> Bool
<= t
0
           then m r
stp
           else forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
sng m r
stp StreamK m a
m1

{-# INLINE takeWhile #-}
takeWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
takeWhile :: forall a (m :: * -> *). (a -> Bool) -> StreamK m a -> StreamK m a
takeWhile a -> Bool
p = forall {m :: * -> *}. StreamK m a -> StreamK m a
go
    where
    go :: StreamK m a -> StreamK m a
go StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single a
a   | a -> Bool
p a
a       = a -> m r
sng a
a
                       | Bool
otherwise = m r
stp
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r | a -> Bool
p a
a       = a -> StreamK m a -> m r
yld a
a (StreamK m a -> StreamK m a
go StreamK m a
r)
                       | Bool
otherwise = m r
stp
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
m1

{-# INLINE drop #-}
drop :: Int -> StreamK m a -> StreamK m a
drop :: forall (m :: * -> *) a. Int -> StreamK m a -> StreamK m a
drop Int
n StreamK m a
m = forall (m :: * -> *) a. StreamK m a -> StreamK m a
unShare (forall {t} {m :: * -> *} {a}.
(Ord t, Num t) =>
t -> StreamK m a -> StreamK m a
go Int
n StreamK m a
m)
    where
    go :: t -> StreamK m a -> StreamK m a
go t
n1 StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: p -> m r
single p
_ = m r
stp
            yieldk :: p -> StreamK m a -> m r
yieldk p
_ StreamK m a
r = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ t -> StreamK m a -> StreamK m a
go (t
n1 forall a. Num a => a -> a -> a
- t
1) StreamK m a
r
        -- Somehow "<=" check performs better than a ">"
        in if t
n1 forall a. Ord a => a -> a -> Bool
<= t
0
           then forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
m1
           else forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStreamShared State StreamK m a
st forall {p}. p -> StreamK m a -> m r
yieldk forall {p}. p -> m r
single m r
stp StreamK m a
m1

{-# INLINE dropWhile #-}
dropWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
dropWhile :: forall a (m :: * -> *). (a -> Bool) -> StreamK m a -> StreamK m a
dropWhile a -> Bool
p = forall {m :: * -> *}. StreamK m a -> StreamK m a
go
    where
    go :: StreamK m a -> StreamK m a
go StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single a
a   | a -> Bool
p a
a       = m r
stp
                       | Bool
otherwise = a -> m r
sng a
a
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r | a -> Bool
p a
a = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
r
                       | Bool
otherwise = a -> StreamK m a -> m r
yld a
a StreamK m a
r
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
m1

-------------------------------------------------------------------------------
-- Mapping
-------------------------------------------------------------------------------

-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
{-# INLINE sequence #-}
sequence :: Monad m => StreamK m (m a) -> StreamK m a
sequence :: forall (m :: * -> *) a. Monad m => StreamK m (m a) -> StreamK m a
sequence = forall (m :: * -> *) a. Monad m => StreamK m (m a) -> StreamK m a
go
    where
    go :: StreamK m (m a) -> StreamK m a
go StreamK m (m a)
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: m a -> m r
single m a
ma = m a
ma forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> m r
sng
            yieldk :: m a -> StreamK m (m a) -> m r
yieldk m a
ma StreamK m (m a)
r = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ m a
ma forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
`consM` StreamK m (m a) -> StreamK m a
go StreamK m (m a)
r
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
st) m a -> StreamK m (m a) -> m r
yieldk m a -> m r
single m r
stp StreamK m (m a)
m1

-------------------------------------------------------------------------------
-- Inserting
-------------------------------------------------------------------------------

{-# INLINE intersperseM #-}
intersperseM :: Monad m => m a -> StreamK m a -> StreamK m a
intersperseM :: forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
intersperseM m a
a = StreamK m a -> StreamK m a
prependingStart
    where
    prependingStart :: StreamK m a -> StreamK m a
prependingStart StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let yieldk :: a -> StreamK m a -> m r
yieldk a
i StreamK m a
x =
                forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return a
i forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
`consM` StreamK m a -> StreamK m a
go StreamK m a
x
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
sng m r
stp StreamK m a
m1
    go :: StreamK m a -> StreamK m a
go StreamK m a
m2 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single a
i = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ m a
a forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
`consM` forall a (m :: * -> *). a -> StreamK m a
fromPure a
i
            yieldk :: a -> StreamK m a -> m r
yieldk a
i StreamK m a
x =
                forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStreamShared
                    State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ m a
a forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
`consM` forall (m :: * -> *) a. Monad m => a -> m a
return a
i forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
`consM` StreamK m a -> StreamK m a
go StreamK m a
x
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
m2

{-# INLINE intersperse #-}
intersperse :: Monad m => a -> StreamK m a -> StreamK m a
intersperse :: forall (m :: * -> *) a. Monad m => a -> StreamK m a -> StreamK m a
intersperse a
a = forall (m :: * -> *) a.
Monad m =>
m a -> StreamK m a -> StreamK m a
intersperseM (forall (m :: * -> *) a. Monad m => a -> m a
return a
a)

{-# INLINE insertBy #-}
insertBy :: (a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a
insertBy :: forall a (m :: * -> *).
(a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a
insertBy a -> a -> Ordering
cmp a
x = forall {m :: * -> *}. StreamK m a -> StreamK m a
go
  where
    go :: StreamK m a -> StreamK m a
go StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
_ m r
_ ->
        let single :: a -> m r
single a
a = case a -> a -> Ordering
cmp a
x a
a of
                Ordering
GT -> a -> StreamK m a -> m r
yld a
a (forall a (m :: * -> *). a -> StreamK m a
fromPure a
x)
                Ordering
_  -> a -> StreamK m a -> m r
yld a
x (forall a (m :: * -> *). a -> StreamK m a
fromPure a
a)
            stop :: m r
stop = a -> StreamK m a -> m r
yld a
x forall (m :: * -> *) a. StreamK m a
nil
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = case a -> a -> Ordering
cmp a
x a
a of
                Ordering
GT -> a -> StreamK m a -> m r
yld a
a (StreamK m a -> StreamK m a
go StreamK m a
r)
                Ordering
_  -> a -> StreamK m a -> m r
yld a
x (a
a forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
r)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single m r
stop StreamK m a
m1

------------------------------------------------------------------------------
-- Deleting
------------------------------------------------------------------------------

{-# INLINE deleteBy #-}
deleteBy :: (a -> a -> Bool) -> a -> StreamK m a -> StreamK m a
deleteBy :: forall a (m :: * -> *).
(a -> a -> Bool) -> a -> StreamK m a -> StreamK m a
deleteBy a -> a -> Bool
eq a
x = forall {m :: * -> *}. StreamK m a -> StreamK m a
go
  where
    go :: StreamK m a -> StreamK m a
go StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let single :: a -> m r
single a
a = if a -> a -> Bool
eq a
x a
a then m r
stp else a -> m r
sng a
a
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = if a -> a -> Bool
eq a
x a
a
              then forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
r
              else a -> StreamK m a -> m r
yld a
a (StreamK m a -> StreamK m a
go StreamK m a
r)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
m1

-------------------------------------------------------------------------------
-- Map and Filter
-------------------------------------------------------------------------------

{-# INLINE mapMaybe #-}
mapMaybe :: (a -> Maybe b) -> StreamK m a -> StreamK m b
mapMaybe :: forall a b (m :: * -> *).
(a -> Maybe b) -> StreamK m a -> StreamK m b
mapMaybe a -> Maybe b
f = forall {m :: * -> *}. StreamK m a -> StreamK m b
go
  where
    go :: StreamK m a -> StreamK m b
go StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m b
st b -> StreamK m b -> m r
yld b -> m r
sng m r
stp ->
        let single :: a -> m r
single a
a = forall b a. b -> (a -> b) -> Maybe a -> b
maybe m r
stp b -> m r
sng (a -> Maybe b
f a
a)
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = case a -> Maybe b
f a
a of
                Just b
b  -> b -> StreamK m b -> m r
yld b
b forall a b. (a -> b) -> a -> b
$ StreamK m a -> StreamK m b
go StreamK m a
r
                Maybe b
Nothing -> forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m b
st) a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
r
        in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m b
st) a -> StreamK m a -> m r
yieldk a -> m r
single m r
stp StreamK m a
m1

-------------------------------------------------------------------------------
-- Exception Handling
-------------------------------------------------------------------------------

-- | Like Streamly.Data.Stream.'Streamly.Data.Stream.handle' but with one
-- significant difference, this function observes exceptions from the consumer
-- of the stream as well.
--
-- You can also convert 'StreamK' to 'Stream' and use exception handling from
-- 'Stream' module:
--
-- >>> handle f s = StreamK.fromStream $ Stream.handle (\e -> StreamK.toStream (f e)) (StreamK.toStream s)
--
{-# INLINABLE handle #-}
handle :: (MonadCatch m, Exception e)
    => (e -> m (StreamK m a)) -> StreamK m a -> StreamK m a
handle :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
(e -> m (StreamK m a)) -> StreamK m a -> StreamK m a
handle e -> m (StreamK m a)
f StreamK m a
stream = StreamK m a -> StreamK m a
go StreamK m a
stream

    where

    go :: StreamK m a -> StreamK m a
go StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = a -> StreamK m a -> m r
yld a
a forall a b. (a -> b) -> a -> b
$ StreamK m a -> StreamK m a
go StreamK m a
r
        in do
            Either e r
res <- forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
st) a -> StreamK m a -> m r
yieldk a -> m r
sng m r
stp StreamK m a
m1)
            case Either e r
res of
                Right r
r -> forall (m :: * -> *) a. Monad m => a -> m a
return r
r
                Left e
e -> do
                    StreamK m a
r <- e -> m (StreamK m a)
f e
e
                    forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
st) a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
r

-------------------------------------------------------------------------------
-- Resource Management
-------------------------------------------------------------------------------

-- If we are folding the stream and we do not drain the entire stream (e.g. if
-- the fold terminates before the stream) then the finalizer will run on GC.
--
-- XXX To implement a prompt cleanup, we will have to yield a cleanup function
-- via the yield continuation. A chain of cleanup functions can be built and
-- the entire chain can be invoked when the stream ends voluntarily or if
-- someone decides to abandon the stream.

-- | Like Streamly.Data.Stream.'Streamly.Data.Stream.bracketIO' but with one
-- significant difference, this function observes exceptions from the consumer
-- of the stream as well. Therefore, it cleans up the resource promptly when
-- the consumer encounters an exception.
--
-- You can also convert 'StreamK' to 'Stream' and use resource handling from
-- 'Stream' module:
--
-- >>> bracketIO bef aft bet = StreamK.fromStream $ Stream.bracketIO bef aft (StreamK.toStream . bet)
--
{-# INLINABLE bracketIO #-}
bracketIO :: (MonadIO m, MonadCatch m)
    => IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a
bracketIO :: forall (m :: * -> *) b c a.
(MonadIO m, MonadCatch m) =>
IO b -> (b -> IO c) -> (b -> StreamK m a) -> StreamK m a
bracketIO IO b
bef b -> IO c
aft b -> StreamK m a
bet =
    forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
concatEffect forall a b. (a -> b) -> a -> b
$ do
        (b
r, IOFinalizer
ref) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
            b
r <- IO b
bef
            IOFinalizer
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer (b -> IO c
aft b
r)
            forall (m :: * -> *) a. Monad m => a -> m a
return (b
r, IOFinalizer
ref)
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall {m :: * -> *} {a}.
(MonadCatch m, MonadIO m) =>
IOFinalizer -> StreamK m a -> StreamK m a
go IOFinalizer
ref (b -> StreamK m a
bet b
r)

    where

    go :: IOFinalizer -> StreamK m a -> StreamK m a
go IOFinalizer
ref StreamK m a
m1 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
        let
            -- We can discard exceptions on continuations to make it equivalent
            -- to StreamD, but it seems like a desirable behavior.
            stop :: m r
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m r
stp
            single :: a -> m r
single a
a = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m r
sng a
a
            yieldk :: a -> StreamK m a -> m r
yieldk a
a StreamK m a
r = a -> StreamK m a -> m r
yld a
a forall a b. (a -> b) -> a -> b
$ IOFinalizer -> StreamK m a -> StreamK m a
go IOFinalizer
ref StreamK m a
r
        in do
            -- Do not call the finalizer twice if it has already been
            -- called via stop continuation and stop continuation itself
            -- generated an exception. runIOFinalizer takes care of that.
            Either SomeException r
res <- forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
st) a -> StreamK m a -> m r
yieldk a -> m r
single m r
stop StreamK m a
m1)
            case Either SomeException r
res of
                Right r
r -> forall (m :: * -> *) a. Monad m => a -> m a
return r
r
                Left (SomeException
e :: MC.SomeException) ->
                    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e

------------------------------------------------------------------------------
-- Serial Zipping
------------------------------------------------------------------------------

-- | Zipping of @n@ streams can be performed by combining the streams pair
-- wise using 'mergeMapWith' with O(n * log n) time complexity. If used
-- with 'concatMapWith' it will have O(n^2) performance.
{-# INLINE zipWith #-}
zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWith :: forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWith a -> b -> c
f = forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWithM (\a
a b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b -> c
f a
a b
b))

{-# INLINE zipWithM #-}
zipWithM :: Monad m =>
    (a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWithM :: forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWithM a -> b -> m c
f = StreamK m a -> StreamK m b -> StreamK m c
go

    where

    go :: StreamK m a -> StreamK m b -> StreamK m c
go StreamK m a
mx StreamK m b
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m c
st c -> StreamK m c -> m r
yld c -> m r
sng m r
stp -> do
        let merge :: a -> StreamK m a -> m r
merge a
a StreamK m a
ra =
                let single2 :: b -> m r
single2 b
b   = a -> b -> m c
f a
a b
b forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= c -> m r
sng
                    yield2 :: b -> StreamK m b -> m r
yield2 b
b StreamK m b
rb = a -> b -> m c
f a
a b
b forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \c
x -> c -> StreamK m c -> m r
yld c
x (StreamK m a -> StreamK m b -> StreamK m c
go StreamK m a
ra StreamK m b
rb)
                 in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st) b -> StreamK m b -> m r
yield2 b -> m r
single2 m r
stp StreamK m b
my
        let single1 :: a -> m r
single1 a
a = a -> StreamK m a -> m r
merge a
a forall (m :: * -> *) a. StreamK m a
nil
            yield1 :: a -> StreamK m a -> m r
yield1 = a -> StreamK m a -> m r
merge
        forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m c
st) a -> StreamK m a -> m r
yield1 a -> m r
single1 m r
stp StreamK m a
mx

------------------------------------------------------------------------------
-- Merging
------------------------------------------------------------------------------

{-# INLINE mergeByM #-}
mergeByM :: Monad m =>
    (a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
mergeByM :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
mergeByM a -> a -> m Ordering
cmp = StreamK m a -> StreamK m a -> StreamK m a
go

    where

    go :: StreamK m a -> StreamK m a -> StreamK m a
go StreamK m a
mx StreamK m a
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        let stop :: m r
stop = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
my
            single :: a -> m r
single a
x = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (a -> StreamK m a -> StreamK m a
goX0 a
x StreamK m a
my)
            yield :: a -> StreamK m a -> m r
yield a
x StreamK m a
rx = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (a -> StreamK m a -> StreamK m a -> StreamK m a
goX a
x StreamK m a
rx StreamK m a
my)
        forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
mx

    goX0 :: a -> StreamK m a -> StreamK m a
goX0 a
x StreamK m a
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
_ -> do
        let stop :: m r
stop = a -> m r
sng a
x
            single :: a -> m r
single a
y = do
                Ordering
r <- a -> a -> m Ordering
cmp a
x a
y
                case Ordering
r of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (forall a (m :: * -> *). a -> StreamK m a
fromPure a
x)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (forall a (m :: * -> *). a -> StreamK m a
fromPure a
y)
            yield :: a -> StreamK m a -> m r
yield a
y StreamK m a
ry = do
                Ordering
r <- a -> a -> m Ordering
cmp a
x a
y
                case Ordering
r of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a -> StreamK m a -> StreamK m a
goX0 a
x StreamK m a
ry)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (a
y forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
ry)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
my

    goX :: a -> StreamK m a -> StreamK m a -> StreamK m a
goX a
x StreamK m a
mx StreamK m a
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
_ m r
_ -> do
        let stop :: m r
stop = a -> StreamK m a -> m r
yld a
x StreamK m a
mx
            single :: a -> m r
single a
y = do
                Ordering
r <- a -> a -> m Ordering
cmp a
x a
y
                case Ordering
r of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a
x forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
mx)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (StreamK m a -> a -> StreamK m a
goY0 StreamK m a
mx a
y)
            yield :: a -> StreamK m a -> m r
yield a
y StreamK m a
ry = do
                Ordering
r <- a -> a -> m Ordering
cmp a
x a
y
                case Ordering
r of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a -> StreamK m a -> StreamK m a -> StreamK m a
goX a
x StreamK m a
mx StreamK m a
ry)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (StreamK m a -> a -> StreamK m a -> StreamK m a
goY StreamK m a
mx a
y StreamK m a
ry)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
my

    goY0 :: StreamK m a -> a -> StreamK m a
goY0 StreamK m a
mx a
y = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
_ -> do
        let stop :: m r
stop = a -> m r
sng a
y
            single :: a -> m r
single a
x = do
                Ordering
r <- a -> a -> m Ordering
cmp a
x a
y
                case Ordering
r of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (forall a (m :: * -> *). a -> StreamK m a
fromPure a
x)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (forall a (m :: * -> *). a -> StreamK m a
fromPure a
y)
            yield :: a -> StreamK m a -> m r
yield a
x StreamK m a
rx = do
                Ordering
r <- a -> a -> m Ordering
cmp a
x a
y
                case Ordering
r of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a
x forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
rx)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (StreamK m a -> a -> StreamK m a
goY0 StreamK m a
rx a
y)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
mx

    goY :: StreamK m a -> a -> StreamK m a -> StreamK m a
goY StreamK m a
mx a
y StreamK m a
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
_ m r
_ -> do
        let stop :: m r
stop = a -> StreamK m a -> m r
yld a
y StreamK m a
my
            single :: a -> m r
single a
x = do
                Ordering
r <- a -> a -> m Ordering
cmp a
x a
y
                case Ordering
r of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a -> StreamK m a -> StreamK m a
goX0 a
x StreamK m a
my)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (a
y forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
my)
            yield :: a -> StreamK m a -> m r
yield a
x StreamK m a
rx = do
                Ordering
r <- a -> a -> m Ordering
cmp a
x a
y
                case Ordering
r of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a -> StreamK m a -> StreamK m a -> StreamK m a
goX a
x StreamK m a
rx StreamK m a
my)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (StreamK m a -> a -> StreamK m a -> StreamK m a
goY StreamK m a
rx a
y StreamK m a
my)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
mx

-- | Merging of @n@ streams can be performed by combining the streams pair
-- wise using 'mergeMapWith' to give O(n * log n) time complexity. If used
-- with 'concatMapWith' it will have O(n^2) performance.
--
{-# INLINE mergeBy #-}
mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
-- XXX GHC: This has slightly worse performance than replacing "r <- cmp x y"
-- with "let r = cmp x y" in the monadic version. The definition below is
-- exactly the same as mergeByM except this change.
-- mergeBy cmp = mergeByM (\a b -> return $ cmp a b)
mergeBy :: forall a (m :: * -> *).
(a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
mergeBy a -> a -> Ordering
cmp = forall {m :: * -> *}. StreamK m a -> StreamK m a -> StreamK m a
go

    where

    go :: StreamK m a -> StreamK m a -> StreamK m a
go StreamK m a
mx StreamK m a
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
        let stop :: m r
stop = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
my
            single :: a -> m r
single a
x = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (forall {m :: * -> *}. a -> StreamK m a -> StreamK m a
goX0 a
x StreamK m a
my)
            yield :: a -> StreamK m a -> m r
yield a
x StreamK m a
rx = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp (forall {m :: * -> *}.
a -> StreamK m a -> StreamK m a -> StreamK m a
goX a
x StreamK m a
rx StreamK m a
my)
        forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
mx

    goX0 :: a -> StreamK m a -> StreamK m a
goX0 a
x StreamK m a
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
_ -> do
        let stop :: m r
stop = a -> m r
sng a
x
            single :: a -> m r
single a
y = do
                case a -> a -> Ordering
cmp a
x a
y of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (forall a (m :: * -> *). a -> StreamK m a
fromPure a
x)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (forall a (m :: * -> *). a -> StreamK m a
fromPure a
y)
            yield :: a -> StreamK m a -> m r
yield a
y StreamK m a
ry = do
                case a -> a -> Ordering
cmp a
x a
y of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a -> StreamK m a -> StreamK m a
goX0 a
x StreamK m a
ry)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (a
y forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
ry)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
my

    goX :: a -> StreamK m a -> StreamK m a -> StreamK m a
goX a
x StreamK m a
mx StreamK m a
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
_ m r
_ -> do
        let stop :: m r
stop = a -> StreamK m a -> m r
yld a
x StreamK m a
mx
            single :: a -> m r
single a
y = do
                case a -> a -> Ordering
cmp a
x a
y of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a
x forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
mx)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (forall {m :: * -> *}. StreamK m a -> a -> StreamK m a
goY0 StreamK m a
mx a
y)
            yield :: a -> StreamK m a -> m r
yield a
y StreamK m a
ry = do
                case a -> a -> Ordering
cmp a
x a
y of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a -> StreamK m a -> StreamK m a -> StreamK m a
goX a
x StreamK m a
mx StreamK m a
ry)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (StreamK m a -> a -> StreamK m a -> StreamK m a
goY StreamK m a
mx a
y StreamK m a
ry)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
my

    goY0 :: StreamK m a -> a -> StreamK m a
goY0 StreamK m a
mx a
y = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
_ -> do
        let stop :: m r
stop = a -> m r
sng a
y
            single :: a -> m r
single a
x = do
                case a -> a -> Ordering
cmp a
x a
y of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (forall a (m :: * -> *). a -> StreamK m a
fromPure a
x)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (forall a (m :: * -> *). a -> StreamK m a
fromPure a
y)
            yield :: a -> StreamK m a -> m r
yield a
x StreamK m a
rx = do
                case a -> a -> Ordering
cmp a
x a
y of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a
x forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
rx)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (StreamK m a -> a -> StreamK m a
goY0 StreamK m a
rx a
y)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
mx

    goY :: StreamK m a -> a -> StreamK m a -> StreamK m a
goY StreamK m a
mx a
y StreamK m a
my = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
_ m r
_ -> do
        let stop :: m r
stop = a -> StreamK m a -> m r
yld a
y StreamK m a
my
            single :: a -> m r
single a
x = do
                case a -> a -> Ordering
cmp a
x a
y of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (forall {m :: * -> *}. a -> StreamK m a -> StreamK m a
goX0 a
x StreamK m a
my)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (a
y forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
`cons` StreamK m a
my)
            yield :: a -> StreamK m a -> m r
yield a
x StreamK m a
rx = do
                case a -> a -> Ordering
cmp a
x a
y of
                    Ordering
GT -> a -> StreamK m a -> m r
yld a
y (a -> StreamK m a -> StreamK m a -> StreamK m a
goX a
x StreamK m a
rx StreamK m a
my)
                    Ordering
_  -> a -> StreamK m a -> m r
yld a
x (StreamK m a -> a -> StreamK m a -> StreamK m a
goY StreamK m a
rx a
y StreamK m a
my)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yield a -> m r
single m r
stop StreamK m a
mx

------------------------------------------------------------------------------
-- Transformation comprehensions
------------------------------------------------------------------------------

{-# INLINE the #-}
the :: (Eq a, Monad m) => StreamK m a -> m (Maybe a)
the :: forall a (m :: * -> *).
(Eq a, Monad m) =>
StreamK m a -> m (Maybe a)
the StreamK m a
m = do
    Maybe (a, StreamK m a)
r <- forall (m :: * -> *) a.
Applicative m =>
StreamK m a -> m (Maybe (a, StreamK m a))
uncons StreamK m a
m
    case Maybe (a, StreamK m a)
r of
        Maybe (a, StreamK m a)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
        Just (a
h, StreamK m a
t) -> forall {m :: * -> *} {a}.
(Monad m, Eq a) =>
a -> StreamK m a -> m (Maybe a)
go a
h StreamK m a
t
    where
    go :: a -> StreamK m a -> m (Maybe a)
go a
h StreamK m a
m1 =
        let single :: a -> m (Maybe a)
single a
a   | a
h forall a. Eq a => a -> a -> Bool
== a
a    = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
h
                       | Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            yieldk :: a -> StreamK m a -> m (Maybe a)
yieldk a
a StreamK m a
r | a
h forall a. Eq a => a -> a -> Bool
== a
a    = a -> StreamK m a -> m (Maybe a)
go a
h StreamK m a
r
                       | Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Maybe a)
yieldk forall {m :: * -> *}. Monad m => a -> m (Maybe a)
single (forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
h) StreamK m a
m1

------------------------------------------------------------------------------
-- Alternative & MonadPlus
------------------------------------------------------------------------------

_alt :: StreamK m a -> StreamK m a -> StreamK m a
_alt :: forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
_alt StreamK m a
m1 StreamK m a
m2 = forall (m :: * -> *) a.
(forall r.
 State StreamK m a
 -> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp ->
    let stop :: m r
stop  = forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
m2
    in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stop StreamK m a
m1

------------------------------------------------------------------------------
-- MonadError
------------------------------------------------------------------------------

{-
-- XXX handle and test cross thread state transfer
withCatchError
    :: MonadError e m
    => StreamK m a -> (e -> StreamK m a) -> StreamK m a
withCatchError m h =
    mkStream $ \_ stp sng yld ->
        let run x = unStream x Nothing stp sng yieldk
            handle r = r `catchError` \e -> run $ h e
            yieldk a r = yld a (withCatchError r h)
        in handle $ run m
-}

-------------------------------------------------------------------------------
-- Parsing
-------------------------------------------------------------------------------

-- Inlined definition.
{-# INLINE splitAt #-}
splitAt :: Int -> [a] -> ([a],[a])
splitAt :: forall a. Int -> [a] -> ([a], [a])
splitAt Int
n [a]
ls
  | Int
n forall a. Ord a => a -> a -> Bool
<= Int
0 = ([], [a]
ls)
  | Bool
otherwise          = forall a. Int -> [a] -> ([a], [a])
splitAt' Int
n [a]
ls
    where
        splitAt' :: Int -> [a] -> ([a], [a])
        splitAt' :: forall a. Int -> [a] -> ([a], [a])
splitAt' Int
_  []     = ([], [])
        splitAt' Int
1  (a
x:[a]
xs) = ([a
x], [a]
xs)
        splitAt' Int
m  (a
x:[a]
xs) = (a
xforall a. a -> [a] -> [a]
:[a]
xs', [a]
xs'')
          where
            ([a]
xs', [a]
xs'') = forall a. Int -> [a] -> ([a], [a])
splitAt' (Int
m forall a. Num a => a -> a -> a
- Int
1) [a]
xs

-- | Run a 'Parser' over a stream and return rest of the Stream.
{-# INLINE_NORMAL parseDBreak #-}
parseDBreak
    :: Monad m
    => PR.Parser a m b
    -> StreamK m a
    -> m (Either ParseError b, StreamK m a)
parseDBreak :: forall (m :: * -> *) a b.
Monad m =>
Parser a m b -> StreamK m a -> m (Either ParseError b, StreamK m a)
parseDBreak (PR.Parser s -> a -> m (Step s b)
pstep m (Initial s b)
initial s -> m (Step s b)
extract) StreamK m a
stream = do
    Initial s b
res <- m (Initial s b)
initial
    case Initial s b
res of
        PR.IPartial s
s -> StreamK m a -> [a] -> s -> m (Either ParseError b, StreamK m a)
goStream StreamK m a
stream [] s
s
        PR.IDone b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, StreamK m a
stream)
        PR.IError [Char]
err -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), StreamK m a
stream)

    where

    -- "buf" contains last few items in the stream that we may have to
    -- backtrack to.
    --
    -- XXX currently we are using a dumb list based approach for backtracking
    -- buffer. This can be replaced by a sliding/ring buffer using Data.Array.
    -- That will allow us more efficient random back and forth movement.
    goStream :: StreamK m a -> [a] -> s -> m (Either ParseError b, StreamK m a)
goStream StreamK m a
st [a]
buf !s
pst =
        let stop :: m (Either ParseError b, StreamK m a)
stop = do
                Step s b
r <- s -> m (Step s b)
extract s
pst
                case Step s b
r of
                    PR.Error [Char]
err -> do
                        let src :: [a]
src = forall a. [a] -> [a]
Prelude.reverse [a]
buf
                        forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), forall a (m :: * -> *). [a] -> StreamK m a
fromList [a]
src)
                    PR.Done Int
n b
b -> do
                        assertM(Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
buf)
                        let src0 :: [a]
src0 = forall a. Int -> [a] -> [a]
Prelude.take Int
n [a]
buf
                            src :: [a]
src  = forall a. [a] -> [a]
Prelude.reverse [a]
src0
                        forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall a (m :: * -> *). [a] -> StreamK m a
fromList [a]
src)
                    PR.Partial Int
_ s
_ -> forall a. HasCallStack => [Char] -> a
error [Char]
"Bug: parseBreak: Partial in extract"
                    PR.Continue Int
0 s
s -> StreamK m a -> [a] -> s -> m (Either ParseError b, StreamK m a)
goStream forall (m :: * -> *) a. StreamK m a
nil [a]
buf s
s
                    PR.Continue Int
n s
s -> do
                        assertM(Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
buf)
                        let ([a]
src0, [a]
buf1) = forall a. Int -> [a] -> ([a], [a])
splitAt Int
n [a]
buf
                            src :: [a]
src = forall a. [a] -> [a]
Prelude.reverse [a]
src0
                        StreamK m a
-> [a] -> [a] -> s -> m (Either ParseError b, StreamK m a)
goBuf forall (m :: * -> *) a. StreamK m a
nil [a]
buf1 [a]
src s
s
            single :: a -> m (Either ParseError b, StreamK m a)
single a
x = a -> StreamK m a -> m (Either ParseError b, StreamK m a)
yieldk a
x forall (m :: * -> *) a. StreamK m a
nil
            yieldk :: a -> StreamK m a -> m (Either ParseError b, StreamK m a)
yieldk a
x StreamK m a
r = do
                Step s b
res <- s -> a -> m (Step s b)
pstep s
pst a
x
                case Step s b
res of
                    PR.Partial Int
0 s
s -> StreamK m a -> [a] -> s -> m (Either ParseError b, StreamK m a)
goStream StreamK m a
r [] s
s
                    PR.Partial Int
n s
s -> do
                        assertM(Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length (a
xforall a. a -> [a] -> [a]
:[a]
buf))
                        let src0 :: [a]
src0 = forall a. Int -> [a] -> [a]
Prelude.take Int
n (a
xforall a. a -> [a] -> [a]
:[a]
buf)
                            src :: [a]
src  = forall a. [a] -> [a]
Prelude.reverse [a]
src0
                        StreamK m a
-> [a] -> [a] -> s -> m (Either ParseError b, StreamK m a)
goBuf StreamK m a
r [] [a]
src s
s
                    PR.Continue Int
0 s
s -> StreamK m a -> [a] -> s -> m (Either ParseError b, StreamK m a)
goStream StreamK m a
r (a
xforall a. a -> [a] -> [a]
:[a]
buf) s
s
                    PR.Continue Int
n s
s -> do
                        assertM(Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length (a
xforall a. a -> [a] -> [a]
:[a]
buf))
                        let ([a]
src0, [a]
buf1) = forall a. Int -> [a] -> ([a], [a])
splitAt Int
n (a
xforall a. a -> [a] -> [a]
:[a]
buf)
                            src :: [a]
src = forall a. [a] -> [a]
Prelude.reverse [a]
src0
                        StreamK m a
-> [a] -> [a] -> s -> m (Either ParseError b, StreamK m a)
goBuf StreamK m a
r [a]
buf1 [a]
src s
s
                    PR.Done Int
0 b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, StreamK m a
r)
                    PR.Done Int
n b
b -> do
                        assertM(Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length (a
xforall a. a -> [a] -> [a]
:[a]
buf))
                        let src0 :: [a]
src0 = forall a. Int -> [a] -> [a]
Prelude.take Int
n (a
xforall a. a -> [a] -> [a]
:[a]
buf)
                            src :: [a]
src  = forall a. [a] -> [a]
Prelude.reverse [a]
src0
                        forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
append (forall a (m :: * -> *). [a] -> StreamK m a
fromList [a]
src) StreamK m a
r)
                    PR.Error [Char]
err -> do
                        let src :: [a]
src = forall a. [a] -> [a]
Prelude.reverse (a
xforall a. a -> [a] -> [a]
:[a]
buf)
                        forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
append (forall a (m :: * -> *). [a] -> StreamK m a
fromList [a]
src) StreamK m a
r)
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState a -> StreamK m a -> m (Either ParseError b, StreamK m a)
yieldk a -> m (Either ParseError b, StreamK m a)
single m (Either ParseError b, StreamK m a)
stop StreamK m a
st

    goBuf :: StreamK m a
-> [a] -> [a] -> s -> m (Either ParseError b, StreamK m a)
goBuf StreamK m a
st [a]
buf [] !s
pst = StreamK m a -> [a] -> s -> m (Either ParseError b, StreamK m a)
goStream StreamK m a
st [a]
buf s
pst
    goBuf StreamK m a
st [a]
buf (a
x:[a]
xs) !s
pst = do
        Step s b
pRes <- s -> a -> m (Step s b)
pstep s
pst a
x
        case Step s b
pRes of
            PR.Partial Int
0 s
s -> StreamK m a
-> [a] -> [a] -> s -> m (Either ParseError b, StreamK m a)
goBuf StreamK m a
st [] [a]
xs s
s
            PR.Partial Int
n s
s -> do
                forall a. HasCallStack => Bool -> a -> a
assert (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length (a
xforall a. a -> [a] -> [a]
:[a]
buf)) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                let src0 :: [a]
src0 = forall a. Int -> [a] -> [a]
Prelude.take Int
n (a
xforall a. a -> [a] -> [a]
:[a]
buf)
                    src :: [a]
src  = forall a. [a] -> [a]
Prelude.reverse [a]
src0 forall a. [a] -> [a] -> [a]
++ [a]
xs
                StreamK m a
-> [a] -> [a] -> s -> m (Either ParseError b, StreamK m a)
goBuf StreamK m a
st [] [a]
src s
s
            PR.Continue Int
0 s
s -> StreamK m a
-> [a] -> [a] -> s -> m (Either ParseError b, StreamK m a)
goBuf StreamK m a
st (a
xforall a. a -> [a] -> [a]
:[a]
buf) [a]
xs s
s
            PR.Continue Int
n s
s -> do
                forall a. HasCallStack => Bool -> a -> a
assert (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length (a
xforall a. a -> [a] -> [a]
:[a]
buf)) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                let ([a]
src0, [a]
buf1) = forall a. Int -> [a] -> ([a], [a])
splitAt Int
n (a
xforall a. a -> [a] -> [a]
:[a]
buf)
                    src :: [a]
src  = forall a. [a] -> [a]
Prelude.reverse [a]
src0 forall a. [a] -> [a] -> [a]
++ [a]
xs
                StreamK m a
-> [a] -> [a] -> s -> m (Either ParseError b, StreamK m a)
goBuf StreamK m a
st [a]
buf1 [a]
src s
s
            PR.Done Int
n b
b -> do
                forall a. HasCallStack => Bool -> a -> a
assert (Int
n forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length (a
xforall a. a -> [a] -> [a]
:[a]
buf)) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                let src0 :: [a]
src0 = forall a. Int -> [a] -> [a]
Prelude.take Int
n (a
xforall a. a -> [a] -> [a]
:[a]
buf)
                    src :: [a]
src  = forall a. [a] -> [a]
Prelude.reverse [a]
src0
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
append (forall a (m :: * -> *). [a] -> StreamK m a
fromList [a]
src) StreamK m a
st)
            PR.Error [Char]
err -> do
                let src :: [a]
src = forall a. [a] -> [a]
Prelude.reverse [a]
buf forall a. [a] -> [a] -> [a]
++ a
xforall a. a -> [a] -> [a]
:[a]
xs
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
append (forall a (m :: * -> *). [a] -> StreamK m a
fromList [a]
src) StreamK m a
st)

-- Using ParserD or ParserK on StreamK may not make much difference. We should
-- perhaps use only chunked parsing on StreamK. We can always convert a stream
-- to chunks before parsing. Or just have a ParserK element parser for StreamK
-- and convert ParserD to ParserK for element parsing using StreamK.
{-# INLINE parseD #-}
parseD :: Monad m =>
    Parser.Parser a m b -> StreamK m a -> m (Either ParseError b)
parseD :: forall (m :: * -> *) a b.
Monad m =>
Parser a m b -> StreamK m a -> m (Either ParseError b)
parseD Parser a m b
f = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
Parser a m b -> StreamK m a -> m (Either ParseError b, StreamK m a)
parseDBreak Parser a m b
f

-------------------------------------------------------------------------------
-- ParserK Chunked
-------------------------------------------------------------------------------

-- The backracking buffer consists of arrays in the most-recent-first order. We
-- want to take a total of n array elements from this buffer. Note: when we
-- have to take an array partially, we must take the last part of the array.
{-# INLINE backTrack #-}
backTrack :: forall m a. Unbox a =>
       Int
    -> [Array a]
    -> StreamK m (Array a)
    -> (StreamK m (Array a), [Array a])
backTrack :: forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack = forall {a} {m :: * -> *}.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
go

    where

    go :: Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
go Int
_ [] StreamK m (Array a)
stream = (StreamK m (Array a)
stream, [])
    go Int
n [Array a]
xs StreamK m (Array a)
stream | Int
n forall a. Ord a => a -> a -> Bool
<= Int
0 = (StreamK m (Array a)
stream, [Array a]
xs)
    go Int
n (Array a
x:[Array a]
xs) StreamK m (Array a)
stream =
        let len :: Int
len = forall a. Unbox a => Array a -> Int
Array.length Array a
x
        in if Int
n forall a. Ord a => a -> a -> Bool
> Int
len
           then Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
go (Int
n forall a. Num a => a -> a -> a
- Int
len) [Array a]
xs (forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
x StreamK m (Array a)
stream)
           else if Int
n forall a. Eq a => a -> a -> Bool
== Int
len
           then (forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
x StreamK m (Array a)
stream, [Array a]
xs)
           else let !(Array MutByteArray
contents Int
start Int
end) = Array a
x
                    !start1 :: Int
start1 = Int
end forall a. Num a => a -> a -> a
- (Int
n forall a. Num a => a -> a -> a
* SIZE_OF(a))
                    arr1 :: Array a
arr1 = forall a. MutByteArray -> Int -> Int -> Array a
Array MutByteArray
contents Int
start1 Int
end
                    arr2 :: Array a
arr2 = forall a. MutByteArray -> Int -> Int -> Array a
Array MutByteArray
contents Int
start Int
start1
                 in (forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons forall {a}. Array a
arr1 StreamK m (Array a)
stream, forall {a}. Array a
arr2forall a. a -> [a] -> [a]
:[Array a]
xs)

-- | A continuation to extract the result when a CPS parser is done.
{-# INLINE parserDone #-}
parserDone :: Applicative m =>
    ParserK.ParseResult b -> Int -> ParserK.Input a -> m (ParserK.Step a m b)
parserDone :: forall (m :: * -> *) b a.
Applicative m =>
ParseResult b -> Int -> Input a -> m (Step a m b)
parserDone (ParserK.Success Int
n b
b) Int
_ Input a
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a (m :: * -> *) r. Int -> r -> Step a m r
ParserK.Done Int
n b
b
parserDone (ParserK.Failure Int
n [Char]
e) Int
_ Input a
_ = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a (m :: * -> *) r. Int -> [Char] -> Step a m r
ParserK.Error Int
n [Char]
e

-- XXX parseDBreakChunks may be faster than converting parserD to parserK and
-- using parseBreakChunks. We can also use parseBreak as an alternative to the
-- monad instance of ParserD.

-- | Run a 'ParserK' over a chunked 'StreamK' and return the parse result and
-- the remaining Stream.
{-# INLINE_NORMAL parseBreakChunks #-}
parseBreakChunks
    :: (Monad m, Unbox a)
    => ParserK (Array a) m b
    -> StreamK m (Array a)
    -> m (Either ParseError b, StreamK m (Array a))
parseBreakChunks :: forall (m :: * -> *) a b.
(Monad m, Unbox a) =>
ParserK (Array a) m b
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
parseBreakChunks ParserK (Array a) m b
parser StreamK m (Array a)
input = do
    let parserk :: Input (Array a) -> m (Step (Array a) m b)
parserk = forall a (m :: * -> *) b.
ParserK a m b
-> forall r.
   (ParseResult b -> Int -> Input a -> m (Step a m r))
   -> Int -> Int -> Input a -> m (Step a m r)
ParserK.runParser ParserK (Array a) m b
parser forall (m :: * -> *) b a.
Applicative m =>
ParseResult b -> Int -> Input a -> m (Step a m b)
parserDone Int
0 Int
0
     in forall {m :: * -> *} {a} {b}.
(Monad m, Unbox a) =>
[Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [] Input (Array a) -> m (Step (Array a) m b)
parserk StreamK m (Array a)
input

    where

    {-# INLINE goStop #-}
    goStop :: [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> m (Either ParseError b, StreamK m (Array a))
goStop [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk = do
        Step (Array a) m b
pRes <- Input (Array a) -> m (Step (Array a) m b)
parserk forall a. Input a
ParserK.None
        case Step (Array a) m b
pRes of
            -- If we stop in an alternative, it will try calling the next
            -- parser, the next parser may call initial returning Partial and
            -- then immediately we have to call extract on it.
            ParserK.Partial Int
0 Input (Array a) -> m (Step (Array a) m b)
cont1 ->
                 [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [] Input (Array a) -> m (Step (Array a) m b)
cont1 forall (m :: * -> *) a. StreamK m a
nil
            ParserK.Partial Int
n Input (Array a) -> m (Step (Array a) m b)
cont1 -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Unbox a => Array a -> Int
Array.length [Array a]
backBuf))
                let (StreamK m (Array a)
s1, [Array a]
backBuf1) = forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack Int
n1 [Array a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf1 Input (Array a) -> m (Step (Array a) m b)
cont1 forall {m :: * -> *}. StreamK m (Array a)
s1
            ParserK.Continue Int
0 Input (Array a) -> m (Step (Array a) m b)
cont1 ->
                [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
cont1 forall (m :: * -> *) a. StreamK m a
nil
            ParserK.Continue Int
n Input (Array a) -> m (Step (Array a) m b)
cont1 -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Unbox a => Array a -> Int
Array.length [Array a]
backBuf))
                let (StreamK m (Array a)
s1, [Array a]
backBuf1) = forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack Int
n1 [Array a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf1 Input (Array a) -> m (Step (Array a) m b)
cont1 forall {m :: * -> *}. StreamK m (Array a)
s1
            ParserK.Done Int
0 b
b ->
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall (m :: * -> *) a. StreamK m a
nil)
            ParserK.Done Int
n b
b -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Unbox a => Array a -> Int
Array.length [Array a]
backBuf))
                let (StreamK m (Array a)
s1, [Array a]
_) = forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack Int
n1 [Array a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall {m :: * -> *}. StreamK m (Array a)
s1)
            ParserK.Error Int
_ [Char]
err -> do
                let (StreamK m (Array a)
s1, [Array a]
_) = forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack forall a. Bounded a => a
maxBound [Array a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), forall {m :: * -> *}. StreamK m (Array a)
s1)

    seekErr :: a -> a -> a
seekErr a
n a
len =
        forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"parseBreak: Partial: forward seek not implemented n = "
            forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
n forall a. [a] -> [a] -> [a]
++ [Char]
" len = " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
len

    yieldk :: [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk Array a
arr StreamK m (Array a)
stream = do
        Step (Array a) m b
pRes <- Input (Array a) -> m (Step (Array a) m b)
parserk (forall a. a -> Input a
ParserK.Chunk Array a
arr)
        let len :: Int
len = forall a. Unbox a => Array a -> Int
Array.length Array a
arr
        case Step (Array a) m b
pRes of
            ParserK.Partial Int
n Input (Array a) -> m (Step (Array a) m b)
cont1 ->
                case forall a. Ord a => a -> a -> Ordering
compare Int
n Int
len of
                    Ordering
EQ -> [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [] Input (Array a) -> m (Step (Array a) m b)
cont1 StreamK m (Array a)
stream
                    Ordering
LT -> do
                        if Int
n forall a. Ord a => a -> a -> Bool
>= Int
0
                        then [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [] Input (Array a) -> m (Step (Array a) m b)
cont1 Array a
arr StreamK m (Array a)
stream
                        else do
                            let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                                bufLen :: Int
bufLen = forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Unbox a => Array a -> Int
Array.length [Array a]
backBuf)
                                s :: StreamK m (Array a)
s = forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
arr StreamK m (Array a)
stream
                            assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= Int
bufLen)
                            let (StreamK m (Array a)
s1, [Array a]
_) = forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack Int
n1 [Array a]
backBuf StreamK m (Array a)
s
                            [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [] Input (Array a) -> m (Step (Array a) m b)
cont1 StreamK m (Array a)
s1
                    Ordering
GT -> forall {a} {a} {a}. (Show a, Show a) => a -> a -> a
seekErr Int
n Int
len
            ParserK.Continue Int
n Input (Array a) -> m (Step (Array a) m b)
cont1 ->
                case forall a. Ord a => a -> a -> Ordering
compare Int
n Int
len of
                    Ordering
EQ -> [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go (Array a
arrforall a. a -> [a] -> [a]
:[Array a]
backBuf) Input (Array a) -> m (Step (Array a) m b)
cont1 StreamK m (Array a)
stream
                    Ordering
LT -> do
                        if Int
n forall a. Ord a => a -> a -> Bool
>= Int
0
                        then [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
cont1 Array a
arr StreamK m (Array a)
stream
                        else do
                            let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                                bufLen :: Int
bufLen = forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Unbox a => Array a -> Int
Array.length [Array a]
backBuf)
                                s :: StreamK m (Array a)
s = forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
arr StreamK m (Array a)
stream
                            assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= Int
bufLen)
                            let (StreamK m (Array a)
s1, [Array a]
backBuf1) = forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack Int
n1 [Array a]
backBuf StreamK m (Array a)
s
                            [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf1 Input (Array a) -> m (Step (Array a) m b)
cont1 StreamK m (Array a)
s1
                    Ordering
GT -> forall {a} {a} {a}. (Show a, Show a) => a -> a -> a
seekErr Int
n Int
len
            ParserK.Done Int
n b
b -> do
                let n1 :: Int
n1 = Int
len forall a. Num a => a -> a -> a
- Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Unbox a => Array a -> Int
Array.length (Array a
arrforall a. a -> [a] -> [a]
:[Array a]
backBuf)))
                let (StreamK m (Array a)
s1, [Array a]
_) = forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack Int
n1 (Array a
arrforall a. a -> [a] -> [a]
:[Array a]
backBuf) StreamK m (Array a)
stream
                 in forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, StreamK m (Array a)
s1)
            ParserK.Error Int
_ [Char]
err -> do
                let (StreamK m (Array a)
s1, [Array a]
_) = forall (m :: * -> *) a.
Unbox a =>
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrack forall a. Bounded a => a
maxBound (Array a
arrforall a. a -> [a] -> [a]
:[Array a]
backBuf) StreamK m (Array a)
stream
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), StreamK m (Array a)
s1)

    go :: [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk StreamK m (Array a)
stream = do
        let stop :: m (Either ParseError b, StreamK m (Array a))
stop = [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> m (Either ParseError b, StreamK m (Array a))
goStop [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk
            single :: Array a -> m (Either ParseError b, StreamK m (Array a))
single Array a
a = [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk Array a
a forall (m :: * -> *) a. StreamK m a
nil
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState ([Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk) Array a -> m (Either ParseError b, StreamK m (Array a))
single m (Either ParseError b, StreamK m (Array a))
stop StreamK m (Array a)
stream

{-# INLINE parseChunks #-}
parseChunks :: (Monad m, Unbox a) =>
    ParserK (Array a) m b -> StreamK m (Array a) -> m (Either ParseError b)
parseChunks :: forall (m :: * -> *) a b.
(Monad m, Unbox a) =>
ParserK (Array a) m b
-> StreamK m (Array a) -> m (Either ParseError b)
parseChunks ParserK (Array a) m b
f = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
(Monad m, Unbox a) =>
ParserK (Array a) m b
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
parseBreakChunks ParserK (Array a) m b
f

-------------------------------------------------------------------------------
-- ParserK Singular
-------------------------------------------------------------------------------

{-# INLINE backTrackSingular #-}
backTrackSingular :: Int -> [a] -> StreamK m a -> (StreamK m a, [a])
backTrackSingular :: forall a (m :: * -> *).
Int -> [a] -> StreamK m a -> (StreamK m a, [a])
backTrackSingular = forall a (m :: * -> *).
Int -> [a] -> StreamK m a -> (StreamK m a, [a])
go

    where

    go :: Int -> [a] -> StreamK m a -> (StreamK m a, [a])
go Int
_ [] StreamK m a
stream = (StreamK m a
stream, [])
    go Int
n [a]
xs StreamK m a
stream | Int
n forall a. Ord a => a -> a -> Bool
<= Int
0 = (StreamK m a
stream, [a]
xs)
    go Int
n [a]
xs StreamK m a
stream =
        let ([a]
appendBuf, [a]
newBTBuf) = forall a. Int -> [a] -> ([a], [a])
splitAt Int
n [a]
xs
         in (forall (m :: * -> *) a. StreamK m a -> StreamK m a -> StreamK m a
append (forall a (m :: * -> *). [a] -> StreamK m a
fromList (forall a. [a] -> [a]
Prelude.reverse [a]
appendBuf)) StreamK m a
stream, [a]
newBTBuf)


-- | Similar to 'parseBreak' but works on singular elements.
--
{-# INLINE_NORMAL parseBreak #-}
parseBreak
    :: forall m a b. Monad m
    => ParserK.ParserK a m b
    -> StreamK m a
    -> m (Either ParseError b, StreamK m a)
parseBreak :: forall (m :: * -> *) a b.
Monad m =>
ParserK a m b
-> StreamK m a -> m (Either ParseError b, StreamK m a)
parseBreak ParserK a m b
parser StreamK m a
input = do
    let parserk :: Input a -> m (Step a m b)
parserk = forall a (m :: * -> *) b.
ParserK a m b
-> forall r.
   (ParseResult b -> Int -> Input a -> m (Step a m r))
   -> Int -> Int -> Input a -> m (Step a m r)
ParserK.runParser ParserK a m b
parser forall (m :: * -> *) b a.
Applicative m =>
ParseResult b -> Int -> Input a -> m (Step a m b)
parserDone Int
0 Int
0
     in [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [] Input a -> m (Step a m b)
parserk StreamK m a
input

    where

    {-# INLINE goStop #-}
    goStop
        :: [a]
        -> (ParserK.Input a -> m (ParserK.Step a m b))
        -> m (Either ParseError b, StreamK m a)
    goStop :: [a]
-> (Input a -> m (Step a m b))
-> m (Either ParseError b, StreamK m a)
goStop [a]
backBuf Input a -> m (Step a m b)
parserk = do
        Step a m b
pRes <- Input a -> m (Step a m b)
parserk forall a. Input a
ParserK.None
        case Step a m b
pRes of
            -- If we stop in an alternative, it will try calling the next
            -- parser, the next parser may call initial returning Partial and
            -- then immediately we have to call extract on it.
            ParserK.Partial Int
0 Input a -> m (Step a m b)
cont1 ->
                 [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [] Input a -> m (Step a m b)
cont1 forall (m :: * -> *) a. StreamK m a
nil
            ParserK.Partial Int
n Input a -> m (Step a m b)
cont1 -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
backBuf)
                let (StreamK m a
s1, [a]
backBuf1) = forall a (m :: * -> *).
Int -> [a] -> StreamK m a -> (StreamK m a, [a])
backTrackSingular Int
n1 [a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [a]
backBuf1 Input a -> m (Step a m b)
cont1 forall {m :: * -> *}. StreamK m a
s1
            ParserK.Continue Int
0 Input a -> m (Step a m b)
cont1 ->
                [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [a]
backBuf Input a -> m (Step a m b)
cont1 forall (m :: * -> *) a. StreamK m a
nil
            ParserK.Continue Int
n Input a -> m (Step a m b)
cont1 -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
backBuf)
                let (StreamK m a
s1, [a]
backBuf1) = forall a (m :: * -> *).
Int -> [a] -> StreamK m a -> (StreamK m a, [a])
backTrackSingular Int
n1 [a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [a]
backBuf1 Input a -> m (Step a m b)
cont1 forall {m :: * -> *}. StreamK m a
s1
            ParserK.Done Int
0 b
b ->
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall (m :: * -> *) a. StreamK m a
nil)
            ParserK.Done Int
n b
b -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
backBuf)
                let (StreamK m a
s1, [a]
_) = forall a (m :: * -> *).
Int -> [a] -> StreamK m a -> (StreamK m a, [a])
backTrackSingular Int
n1 [a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall {m :: * -> *}. StreamK m a
s1)
            ParserK.Error Int
_ [Char]
err -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), forall (m :: * -> *) a. StreamK m a
nil)

    seekErr :: a -> a
seekErr a
n =
        forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"parseBreak: Partial: forward seek not implemented n = "
            forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
n

    yieldk
        :: [a]
        -> (ParserK.Input a -> m (ParserK.Step a m b))
        -> a
        -> StreamK m a
        -> m (Either ParseError b, StreamK m a)
    yieldk :: [a]
-> (Input a -> m (Step a m b))
-> a
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
yieldk [a]
backBuf Input a -> m (Step a m b)
parserk a
arr StreamK m a
stream = do
        Step a m b
pRes <- Input a -> m (Step a m b)
parserk (forall a. a -> Input a
ParserK.Chunk a
arr)
        case Step a m b
pRes of
            ParserK.Partial Int
1 Input a -> m (Step a m b)
cont1 -> [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [] Input a -> m (Step a m b)
cont1 StreamK m a
stream
            ParserK.Partial Int
0 Input a -> m (Step a m b)
cont1 -> [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [] Input a -> m (Step a m b)
cont1 (forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons a
arr StreamK m a
stream)
            ParserK.Partial Int
n Input a -> m (Step a m b)
_ | Int
n forall a. Ord a => a -> a -> Bool
> Int
1 -> forall {a} {a}. Show a => a -> a
seekErr Int
n
            ParserK.Partial Int
n Input a -> m (Step a m b)
cont1 -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                    bufLen :: Int
bufLen = forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
backBuf
                    s :: StreamK m a
s = forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons a
arr StreamK m a
stream
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= Int
bufLen)
                let (StreamK m a
s1, [a]
_) = forall a (m :: * -> *).
Int -> [a] -> StreamK m a -> (StreamK m a, [a])
backTrackSingular Int
n1 [a]
backBuf StreamK m a
s
                [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [] Input a -> m (Step a m b)
cont1 StreamK m a
s1
            ParserK.Continue Int
1 Input a -> m (Step a m b)
cont1 -> [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go (a
arrforall a. a -> [a] -> [a]
:[a]
backBuf) Input a -> m (Step a m b)
cont1 StreamK m a
stream
            ParserK.Continue Int
0 Input a -> m (Step a m b)
cont1 ->
                [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [a]
backBuf Input a -> m (Step a m b)
cont1 (forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons a
arr StreamK m a
stream)
            ParserK.Continue Int
n Input a -> m (Step a m b)
_ | Int
n forall a. Ord a => a -> a -> Bool
> Int
1 -> forall {a} {a}. Show a => a -> a
seekErr Int
n
            ParserK.Continue Int
n Input a -> m (Step a m b)
cont1 -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                    bufLen :: Int
bufLen = forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
backBuf
                    s :: StreamK m a
s = forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons a
arr StreamK m a
stream
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= Int
bufLen)
                let (StreamK m a
s1, [a]
backBuf1) = forall a (m :: * -> *).
Int -> [a] -> StreamK m a -> (StreamK m a, [a])
backTrackSingular Int
n1 [a]
backBuf StreamK m a
s
                [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [a]
backBuf1 Input a -> m (Step a m b)
cont1 StreamK m a
s1
            ParserK.Done Int
1 b
b -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right b
b, StreamK m a
stream)
            ParserK.Done Int
0 b
b -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right b
b, forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons a
arr StreamK m a
stream)
            ParserK.Done Int
n b
_ | Int
n forall a. Ord a => a -> a -> Bool
> Int
1 -> forall {a} {a}. Show a => a -> a
seekErr Int
n
            ParserK.Done Int
n b
b -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                    bufLen :: Int
bufLen = forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
backBuf
                    s :: StreamK m a
s = forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons a
arr StreamK m a
stream
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= Int
bufLen)
                let (StreamK m a
s1, [a]
_) = forall a (m :: * -> *).
Int -> [a] -> StreamK m a -> (StreamK m a, [a])
backTrackSingular Int
n1 [a]
backBuf StreamK m a
s
                forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right b
b, StreamK m a
s1)
            ParserK.Error Int
_ [Char]
err -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), forall (m :: * -> *) a. StreamK m a
nil)

    go
        :: [a]
        -> (ParserK.Input a -> m (ParserK.Step a m b))
        -> StreamK m a
        -> m (Either ParseError b, StreamK m a)
    go :: [a]
-> (Input a -> m (Step a m b))
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
go [a]
backBuf Input a -> m (Step a m b)
parserk StreamK m a
stream = do
        let stop :: m (Either ParseError b, StreamK m a)
stop = [a]
-> (Input a -> m (Step a m b))
-> m (Either ParseError b, StreamK m a)
goStop [a]
backBuf Input a -> m (Step a m b)
parserk
            single :: a -> m (Either ParseError b, StreamK m a)
single a
a = [a]
-> (Input a -> m (Step a m b))
-> a
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
yieldk [a]
backBuf Input a -> m (Step a m b)
parserk a
a forall (m :: * -> *) a. StreamK m a
nil
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState ([a]
-> (Input a -> m (Step a m b))
-> a
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
yieldk [a]
backBuf Input a -> m (Step a m b)
parserk) a -> m (Either ParseError b, StreamK m a)
single m (Either ParseError b, StreamK m a)
stop StreamK m a
stream

-- | Run a 'ParserK' over a 'StreamK'. Please use 'parseChunks' where possible,
-- for better performance.
{-# INLINE parse #-}
parse :: Monad m =>
    ParserK.ParserK a m b -> StreamK m a -> m (Either ParseError b)
parse :: forall (m :: * -> *) a b.
Monad m =>
ParserK a m b -> StreamK m a -> m (Either ParseError b)
parse ParserK a m b
f = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
ParserK a m b
-> StreamK m a -> m (Either ParseError b, StreamK m a)
parseBreak ParserK a m b
f

-------------------------------------------------------------------------------
-- ParserK Chunked Generic
-------------------------------------------------------------------------------

{-# INLINE backTrackGenericChunks #-}
backTrackGenericChunks ::
       Int
    -> [GenArr.Array a]
    -> StreamK m (GenArr.Array a)
    -> (StreamK m (GenArr.Array a), [GenArr.Array a])
backTrackGenericChunks :: forall a (m :: * -> *).
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrackGenericChunks = forall a (m :: * -> *).
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
go

    where

    go :: Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
go Int
_ [] StreamK m (Array a)
stream = (StreamK m (Array a)
stream, [])
    go Int
n [Array a]
xs StreamK m (Array a)
stream | Int
n forall a. Ord a => a -> a -> Bool
<= Int
0 = (StreamK m (Array a)
stream, [Array a]
xs)
    go Int
n (Array a
x:[Array a]
xs) StreamK m (Array a)
stream =
        let len :: Int
len = forall a. Array a -> Int
GenArr.length Array a
x
        in if Int
n forall a. Ord a => a -> a -> Bool
> Int
len
           then Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
go (Int
n forall a. Num a => a -> a -> a
- Int
len) [Array a]
xs (forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
x StreamK m (Array a)
stream)
           else if Int
n forall a. Eq a => a -> a -> Bool
== Int
len
           then (forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
x StreamK m (Array a)
stream, [Array a]
xs)
           else let arr1 :: Array a
arr1 = forall a. Int -> Int -> Array a -> Array a
GenArr.getSliceUnsafe (Int
len forall a. Num a => a -> a -> a
- Int
n) Int
n Array a
x
                    arr2 :: Array a
arr2 = forall a. Int -> Int -> Array a -> Array a
GenArr.getSliceUnsafe Int
0 (Int
len forall a. Num a => a -> a -> a
- Int
n) Array a
x
                 in (forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
arr1 StreamK m (Array a)
stream, Array a
arr2forall a. a -> [a] -> [a]
:[Array a]
xs)

-- | Similar to 'parseBreak' but works on generic arrays
--
{-# INLINE_NORMAL parseBreakChunksGeneric #-}
parseBreakChunksGeneric
    :: forall m a b. Monad m
    => ParserK.ParserK (GenArr.Array a) m b
    -> StreamK m (GenArr.Array a)
    -> m (Either ParseError b, StreamK m (GenArr.Array a))
parseBreakChunksGeneric :: forall (m :: * -> *) a b.
Monad m =>
ParserK (Array a) m b
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
parseBreakChunksGeneric ParserK (Array a) m b
parser StreamK m (Array a)
input = do
    let parserk :: Input (Array a) -> m (Step (Array a) m b)
parserk = forall a (m :: * -> *) b.
ParserK a m b
-> forall r.
   (ParseResult b -> Int -> Input a -> m (Step a m r))
   -> Int -> Int -> Input a -> m (Step a m r)
ParserK.runParser ParserK (Array a) m b
parser forall (m :: * -> *) b a.
Applicative m =>
ParseResult b -> Int -> Input a -> m (Step a m b)
parserDone Int
0 Int
0
     in [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [] Input (Array a) -> m (Step (Array a) m b)
parserk StreamK m (Array a)
input

    where

    {-# INLINE goStop #-}
    goStop
        :: [GenArr.Array a]
        -> (ParserK.Input (GenArr.Array a)
                -> m (ParserK.Step (GenArr.Array a) m b))
        -> m (Either ParseError b, StreamK m (GenArr.Array a))
    goStop :: [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> m (Either ParseError b, StreamK m (Array a))
goStop [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk = do
        Step (Array a) m b
pRes <- Input (Array a) -> m (Step (Array a) m b)
parserk forall a. Input a
ParserK.None
        case Step (Array a) m b
pRes of
            -- If we stop in an alternative, it will try calling the next
            -- parser, the next parser may call initial returning Partial and
            -- then immediately we have to call extract on it.
            ParserK.Partial Int
0 Input (Array a) -> m (Step (Array a) m b)
cont1 ->
                 [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [] Input (Array a) -> m (Step (Array a) m b)
cont1 forall (m :: * -> *) a. StreamK m a
nil
            ParserK.Partial Int
n Input (Array a) -> m (Step (Array a) m b)
cont1 -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Array a -> Int
GenArr.length [Array a]
backBuf))
                let (StreamK m (Array a)
s1, [Array a]
backBuf1) = forall a (m :: * -> *).
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrackGenericChunks Int
n1 [Array a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf1 Input (Array a) -> m (Step (Array a) m b)
cont1 forall {m :: * -> *}. StreamK m (Array a)
s1
            ParserK.Continue Int
0 Input (Array a) -> m (Step (Array a) m b)
cont1 ->
                [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
cont1 forall (m :: * -> *) a. StreamK m a
nil
            ParserK.Continue Int
n Input (Array a) -> m (Step (Array a) m b)
cont1 -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Array a -> Int
GenArr.length [Array a]
backBuf))
                let (StreamK m (Array a)
s1, [Array a]
backBuf1) = forall a (m :: * -> *).
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrackGenericChunks Int
n1 [Array a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf1 Input (Array a) -> m (Step (Array a) m b)
cont1 forall {m :: * -> *}. StreamK m (Array a)
s1
            ParserK.Done Int
0 b
b ->
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall (m :: * -> *) a. StreamK m a
nil)
            ParserK.Done Int
n b
b -> do
                let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Array a -> Int
GenArr.length [Array a]
backBuf))
                let (StreamK m (Array a)
s1, [Array a]
_) = forall a (m :: * -> *).
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrackGenericChunks Int
n1 [Array a]
backBuf forall (m :: * -> *) a. StreamK m a
nil
                 in forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, forall {m :: * -> *}. StreamK m (Array a)
s1)
            ParserK.Error Int
_ [Char]
err -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), forall (m :: * -> *) a. StreamK m a
nil)

    seekErr :: a -> a -> a
seekErr a
n a
len =
        forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"parseBreak: Partial: forward seek not implemented n = "
            forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
n forall a. [a] -> [a] -> [a]
++ [Char]
" len = " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show a
len

    yieldk
        :: [GenArr.Array a]
        -> (ParserK.Input (GenArr.Array a)
                -> m (ParserK.Step (GenArr.Array a) m b))
        -> GenArr.Array a
        -> StreamK m (GenArr.Array a)
        -> m (Either ParseError b, StreamK m (GenArr.Array a))
    yieldk :: [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk Array a
arr StreamK m (Array a)
stream = do
        Step (Array a) m b
pRes <- Input (Array a) -> m (Step (Array a) m b)
parserk (forall a. a -> Input a
ParserK.Chunk Array a
arr)
        let len :: Int
len = forall a. Array a -> Int
GenArr.length Array a
arr
        case Step (Array a) m b
pRes of
            ParserK.Partial Int
n Input (Array a) -> m (Step (Array a) m b)
cont1 ->
                case forall a. Ord a => a -> a -> Ordering
compare Int
n Int
len of
                    Ordering
EQ -> [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [] Input (Array a) -> m (Step (Array a) m b)
cont1 StreamK m (Array a)
stream
                    Ordering
LT -> do
                        if Int
n forall a. Ord a => a -> a -> Bool
>= Int
0
                        then [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [] Input (Array a) -> m (Step (Array a) m b)
cont1 Array a
arr StreamK m (Array a)
stream
                        else do
                            let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                                bufLen :: Int
bufLen = forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Array a -> Int
GenArr.length [Array a]
backBuf)
                                s :: StreamK m (Array a)
s = forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
arr StreamK m (Array a)
stream
                            assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= Int
bufLen)
                            let (StreamK m (Array a)
s1, [Array a]
_) = forall a (m :: * -> *).
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrackGenericChunks Int
n1 [Array a]
backBuf StreamK m (Array a)
s
                            [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [] Input (Array a) -> m (Step (Array a) m b)
cont1 StreamK m (Array a)
s1
                    Ordering
GT -> forall {a} {a} {a}. (Show a, Show a) => a -> a -> a
seekErr Int
n Int
len
            ParserK.Continue Int
n Input (Array a) -> m (Step (Array a) m b)
cont1 ->
                case forall a. Ord a => a -> a -> Ordering
compare Int
n Int
len of
                    Ordering
EQ -> [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go (Array a
arrforall a. a -> [a] -> [a]
:[Array a]
backBuf) Input (Array a) -> m (Step (Array a) m b)
cont1 StreamK m (Array a)
stream
                    Ordering
LT -> do
                        if Int
n forall a. Ord a => a -> a -> Bool
>= Int
0
                        then [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
cont1 Array a
arr StreamK m (Array a)
stream
                        else do
                            let n1 :: Int
n1 = forall a. Num a => a -> a
negate Int
n
                                bufLen :: Int
bufLen = forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Array a -> Int
GenArr.length [Array a]
backBuf)
                                s :: StreamK m (Array a)
s = forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
cons Array a
arr StreamK m (Array a)
stream
                            assertM(Int
n1 forall a. Ord a => a -> a -> Bool
>= Int
0 Bool -> Bool -> Bool
&& Int
n1 forall a. Ord a => a -> a -> Bool
<= Int
bufLen)
                            let (StreamK m (Array a)
s1, [Array a]
backBuf1) = forall a (m :: * -> *).
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrackGenericChunks Int
n1 [Array a]
backBuf StreamK m (Array a)
s
                            [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf1 Input (Array a) -> m (Step (Array a) m b)
cont1 StreamK m (Array a)
s1
                    Ordering
GT -> forall {a} {a} {a}. (Show a, Show a) => a -> a -> a
seekErr Int
n Int
len
            ParserK.Done Int
n b
b -> do
                let n1 :: Int
n1 = Int
len forall a. Num a => a -> a -> a
- Int
n
                assertM(Int
n1 forall a. Ord a => a -> a -> Bool
<= forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum (forall a b. (a -> b) -> [a] -> [b]
Prelude.map forall a. Array a -> Int
GenArr.length (Array a
arrforall a. a -> [a] -> [a]
:[Array a]
backBuf)))
                let (StreamK m (Array a)
s1, [Array a]
_) = forall a (m :: * -> *).
Int
-> [Array a]
-> StreamK m (Array a)
-> (StreamK m (Array a), [Array a])
backTrackGenericChunks Int
n1 (Array a
arrforall a. a -> [a] -> [a]
:[Array a]
backBuf) StreamK m (Array a)
stream
                 in forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
b, StreamK m (Array a)
s1)
            ParserK.Error Int
_ [Char]
err -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left ([Char] -> ParseError
ParseError [Char]
err), forall (m :: * -> *) a. StreamK m a
nil)

    go
        :: [GenArr.Array a]
        -> (ParserK.Input (GenArr.Array a)
                -> m (ParserK.Step (GenArr.Array a) m b))
        -> StreamK m (GenArr.Array a)
        -> m (Either ParseError b, StreamK m (GenArr.Array a))
    go :: [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
go [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk StreamK m (Array a)
stream = do
        let stop :: m (Either ParseError b, StreamK m (Array a))
stop = [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> m (Either ParseError b, StreamK m (Array a))
goStop [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk
            single :: Array a -> m (Either ParseError b, StreamK m (Array a))
single Array a
a = [Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk Array a
a forall (m :: * -> *) a. StreamK m a
nil
         in forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
foldStream
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState ([Array a]
-> (Input (Array a) -> m (Step (Array a) m b))
-> Array a
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
yieldk [Array a]
backBuf Input (Array a) -> m (Step (Array a) m b)
parserk) Array a -> m (Either ParseError b, StreamK m (Array a))
single m (Either ParseError b, StreamK m (Array a))
stop StreamK m (Array a)
stream

{-# INLINE parseChunksGeneric #-}
parseChunksGeneric ::
       (Monad m)
    => ParserK.ParserK (GenArr.Array a) m b
    -> StreamK m (GenArr.Array a)
    -> m (Either ParseError b)
parseChunksGeneric :: forall (m :: * -> *) a b.
Monad m =>
ParserK (Array a) m b
-> StreamK m (Array a) -> m (Either ParseError b)
parseChunksGeneric ParserK (Array a) m b
f = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
ParserK (Array a) m b
-> StreamK m (Array a)
-> m (Either ParseError b, StreamK m (Array a))
parseBreakChunksGeneric ParserK (Array a) m b
f

-------------------------------------------------------------------------------
-- Sorting
-------------------------------------------------------------------------------

-- | Sort the input stream using a supplied comparison function.
--
-- Sorting can be achieved by simply:
--
-- >>> sortBy cmp = StreamK.mergeMapWith (StreamK.mergeBy cmp) StreamK.fromPure
--
-- However, this combinator uses a parser to first split the input stream into
-- down and up sorted segments and then merges them to optimize sorting when
-- pre-sorted sequences exist in the input stream.
--
-- /O(n) space/
--
{-# INLINE sortBy #-}
sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
-- sortBy f = Stream.concatPairsWith (Stream.mergeBy f) Stream.fromPure
sortBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> StreamK m a -> StreamK m a
sortBy a -> a -> Ordering
cmp =
    let p :: Parser a m (Either (StreamK n a) (StreamK n a))
p =
            forall (m :: * -> *) a b c.
Monad m =>
(a -> a -> Bool)
-> Fold m a b -> Fold m a c -> Parser a m (Either b c)
Parser.groupByRollingEither
                (\a
x -> (forall a. Ord a => a -> a -> Bool
< Ordering
GT) forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> a -> Ordering
cmp a
x)
                forall (m :: * -> *) a (n :: * -> *).
Monad m =>
Fold m a (StreamK n a)
FL.toStreamKRev
                forall (m :: * -> *) a (n :: * -> *).
Monad m =>
Fold m a (StreamK n a)
FL.toStreamK
     in   forall (m :: * -> *) b a.
(StreamK m b -> StreamK m b -> StreamK m b)
-> (a -> StreamK m b) -> StreamK m a -> StreamK m b
mergeMapWith (forall a (m :: * -> *).
(a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
mergeBy a -> a -> Ordering
cmp) forall a. a -> a
id
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
Stream m (Either a b) -> Stream m b
Stream.catRights -- its a non-failing backtracking parser
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
Parser a m b -> Stream m a -> Stream m (Either ParseError b)
Stream.parseMany (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. a -> a
id forall a. a -> a
id) forall {n :: * -> *} {n :: * -> *}.
Parser a m (Either (StreamK n a) (StreamK n a))
p)
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK