#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Type
-- Copyright   : (c) 2018 Composewell Technologies
--               (c) Roman Leshchinskiy 2008-2010
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

-- The stream type is inspired by the vector package.  A few functions in this
-- module have been originally adapted from the vector package (c) Roman
-- Leshchinskiy. See the notes in specific functions.

module Streamly.Internal.Data.Stream.StreamD.Type
    (
    -- * The stream type
      Step (..)
    -- XXX UnStream is exported to avoid a performance issue in concatMap if we
    -- use the pattern synonym "Stream".
    , Stream (Stream, UnStream)

    -- * Primitives
    , nilM
    , consM
    , uncons

    -- * From Unfold
    , unfold

    -- * From Values
    , fromPure
    , fromEffect

    -- * From Containers
    , fromList

    -- * Conversions From/To
    , fromStreamK
    , toStreamK

    -- * Running a 'Fold'
    , fold
    , fold_
    , foldOn

    -- * Right Folds
    , foldrT
    , foldrM
    , foldrMx
    , foldr
    , foldrS

    -- * Left Folds
    , foldl'
    , foldlM'
    , foldlx'
    , foldlMx'

    -- * Special Folds
    , drain

    -- * To Containers
    , toList

    -- * Multi-stream folds
    , eqBy
    , cmpBy

    -- * Transformations
    , map
    , mapM
    , take
    , takeWhile
    , takeWhileM

    -- * Nesting
    , ConcatMapUState (..)
    , unfoldMany
    , concatMap
    , concatMapM
    , FoldMany (..) -- for inspection testing
    , FoldManyPost (..)
    , foldMany
    , foldManyPost
    , refoldMany
    , chunksOf
    )
where

import Control.Applicative (liftA2)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.Trans.Class (lift, MonadTrans)
import Data.Functor (($>))
import Data.Functor.Identity (Identity(..))
import Fusion.Plugin.Types (Fuse(..))
import GHC.Base (build)
import GHC.Types (SPEC(..))
import Prelude hiding (map, mapM, foldr, take, concatMap, takeWhile)

import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Refold.Type (Refold(..))
import Streamly.Internal.Data.Stream.StreamD.Step (Step (..))
import Streamly.Internal.Data.SVar.Type (State, adaptState, defState)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))

import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K

------------------------------------------------------------------------------
-- The direct style stream type
------------------------------------------------------------------------------

-- gst = global state
-- | A stream consists of a step function that generates the next step given a
-- current state, and the current state.
data Stream m a =
    forall s. UnStream (State K.Stream m a -> s -> m (Step s a)) s

unShare :: Stream m a -> Stream m a
unShare :: Stream m a -> Stream m a
unShare (UnStream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
UnStream State Stream m a -> s -> m (Step s a)
forall (m :: * -> *) a. State Stream m a -> s -> m (Step s a)
step' s
state
    where step' :: State Stream m a -> s -> m (Step s a)
step' State Stream m a
gst = State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst)

pattern Stream :: (State K.Stream m a -> s -> m (Step s a)) -> s -> Stream m a
pattern $bStream :: (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
$mStream :: forall r (m :: * -> *) a.
Stream m a
-> (forall s. (State Stream m a -> s -> m (Step s a)) -> s -> r)
-> (Void# -> r)
-> r
Stream step state <- (unShare -> UnStream step state)
    where Stream = (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
UnStream

#if __GLASGOW_HASKELL__ >= 802
{-# COMPLETE Stream #-}
#endif

------------------------------------------------------------------------------
-- Primitives
------------------------------------------------------------------------------

-- | An empty 'Stream' with a side effect.
{-# INLINE_NORMAL nilM #-}
nilM :: Applicative m => m b -> Stream m a
nilM :: m b -> Stream m a
nilM m b
m = (State Stream m a -> () -> m (Step () a)) -> () -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream (\State Stream m a
_ ()
_ -> m b
m m b -> Step () a -> m (Step () a)
forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> Step () a
forall s a. Step s a
Stop) ()

{-# INLINE_NORMAL consM #-}
consM :: Applicative m => m a -> Stream m a -> Stream m a
consM :: m a -> Stream m a -> Stream m a
consM m a
m (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m a -> Maybe s -> m (Step (Maybe s) a))
-> Maybe s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> Maybe s -> m (Step (Maybe s) a)
step1 Maybe s
forall a. Maybe a
Nothing

    where

    {-# INLINE_LATE step1 #-}
    step1 :: State Stream m a -> Maybe s -> m (Step (Maybe s) a)
step1 State Stream m a
_ Maybe s
Nothing = (a -> Maybe s -> Step (Maybe s) a
forall s a. a -> s -> Step s a
`Yield` s -> Maybe s
forall a. a -> Maybe a
Just s
state) (a -> Step (Maybe s) a) -> m a -> m (Step (Maybe s) a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m a
m
    step1 State Stream m a
gst (Just s
st) = do
          (\case
            Yield a
a s
s -> a -> Maybe s -> Step (Maybe s) a
forall s a. a -> s -> Step s a
Yield a
a (s -> Maybe s
forall a. a -> Maybe a
Just s
s)
            Skip  s
s   -> Maybe s -> Step (Maybe s) a
forall s a. s -> Step s a
Skip (s -> Maybe s
forall a. a -> Maybe a
Just s
s)
            Step s a
Stop      -> Step (Maybe s) a
forall s a. Step s a
Stop) (Step s a -> Step (Maybe s) a)
-> m (Step s a) -> m (Step (Maybe s) a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
st

-- | Does not fuse, has the same performance as the StreamK version.
{-# INLINE_NORMAL uncons #-}
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
uncons :: Stream m a -> m (Maybe (a, Stream m a))
uncons (UnStream State Stream m a -> s -> m (Step s a)
step s
state) = s -> m (Maybe (a, Stream m a))
go s
state
  where
    go :: s -> m (Maybe (a, Stream m a))
go s
st = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s a
r of
            Yield a
x s
s -> Maybe (a, Stream m a) -> m (Maybe (a, Stream m a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (a, Stream m a) -> m (Maybe (a, Stream m a)))
-> Maybe (a, Stream m a) -> m (Maybe (a, Stream m a))
forall a b. (a -> b) -> a -> b
$ (a, Stream m a) -> Maybe (a, Stream m a)
forall a. a -> Maybe a
Just (a
x, (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step s
s)
            Skip  s
s   -> s -> m (Maybe (a, Stream m a))
go s
s
            Step s a
Stop      -> Maybe (a, Stream m a) -> m (Maybe (a, Stream m a))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (a, Stream m a)
forall a. Maybe a
Nothing

------------------------------------------------------------------------------
-- From 'Unfold'
------------------------------------------------------------------------------

data UnfoldState s = UnfoldNothing | UnfoldJust s

-- | Convert an 'Unfold' into a 'Stream' by supplying it a seed.
--
{-# INLINE_NORMAL unfold #-}
unfold :: Applicative m => Unfold m a b -> a -> Stream m b
unfold :: Unfold m a b -> a -> Stream m b
unfold (Unfold s -> m (Step s b)
ustep a -> m s
inject) a
seed = (State Stream m b -> UnfoldState s -> m (Step (UnfoldState s) b))
-> UnfoldState s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> UnfoldState s -> m (Step (UnfoldState s) b)
forall p. p -> UnfoldState s -> m (Step (UnfoldState s) b)
step UnfoldState s
forall s. UnfoldState s
UnfoldNothing

    where

    {-# INLINE_LATE step #-}
    step :: p -> UnfoldState s -> m (Step (UnfoldState s) b)
step p
_ UnfoldState s
UnfoldNothing = UnfoldState s -> Step (UnfoldState s) b
forall s a. s -> Step s a
Skip (UnfoldState s -> Step (UnfoldState s) b)
-> (s -> UnfoldState s) -> s -> Step (UnfoldState s) b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. s -> UnfoldState s
forall s. s -> UnfoldState s
UnfoldJust (s -> Step (UnfoldState s) b) -> m s -> m (Step (UnfoldState s) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> a -> m s
inject a
seed
    step p
_ (UnfoldJust s
st) = do
        (\case
            Yield b
x s
s -> b -> UnfoldState s -> Step (UnfoldState s) b
forall s a. a -> s -> Step s a
Yield b
x (s -> UnfoldState s
forall s. s -> UnfoldState s
UnfoldJust s
s)
            Skip s
s    -> UnfoldState s -> Step (UnfoldState s) b
forall s a. s -> Step s a
Skip (s -> UnfoldState s
forall s. s -> UnfoldState s
UnfoldJust s
s)
            Step s b
Stop      -> Step (UnfoldState s) b
forall s a. Step s a
Stop) (Step s b -> Step (UnfoldState s) b)
-> m (Step s b) -> m (Step (UnfoldState s) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> s -> m (Step s b)
ustep s
st

------------------------------------------------------------------------------
-- From Values
------------------------------------------------------------------------------

-- | Create a singleton 'Stream' from a pure value.
{-# INLINE_NORMAL fromPure #-}
fromPure :: Applicative m => a -> Stream m a
fromPure :: a -> Stream m a
fromPure a
x = (State Stream m a -> Bool -> m (Step Bool a)) -> Bool -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream (\State Stream m a
_ Bool
s -> Step Bool a -> m (Step Bool a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step Bool a -> m (Step Bool a)) -> Step Bool a -> m (Step Bool a)
forall a b. (a -> b) -> a -> b
$ Any -> Bool -> Step Bool a
forall p. p -> Bool -> Step Bool a
step Any
forall a. HasCallStack => a
undefined Bool
s) Bool
True
  where
    {-# INLINE_LATE step #-}
    step :: p -> Bool -> Step Bool a
step p
_ Bool
True  = a -> Bool -> Step Bool a
forall s a. a -> s -> Step s a
Yield a
x Bool
False
    step p
_ Bool
False = Step Bool a
forall s a. Step s a
Stop

-- | Create a singleton 'Stream' from a monadic action.
{-# INLINE_NORMAL fromEffect #-}
fromEffect :: Applicative m => m a -> Stream m a
fromEffect :: m a -> Stream m a
fromEffect m a
m = (State Stream m a -> Bool -> m (Step Bool a)) -> Bool -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> Bool -> m (Step Bool a)
forall p. p -> Bool -> m (Step Bool a)
step Bool
True

    where

    {-# INLINE_LATE step #-}
    step :: p -> Bool -> m (Step Bool a)
step p
_ Bool
True  = (a -> Bool -> Step Bool a
forall s a. a -> s -> Step s a
`Yield` Bool
False) (a -> Step Bool a) -> m a -> m (Step Bool a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m a
m
    step p
_ Bool
False = Step Bool a -> m (Step Bool a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Step Bool a
forall s a. Step s a
Stop

------------------------------------------------------------------------------
-- From Containers
------------------------------------------------------------------------------

-- Adapted from the vector package.
-- | Convert a list of pure values to a 'Stream'
{-# INLINE_LATE fromList #-}
fromList :: Applicative m => [a] -> Stream m a
fromList :: [a] -> Stream m a
fromList = (State Stream m a -> [a] -> m (Step [a] a)) -> [a] -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> [a] -> m (Step [a] a)
forall (f :: * -> *) p a.
Applicative f =>
p -> [a] -> f (Step [a] a)
step
  where
    {-# INLINE_LATE step #-}
    step :: p -> [a] -> f (Step [a] a)
step p
_ (a
x:[a]
xs) = Step [a] a -> f (Step [a] a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step [a] a -> f (Step [a] a)) -> Step [a] a -> f (Step [a] a)
forall a b. (a -> b) -> a -> b
$ a -> [a] -> Step [a] a
forall s a. a -> s -> Step s a
Yield a
x [a]
xs
    step p
_ []     = Step [a] a -> f (Step [a] a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Step [a] a
forall s a. Step s a
Stop

------------------------------------------------------------------------------
-- Conversions From/To
------------------------------------------------------------------------------

-- | Convert a CPS encoded StreamK to direct style step encoded StreamD
{-# INLINE_LATE fromStreamK #-}
fromStreamK :: Applicative m => K.Stream m a -> Stream m a
fromStreamK :: Stream m a -> Stream m a
fromStreamK = (State Stream m a -> Stream m a -> m (Step (Stream m a) a))
-> Stream m a -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> Stream m a -> m (Step (Stream m a) a)
forall (m :: * -> *) a.
Applicative m =>
State Stream m a -> Stream m a -> m (Step (Stream m a) a)
step
    where
    step :: State Stream m a -> Stream m a -> m (Step (Stream m a) a)
step State Stream m a
gst Stream m a
m1 =
        let stop :: m (Step s a)
stop       = Step s a -> m (Step s a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Step s a
forall s a. Step s a
Stop
            single :: a -> f (Step (Stream m a) a)
single a
a   = Step (Stream m a) a -> f (Step (Stream m a) a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step (Stream m a) a -> f (Step (Stream m a) a))
-> Step (Stream m a) a -> f (Step (Stream m a) a)
forall a b. (a -> b) -> a -> b
$ a -> Stream m a -> Step (Stream m a) a
forall s a. a -> s -> Step s a
Yield a
a Stream m a
forall (m :: * -> *) a. Stream m a
K.nil
            yieldk :: a -> s -> f (Step s a)
yieldk a
a s
r = Step s a -> f (Step s a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Step s a -> f (Step s a)) -> Step s a -> f (Step s a)
forall a b. (a -> b) -> a -> b
$ a -> s -> Step s a
forall s a. a -> s -> Step s a
Yield a
a s
r
         in State Stream m a
-> (a -> Stream m a -> m (Step (Stream m a) a))
-> (a -> m (Step (Stream m a) a))
-> m (Step (Stream m a) a)
-> Stream m a
-> m (Step (Stream m a) a)
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
gst a -> Stream m a -> m (Step (Stream m a) a)
forall (f :: * -> *) a s. Applicative f => a -> s -> f (Step s a)
yieldk a -> m (Step (Stream m a) a)
forall (f :: * -> *) a (m :: * -> *) a.
Applicative f =>
a -> f (Step (Stream m a) a)
single m (Step (Stream m a) a)
forall s a. m (Step s a)
stop Stream m a
m1

-- | Convert a direct style step encoded StreamD to a CPS encoded StreamK
{-# INLINE_LATE toStreamK #-}
toStreamK :: Monad m => Stream m a -> K.Stream m a
toStreamK :: Stream m a -> Stream m a
toStreamK (Stream State Stream m a -> s -> m (Step s a)
step s
state) = s -> Stream m a
go s
state
    where
    go :: s -> Stream m a
go s
st = (forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
 State Stream m a
 -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.MkStream ((forall r.
  State Stream m a
  -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
 -> Stream m a)
-> (forall r.
    State Stream m a
    -> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
gst a -> Stream m a -> m r
yld a -> m r
_ m r
stp ->
      let go' :: s -> m r
go' s
ss = do
           Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
ss
           case Step s a
r of
               Yield a
x s
s -> a -> Stream m a -> m r
yld a
x (s -> Stream m a
go s
s)
               Skip  s
s   -> s -> m r
go' s
s
               Step s a
Stop      -> m r
stp
      in s -> m r
go' s
st

#ifndef DISABLE_FUSION
{-# RULES "fromStreamK/toStreamK fusion"
    forall s. toStreamK (fromStreamK s) = s #-}
{-# RULES "toStreamK/fromStreamK fusion"
    forall s. fromStreamK (toStreamK s) = s #-}
#endif

------------------------------------------------------------------------------
-- Running a 'Fold'
------------------------------------------------------------------------------

{-# INLINE_NORMAL fold #-}
fold :: Monad m => Fold m a b -> Stream m a -> m b
fold :: Fold m a b -> Stream m a -> m b
fold Fold m a b
fld Stream m a
strm = do
    (b
b, Stream m a
_) <- Fold m a b -> Stream m a -> m (b, Stream m a)
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m (b, Stream m a)
fold_ Fold m a b
fld Stream m a
strm
    b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return b
b

{-# INLINE_NORMAL fold_ #-}
fold_ :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
fold_ :: Fold m a b -> Stream m a -> m (b, Stream m a)
fold_ (Fold s -> a -> m (Step s b)
fstep m (Step s b)
begin s -> m b
done) (Stream State Stream m a -> s -> m (Step s a)
step s
state) = do
    Step s b
res <- m (Step s b)
begin
    case Step s b
res of
        FL.Partial s
fs -> SPEC -> s -> s -> m (b, Stream m a)
go SPEC
SPEC s
fs s
state
        FL.Done b
fb -> (b, Stream m a) -> m (b, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((b, Stream m a) -> m (b, Stream m a))
-> (b, Stream m a) -> m (b, Stream m a)
forall a b. (a -> b) -> a -> b
$! (b
fb, (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step s
state)

    where

    {-# INLINE go #-}
    go :: SPEC -> s -> s -> m (b, Stream m a)
go !SPEC
_ !s
fs s
st = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s a
r of
            Yield a
x s
s -> do
                Step s b
res <- s -> a -> m (Step s b)
fstep s
fs a
x
                case Step s b
res of
                    FL.Done b
b -> (b, Stream m a) -> m (b, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((b, Stream m a) -> m (b, Stream m a))
-> (b, Stream m a) -> m (b, Stream m a)
forall a b. (a -> b) -> a -> b
$! (b
b, (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step s
s)
                    FL.Partial s
fs1 -> SPEC -> s -> s -> m (b, Stream m a)
go SPEC
SPEC s
fs1 s
s
            Skip s
s -> SPEC -> s -> s -> m (b, Stream m a)
go SPEC
SPEC s
fs s
s
            Step s a
Stop -> do
                b
b <- s -> m b
done s
fs
                (b, Stream m a) -> m (b, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((b, Stream m a) -> m (b, Stream m a))
-> (b, Stream m a) -> m (b, Stream m a)
forall a b. (a -> b) -> a -> b
$! (b
b, (State Stream m a -> () -> m (Step () a)) -> () -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream (\ State Stream m a
_ ()
_ -> Step () a -> m (Step () a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step () a
forall s a. Step s a
Stop) ())

{-# INLINE_NORMAL foldOn #-}
foldOn :: Monad m => Fold m a b -> Stream m a -> Fold m a b
foldOn :: Fold m a b -> Stream m a -> Fold m a b
foldOn (Fold s -> a -> m (Step s b)
fstep m (Step s b)
finitial s -> m b
fextract) (Stream State Stream m a -> s -> m (Step s a)
sstep s
state) =
    (s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold s -> a -> m (Step s b)
fstep m (Step s b)
initial s -> m b
fextract

    where

    initial :: m (Step s b)
initial = do
        Step s b
res <- m (Step s b)
finitial
        case Step s b
res of
            FL.Partial s
fs -> SPEC -> s -> s -> m (Step s b)
go SPEC
SPEC s
fs s
state
            FL.Done b
fb -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s b -> m (Step s b)) -> Step s b -> m (Step s b)
forall a b. (a -> b) -> a -> b
$ b -> Step s b
forall s b. b -> Step s b
FL.Done b
fb

    {-# INLINE go #-}
    go :: SPEC -> s -> s -> m (Step s b)
go !SPEC
_ !s
fs s
st = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
sstep State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s a
r of
            Yield a
x s
s -> do
                Step s b
res <- s -> a -> m (Step s b)
fstep s
fs a
x
                case Step s b
res of
                    FL.Done b
b -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s b -> m (Step s b)) -> Step s b -> m (Step s b)
forall a b. (a -> b) -> a -> b
$ b -> Step s b
forall s b. b -> Step s b
FL.Done b
b
                    FL.Partial s
fs1 -> SPEC -> s -> s -> m (Step s b)
go SPEC
SPEC s
fs1 s
s
            Skip s
s -> SPEC -> s -> s -> m (Step s b)
go SPEC
SPEC s
fs s
s
            Step s a
Stop -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s b -> m (Step s b)) -> Step s b -> m (Step s b)
forall a b. (a -> b) -> a -> b
$ s -> Step s b
forall s b. s -> Step s b
FL.Partial s
fs

------------------------------------------------------------------------------
-- Right Folds
------------------------------------------------------------------------------

-- Adapted from the vector package.
--
-- XXX Use of SPEC constructor in folds causes 2x performance degradation in
-- one shot operations, but helps immensely in operations composed of multiple
-- combinators or the same combinator many times. There seems to be an
-- opportunity to optimize here, can we get both, better perf for single ops
-- as well as composed ops? Without SPEC, all single operation benchmarks
-- become 2x faster.

-- The way we want a left fold to be strict, dually we want the right fold to
-- be lazy.  The correct signature of the fold function to keep it lazy must be
-- (a -> m b -> m b) instead of (a -> b -> m b). We were using the latter
-- earlier, which is incorrect. In the latter signature we have to feed the
-- value to the fold function after evaluating the monadic action, depending on
-- the bind behavior of the monad, the action may get evaluated immediately
-- introducing unnecessary strictness to the fold. If the implementation is
-- lazy the following example, must work:
--
-- S.foldrM (\x t -> if x then return t else return False) (return True)
--  (S.fromList [False,undefined] :: SerialT IO Bool)
--
{-# INLINE_NORMAL foldrM #-}
foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
foldrM :: (a -> m b -> m b) -> m b -> Stream m a -> m b
foldrM a -> m b -> m b
f m b
z (Stream State Stream m a -> s -> m (Step s a)
step s
state) = SPEC -> s -> m b
go SPEC
SPEC s
state
  where
    {-# INLINE_LATE go #-}
    go :: SPEC -> s -> m b
go !SPEC
_ s
st = do
          Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
          case Step s a
r of
            Yield a
x s
s -> a -> m b -> m b
f a
x (SPEC -> s -> m b
go SPEC
SPEC s
s)
            Skip s
s    -> SPEC -> s -> m b
go SPEC
SPEC s
s
            Step s a
Stop      -> m b
z

{-# INLINE_NORMAL foldrMx #-}
foldrMx :: Monad m
    => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
foldrMx :: (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
foldrMx a -> m x -> m x
fstep m x
final m x -> m b
convert (Stream State Stream m a -> s -> m (Step s a)
step s
state) = m x -> m b
convert (m x -> m b) -> m x -> m b
forall a b. (a -> b) -> a -> b
$ SPEC -> s -> m x
go SPEC
SPEC s
state
  where
    {-# INLINE_LATE go #-}
    go :: SPEC -> s -> m x
go !SPEC
_ s
st = do
          Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
          case Step s a
r of
            Yield a
x s
s -> a -> m x -> m x
fstep a
x (SPEC -> s -> m x
go SPEC
SPEC s
s)
            Skip s
s    -> SPEC -> s -> m x
go SPEC
SPEC s
s
            Step s a
Stop      -> m x
final

-- Note that foldr works on pure values, therefore it becomes necessarily
-- strict when the monad m is strict. In that case it cannot terminate early,
-- it would evaluate all of its input.  Though, this should work fine with lazy
-- monads. For example, if "any" is implemented using "foldr" instead of
-- "foldrM" it performs the same with Identity monad but performs 1000x slower
-- with IO monad.
--
{-# INLINE_NORMAL foldr #-}
foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
foldr :: (a -> b -> b) -> b -> Stream m a -> m b
foldr a -> b -> b
f b
z = (a -> m b -> m b) -> m b -> Stream m a -> m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m b -> m b) -> m b -> Stream m a -> m b
foldrM ((a -> b -> b) -> m a -> m b -> m b
forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 a -> b -> b
f (m a -> m b -> m b) -> (a -> m a) -> a -> m b -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return) (b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return b
z)

-- this performs horribly, should not be used
{-# INLINE_NORMAL foldrS #-}
foldrS
    :: Monad m
    => (a -> Stream m b -> Stream m b)
    -> Stream m b
    -> Stream m a
    -> Stream m b
foldrS :: (a -> Stream m b -> Stream m b)
-> Stream m b -> Stream m a -> Stream m b
foldrS a -> Stream m b -> Stream m b
f Stream m b
final (Stream State Stream m a -> s -> m (Step s a)
step s
state) = SPEC -> s -> Stream m b
go SPEC
SPEC s
state
  where
    {-# INLINE_LATE go #-}
    go :: SPEC -> s -> Stream m b
go !SPEC
_ s
st = do
        -- defState??
        Step s a
r <- m (Step s a) -> Stream m (Step s a)
forall (m :: * -> *) a. Applicative m => m a -> Stream m a
fromEffect (m (Step s a) -> Stream m (Step s a))
-> m (Step s a) -> Stream m (Step s a)
forall a b. (a -> b) -> a -> b
$ State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s a
r of
          Yield a
x s
s -> a -> Stream m b -> Stream m b
f a
x (SPEC -> s -> Stream m b
go SPEC
SPEC s
s)
          Skip s
s    -> SPEC -> s -> Stream m b
go SPEC
SPEC s
s
          Step s a
Stop      -> Stream m b
final

-- Right fold to some transformer (T) monad.  This can be useful to implement
-- stateless combinators like map, filtering, insertions, takeWhile, dropWhile.
--
{-# INLINE_NORMAL foldrT #-}
foldrT :: (Monad m, Monad (t m), MonadTrans t)
    => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
foldrT :: (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
foldrT a -> t m b -> t m b
f t m b
final (Stream State Stream m a -> s -> m (Step s a)
step s
state) = SPEC -> s -> t m b
go SPEC
SPEC s
state
  where
    {-# INLINE_LATE go #-}
    go :: SPEC -> s -> t m b
go !SPEC
_ s
st = do
          Step s a
r <- m (Step s a) -> t m (Step s a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Step s a) -> t m (Step s a)) -> m (Step s a) -> t m (Step s a)
forall a b. (a -> b) -> a -> b
$ State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
          case Step s a
r of
            Yield a
x s
s -> a -> t m b -> t m b
f a
x (SPEC -> s -> t m b
go SPEC
SPEC s
s)
            Skip s
s    -> SPEC -> s -> t m b
go SPEC
SPEC s
s
            Step s a
Stop      -> t m b
final

------------------------------------------------------------------------------
-- Left Folds
------------------------------------------------------------------------------

-- XXX run begin action only if the stream is not empty.
{-# INLINE_NORMAL foldlMx' #-}
foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
foldlMx' :: (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
foldlMx' x -> a -> m x
fstep m x
begin x -> m b
done (Stream State Stream m a -> s -> m (Step s a)
step s
state) =
    m x
begin m x -> (x -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \x
x -> SPEC -> x -> s -> m b
go SPEC
SPEC x
x s
state
  where
    -- XXX !acc?
    {-# INLINE_LATE go #-}
    go :: SPEC -> x -> s -> m b
go !SPEC
_ x
acc s
st = x
acc x -> m b -> m b
`seq` do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s a
r of
            Yield a
x s
s -> do
                x
acc' <- x -> a -> m x
fstep x
acc a
x
                SPEC -> x -> s -> m b
go SPEC
SPEC x
acc' s
s
            Skip s
s -> SPEC -> x -> s -> m b
go SPEC
SPEC x
acc s
s
            Step s a
Stop   -> x -> m b
done x
acc

{-# INLINE foldlx' #-}
foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
foldlx' :: (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
foldlx' x -> a -> x
fstep x
begin x -> b
done =
    (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
foldlMx' (\x
b a
a -> x -> m x
forall (m :: * -> *) a. Monad m => a -> m a
return (x -> a -> x
fstep x
b a
a)) (x -> m x
forall (m :: * -> *) a. Monad m => a -> m a
return x
begin) (b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> m b) -> (x -> b) -> x -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> b
done)

-- Adapted from the vector package.
-- XXX implement in terms of foldlMx'?
{-# INLINE_NORMAL foldlM' #-}
foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b
foldlM' :: (b -> a -> m b) -> m b -> Stream m a -> m b
foldlM' b -> a -> m b
fstep m b
mbegin (Stream State Stream m a -> s -> m (Step s a)
step s
state) = do
    b
begin <- m b
mbegin
    SPEC -> b -> s -> m b
go SPEC
SPEC b
begin s
state
  where
    {-# INLINE_LATE go #-}
    go :: SPEC -> b -> s -> m b
go !SPEC
_ b
acc s
st = b
acc b -> m b -> m b
`seq` do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s a
r of
            Yield a
x s
s -> do
                b
acc' <- b -> a -> m b
fstep b
acc a
x
                SPEC -> b -> s -> m b
go SPEC
SPEC b
acc' s
s
            Skip s
s -> SPEC -> b -> s -> m b
go SPEC
SPEC b
acc s
s
            Step s a
Stop   -> b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return b
acc

{-# INLINE foldl' #-}
foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
foldl' :: (b -> a -> b) -> b -> Stream m a -> m b
foldl' b -> a -> b
fstep b
begin = (b -> a -> m b) -> m b -> Stream m a -> m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> m b
foldlM' (\b
b a
a -> b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> a -> b
fstep b
b a
a)) (b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return b
begin)

------------------------------------------------------------------------------
-- Special folds
------------------------------------------------------------------------------

-- | Run a streaming composition, discard the results.
{-# INLINE_LATE drain #-}
drain :: Monad m => Stream m a -> m ()
-- drain = foldrM (\_ xs -> xs) (return ())
drain :: Stream m a -> m ()
drain (Stream State Stream m a -> s -> m (Step s a)
step s
state) = SPEC -> s -> m ()
go SPEC
SPEC s
state
  where
    go :: SPEC -> s -> m ()
go !SPEC
_ s
st = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s a
r of
            Yield a
_ s
s -> SPEC -> s -> m ()
go SPEC
SPEC s
s
            Skip s
s    -> SPEC -> s -> m ()
go SPEC
SPEC s
s
            Step s a
Stop      -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

------------------------------------------------------------------------------
-- To Containers
------------------------------------------------------------------------------

{-# INLINE_NORMAL toList #-}
toList :: Monad m => Stream m a -> m [a]
toList :: Stream m a -> m [a]
toList = (a -> [a] -> [a]) -> [a] -> Stream m a -> m [a]
forall (m :: * -> *) a b.
Monad m =>
(a -> b -> b) -> b -> Stream m a -> m b
foldr (:) []

-- Use foldr/build fusion to fuse with list consumers
-- This can be useful when using the IsList instance
{-# INLINE_LATE toListFB #-}
toListFB :: (a -> b -> b) -> b -> Stream Identity a -> b
toListFB :: (a -> b -> b) -> b -> Stream Identity a -> b
toListFB a -> b -> b
c b
n (Stream State Stream Identity a -> s -> Identity (Step s a)
step s
state) = s -> b
go s
state
  where
    go :: s -> b
go s
st = case Identity (Step s a) -> Step s a
forall a. Identity a -> a
runIdentity (State Stream Identity a -> s -> Identity (Step s a)
step State Stream Identity a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st) of
             Yield a
x s
s -> a
x a -> b -> b
`c` s -> b
go s
s
             Skip s
s    -> s -> b
go s
s
             Step s a
Stop      -> b
n

{-# RULES "toList Identity" toList = toListId #-}
{-# INLINE_EARLY toListId #-}
toListId :: Stream Identity a -> Identity [a]
toListId :: Stream Identity a -> Identity [a]
toListId Stream Identity a
s = [a] -> Identity [a]
forall a. a -> Identity a
Identity ([a] -> Identity [a]) -> [a] -> Identity [a]
forall a b. (a -> b) -> a -> b
$ (forall b. (a -> b -> b) -> b -> b) -> [a]
forall a. (forall b. (a -> b -> b) -> b -> b) -> [a]
build (\a -> b -> b
c b
n -> (a -> b -> b) -> b -> Stream Identity a -> b
forall a b. (a -> b -> b) -> b -> Stream Identity a -> b
toListFB a -> b -> b
c b
n Stream Identity a
s)

------------------------------------------------------------------------------
-- Multi-stream folds
------------------------------------------------------------------------------

-- Adapted from the vector package.
{-# INLINE_NORMAL eqBy #-}
eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
eqBy :: (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
eqBy a -> b -> Bool
eq (Stream State Stream m a -> s -> m (Step s a)
step1 s
t1) (Stream State Stream m b -> s -> m (Step s b)
step2 s
t2) = SPEC -> s -> s -> m Bool
eq_loop0 SPEC
SPEC s
t1 s
t2
  where
    eq_loop0 :: SPEC -> s -> s -> m Bool
eq_loop0 !SPEC
_ s
s1 s
s2 = do
      Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
s1
      case Step s a
r of
        Yield a
x s
s1' -> SPEC -> a -> s -> s -> m Bool
eq_loop1 SPEC
SPEC a
x s
s1' s
s2
        Skip    s
s1' -> SPEC -> s -> s -> m Bool
eq_loop0 SPEC
SPEC   s
s1' s
s2
        Step s a
Stop        -> s -> m Bool
eq_null s
s2

    eq_loop1 :: SPEC -> a -> s -> s -> m Bool
eq_loop1 !SPEC
_ a
x s
s1 s
s2 = do
      Step s b
r <- State Stream m b -> s -> m (Step s b)
step2 State Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
s2
      case Step s b
r of
        Yield b
y s
s2'
          | a -> b -> Bool
eq a
x b
y    -> SPEC -> s -> s -> m Bool
eq_loop0 SPEC
SPEC   s
s1 s
s2'
          | Bool
otherwise -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        Skip    s
s2'   -> SPEC -> a -> s -> s -> m Bool
eq_loop1 SPEC
SPEC a
x s
s1 s
s2'
        Step s b
Stop          -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

    eq_null :: s -> m Bool
eq_null s
s2 = do
      Step s b
r <- State Stream m b -> s -> m (Step s b)
step2 State Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
s2
      case Step s b
r of
        Yield b
_ s
_ -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        Skip s
s2'  -> s -> m Bool
eq_null s
s2'
        Step s b
Stop      -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True

-- Adapted from the vector package.
-- | Compare two streams lexicographically
{-# INLINE_NORMAL cmpBy #-}
cmpBy
    :: Monad m
    => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
cmpBy :: (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
cmpBy a -> b -> Ordering
cmp (Stream State Stream m a -> s -> m (Step s a)
step1 s
t1) (Stream State Stream m b -> s -> m (Step s b)
step2 s
t2) = SPEC -> s -> s -> m Ordering
cmp_loop0 SPEC
SPEC s
t1 s
t2
  where
    cmp_loop0 :: SPEC -> s -> s -> m Ordering
cmp_loop0 !SPEC
_ s
s1 s
s2 = do
      Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
s1
      case Step s a
r of
        Yield a
x s
s1' -> SPEC -> a -> s -> s -> m Ordering
cmp_loop1 SPEC
SPEC a
x s
s1' s
s2
        Skip    s
s1' -> SPEC -> s -> s -> m Ordering
cmp_loop0 SPEC
SPEC   s
s1' s
s2
        Step s a
Stop        -> s -> m Ordering
cmp_null s
s2

    cmp_loop1 :: SPEC -> a -> s -> s -> m Ordering
cmp_loop1 !SPEC
_ a
x s
s1 s
s2 = do
      Step s b
r <- State Stream m b -> s -> m (Step s b)
step2 State Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
s2
      case Step s b
r of
        Yield b
y s
s2' -> case a
x a -> b -> Ordering
`cmp` b
y of
                         Ordering
EQ -> SPEC -> s -> s -> m Ordering
cmp_loop0 SPEC
SPEC s
s1 s
s2'
                         Ordering
c  -> Ordering -> m Ordering
forall (m :: * -> *) a. Monad m => a -> m a
return Ordering
c
        Skip    s
s2' -> SPEC -> a -> s -> s -> m Ordering
cmp_loop1 SPEC
SPEC a
x s
s1 s
s2'
        Step s b
Stop        -> Ordering -> m Ordering
forall (m :: * -> *) a. Monad m => a -> m a
return Ordering
GT

    cmp_null :: s -> m Ordering
cmp_null s
s2 = do
      Step s b
r <- State Stream m b -> s -> m (Step s b)
step2 State Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
s2
      case Step s b
r of
        Yield b
_ s
_ -> Ordering -> m Ordering
forall (m :: * -> *) a. Monad m => a -> m a
return Ordering
LT
        Skip s
s2'  -> s -> m Ordering
cmp_null s
s2'
        Step s b
Stop      -> Ordering -> m Ordering
forall (m :: * -> *) a. Monad m => a -> m a
return Ordering
EQ

------------------------------------------------------------------------------
-- Transformations
------------------------------------------------------------------------------

-- Adapted from the vector package.
-- | Map a monadic function over a 'Stream'
{-# INLINE_NORMAL mapM #-}
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
mapM :: (a -> m b) -> Stream m a -> Stream m b
mapM a -> m b
f (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
forall (m :: * -> *) a. State Stream m a -> s -> m (Step s b)
step' s
state
  where
    {-# INLINE_LATE step' #-}
    step' :: State Stream m a -> s -> m (Step s b)
step' State Stream m a
gst s
st = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> a -> m b
f a
x m b -> (b -> m (Step s b)) -> m (Step s b)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \b
a -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s b -> m (Step s b)) -> Step s b -> m (Step s b)
forall a b. (a -> b) -> a -> b
$ b -> s -> Step s b
forall s a. a -> s -> Step s a
Yield b
a s
s
            Skip s
s    -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s b -> m (Step s b)) -> Step s b -> m (Step s b)
forall a b. (a -> b) -> a -> b
$ s -> Step s b
forall s a. s -> Step s a
Skip s
s
            Step s a
Stop      -> Step s b -> m (Step s b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step s b
forall s a. Step s a
Stop

{-# INLINE map #-}
map :: Monad m => (a -> b) -> Stream m a -> Stream m b
map :: (a -> b) -> Stream m a -> Stream m b
map a -> b
f = (a -> m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
mapM (b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> m b) -> (a -> b) -> a -> m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
f)

instance Functor m => Functor (Stream m) where
    {-# INLINE fmap #-}
    fmap :: (a -> b) -> Stream m a -> Stream m b
fmap a -> b
f (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
forall (m :: * -> *) a. State Stream m a -> s -> m (Step s b)
step' s
state
      where
        {-# INLINE_LATE step' #-}
        step' :: State Stream m a -> s -> m (Step s b)
step' State Stream m a
gst s
st = (Step s a -> Step s b) -> m (Step s a) -> m (Step s b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((a -> b) -> Step s a -> Step s b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f) (State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st)

    {-# INLINE (<$) #-}
    <$ :: a -> Stream m b -> Stream m a
(<$) = (b -> a) -> Stream m b -> Stream m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((b -> a) -> Stream m b -> Stream m a)
-> (a -> b -> a) -> a -> Stream m b -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b -> a
forall a b. a -> b -> a
const

-------------------------------------------------------------------------------
-- Filtering
-------------------------------------------------------------------------------

-- Adapted from the vector package.
{-# INLINE_NORMAL take #-}
take :: Applicative m => Int -> Stream m a -> Stream m a
take :: Int -> Stream m a -> Stream m a
take Int
n (Stream State Stream m a -> s -> m (Step s a)
step s
state) = Int
n Int -> Stream m a -> Stream m a
`seq` (State Stream m a -> (s, Int) -> m (Step (s, Int) a))
-> (s, Int) -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> (s, Int) -> m (Step (s, Int) a)
step' (s
state, Int
0)

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a -> (s, Int) -> m (Step (s, Int) a)
step' State Stream m a
gst (s
st, Int
i) | Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
n = do
        (\case
            Yield a
x s
s -> a -> (s, Int) -> Step (s, Int) a
forall s a. a -> s -> Step s a
Yield a
x (s
s, Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
            Skip s
s    -> (s, Int) -> Step (s, Int) a
forall s a. s -> Step s a
Skip (s
s, Int
i)
            Step s a
Stop      -> Step (s, Int) a
forall s a. Step s a
Stop) (Step s a -> Step (s, Int) a)
-> m (Step s a) -> m (Step (s, Int) a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
st
    step' State Stream m a
_ (s
_, Int
_) = Step (s, Int) a -> m (Step (s, Int) a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Step (s, Int) a
forall s a. Step s a
Stop

-- Adapted from the vector package.
{-# INLINE_NORMAL takeWhileM #-}
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
takeWhileM :: (a -> m Bool) -> Stream m a -> Stream m a
takeWhileM a -> m Bool
f (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m a -> s -> m (Step s a)
step' s
state
  where
    {-# INLINE_LATE step' #-}
    step' :: State Stream m a -> s -> m (Step s a)
step' State Stream m a
gst s
st = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step State Stream m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> do
                Bool
b <- a -> m Bool
f a
x
                Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ if Bool
b then a -> s -> Step s a
forall s a. a -> s -> Step s a
Yield a
x s
s else Step s a
forall s a. Step s a
Stop
            Skip s
s -> Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> m (Step s a)) -> Step s a -> m (Step s a)
forall a b. (a -> b) -> a -> b
$ s -> Step s a
forall s a. s -> Step s a
Skip s
s
            Step s a
Stop   -> Step s a -> m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step s a
forall s a. Step s a
Stop

{-# INLINE takeWhile #-}
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
takeWhile :: (a -> Bool) -> Stream m a -> Stream m a
takeWhile a -> Bool
f = (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
takeWhileM (Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> m Bool) -> (a -> Bool) -> a -> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
f)

------------------------------------------------------------------------------
-- Combine N Streams - concatAp
------------------------------------------------------------------------------

{-# INLINE_NORMAL concatAp #-}
concatAp :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
concatAp :: Stream f (a -> b) -> Stream f a -> Stream f b
concatAp (Stream State Stream f (a -> b) -> s -> f (Step s (a -> b))
stepa s
statea) (Stream State Stream f a -> s -> f (Step s a)
stepb s
stateb) =
    (State Stream f b
 -> Either s (a -> b, s, s) -> f (Step (Either s (a -> b, s, s)) b))
-> Either s (a -> b, s, s) -> Stream f b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream f b
-> Either s (a -> b, s, s) -> f (Step (Either s (a -> b, s, s)) b)
forall (m :: * -> *) a.
State Stream m a
-> Either s (a -> b, s, s) -> f (Step (Either s (a -> b, s, s)) b)
step' (s -> Either s (a -> b, s, s)
forall a b. a -> Either a b
Left s
statea)

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> Either s (a -> b, s, s) -> f (Step (Either s (a -> b, s, s)) b)
step' State Stream m a
gst (Left s
st) = (Step s (a -> b) -> Step (Either s (a -> b, s, s)) b)
-> f (Step s (a -> b)) -> f (Step (Either s (a -> b, s, s)) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
        (\case
            Yield a -> b
f s
s -> Either s (a -> b, s, s) -> Step (Either s (a -> b, s, s)) b
forall s a. s -> Step s a
Skip ((a -> b, s, s) -> Either s (a -> b, s, s)
forall a b. b -> Either a b
Right (a -> b
f, s
s, s
stateb))
            Skip    s
s -> Either s (a -> b, s, s) -> Step (Either s (a -> b, s, s)) b
forall s a. s -> Step s a
Skip (s -> Either s (a -> b, s, s)
forall a b. a -> Either a b
Left s
s)
            Step s (a -> b)
Stop      -> Step (Either s (a -> b, s, s)) b
forall s a. Step s a
Stop)
        (State Stream f (a -> b) -> s -> f (Step s (a -> b))
stepa (State Stream m a -> State Stream f (a -> b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st)
    step' State Stream m a
gst (Right (a -> b
f, s
os, s
st)) = (Step s a -> Step (Either s (a -> b, s, s)) b)
-> f (Step s a) -> f (Step (Either s (a -> b, s, s)) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
        (\case
            Yield a
a s
s -> b -> Either s (a -> b, s, s) -> Step (Either s (a -> b, s, s)) b
forall s a. a -> s -> Step s a
Yield (a -> b
f a
a) ((a -> b, s, s) -> Either s (a -> b, s, s)
forall a b. b -> Either a b
Right (a -> b
f, s
os, s
s))
            Skip s
s    -> Either s (a -> b, s, s) -> Step (Either s (a -> b, s, s)) b
forall s a. s -> Step s a
Skip ((a -> b, s, s) -> Either s (a -> b, s, s)
forall a b. b -> Either a b
Right (a -> b
f,s
os, s
s))
            Step s a
Stop      -> Either s (a -> b, s, s) -> Step (Either s (a -> b, s, s)) b
forall s a. s -> Step s a
Skip (s -> Either s (a -> b, s, s)
forall a b. a -> Either a b
Left s
os))
        (State Stream f a -> s -> f (Step s a)
stepb (State Stream m a -> State Stream f a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st)

{-# INLINE_NORMAL apSequence #-}
apSequence :: Functor f => Stream f a -> Stream f b -> Stream f b
apSequence :: Stream f a -> Stream f b -> Stream f b
apSequence (Stream State Stream f a -> s -> f (Step s a)
stepa s
statea) (Stream State Stream f b -> s -> f (Step s b)
stepb s
stateb) =
    (State Stream f b
 -> Either s (s, s) -> f (Step (Either s (s, s)) b))
-> Either s (s, s) -> Stream f b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream f b -> Either s (s, s) -> f (Step (Either s (s, s)) b)
step (s -> Either s (s, s)
forall a b. a -> Either a b
Left s
statea)

    where

    {-# INLINE_LATE step #-}
    step :: State Stream f b -> Either s (s, s) -> f (Step (Either s (s, s)) b)
step State Stream f b
gst (Left s
st) =
        (Step s a -> Step (Either s (s, s)) b)
-> f (Step s a) -> f (Step (Either s (s, s)) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
            (\case
                 Yield a
_ s
s -> Either s (s, s) -> Step (Either s (s, s)) b
forall s a. s -> Step s a
Skip ((s, s) -> Either s (s, s)
forall a b. b -> Either a b
Right (s
s, s
stateb))
                 Skip s
s -> Either s (s, s) -> Step (Either s (s, s)) b
forall s a. s -> Step s a
Skip (s -> Either s (s, s)
forall a b. a -> Either a b
Left s
s)
                 Step s a
Stop -> Step (Either s (s, s)) b
forall s a. Step s a
Stop)
            (State Stream f a -> s -> f (Step s a)
stepa (State Stream f b -> State Stream f a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream f b
gst) s
st)
    step State Stream f b
gst (Right (s
ostate, s
st)) =
        (Step s b -> Step (Either s (s, s)) b)
-> f (Step s b) -> f (Step (Either s (s, s)) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
            (\case
                 Yield b
b s
s -> b -> Either s (s, s) -> Step (Either s (s, s)) b
forall s a. a -> s -> Step s a
Yield b
b ((s, s) -> Either s (s, s)
forall a b. b -> Either a b
Right (s
ostate, s
s))
                 Skip s
s -> Either s (s, s) -> Step (Either s (s, s)) b
forall s a. s -> Step s a
Skip ((s, s) -> Either s (s, s)
forall a b. b -> Either a b
Right (s
ostate, s
s))
                 Step s b
Stop -> Either s (s, s) -> Step (Either s (s, s)) b
forall s a. s -> Step s a
Skip (s -> Either s (s, s)
forall a b. a -> Either a b
Left s
ostate))
            (State Stream f b -> s -> f (Step s b)
stepb State Stream f b
gst s
st)

{-# INLINE_NORMAL apDiscardSnd #-}
apDiscardSnd :: Functor f => Stream f a -> Stream f b -> Stream f a
apDiscardSnd :: Stream f a -> Stream f b -> Stream f a
apDiscardSnd (Stream State Stream f a -> s -> f (Step s a)
stepa s
statea) (Stream State Stream f b -> s -> f (Step s b)
stepb s
stateb) =
    (State Stream f a
 -> Either s (s, s, a) -> f (Step (Either s (s, s, a)) a))
-> Either s (s, s, a) -> Stream f a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream f a
-> Either s (s, s, a) -> f (Step (Either s (s, s, a)) a)
step (s -> Either s (s, s, a)
forall a b. a -> Either a b
Left s
statea)

    where

    {-# INLINE_LATE step #-}
    step :: State Stream f a
-> Either s (s, s, a) -> f (Step (Either s (s, s, a)) a)
step State Stream f a
gst (Left s
st) =
        (Step s a -> Step (Either s (s, s, a)) a)
-> f (Step s a) -> f (Step (Either s (s, s, a)) a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
            (\case
                 Yield a
b s
s -> Either s (s, s, a) -> Step (Either s (s, s, a)) a
forall s a. s -> Step s a
Skip ((s, s, a) -> Either s (s, s, a)
forall a b. b -> Either a b
Right (s
s, s
stateb, a
b))
                 Skip s
s -> Either s (s, s, a) -> Step (Either s (s, s, a)) a
forall s a. s -> Step s a
Skip (s -> Either s (s, s, a)
forall a b. a -> Either a b
Left s
s)
                 Step s a
Stop -> Step (Either s (s, s, a)) a
forall s a. Step s a
Stop)
            (State Stream f a -> s -> f (Step s a)
stepa State Stream f a
gst s
st)
    step State Stream f a
gst (Right (s
ostate, s
st, a
b)) =
        (Step s b -> Step (Either s (s, s, a)) a)
-> f (Step s b) -> f (Step (Either s (s, s, a)) a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap
            (\case
                 Yield b
_ s
s -> a -> Either s (s, s, a) -> Step (Either s (s, s, a)) a
forall s a. a -> s -> Step s a
Yield a
b ((s, s, a) -> Either s (s, s, a)
forall a b. b -> Either a b
Right (s
ostate, s
s, a
b))
                 Skip s
s -> Either s (s, s, a) -> Step (Either s (s, s, a)) a
forall s a. s -> Step s a
Skip ((s, s, a) -> Either s (s, s, a)
forall a b. b -> Either a b
Right (s
ostate, s
s, a
b))
                 Step s b
Stop -> Either s (s, s, a) -> Step (Either s (s, s, a)) a
forall s a. s -> Step s a
Skip (s -> Either s (s, s, a)
forall a b. a -> Either a b
Left s
ostate))
            (State Stream f b -> s -> f (Step s b)
stepb (State Stream f a -> State Stream f b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream f a
gst) s
st)

instance Applicative f => Applicative (Stream f) where
    {-# INLINE pure #-}
    pure :: a -> Stream f a
pure = a -> Stream f a
forall (m :: * -> *) a. Applicative m => a -> Stream m a
fromPure

    {-# INLINE (<*>) #-}
    <*> :: Stream f (a -> b) -> Stream f a -> Stream f b
(<*>) = Stream f (a -> b) -> Stream f a -> Stream f b
forall (f :: * -> *) a b.
Functor f =>
Stream f (a -> b) -> Stream f a -> Stream f b
concatAp

#if MIN_VERSION_base(4,10,0)
    {-# INLINE liftA2 #-}
    liftA2 :: (a -> b -> c) -> Stream f a -> Stream f b -> Stream f c
liftA2 a -> b -> c
f Stream f a
x = Stream f (b -> c) -> Stream f b -> Stream f c
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
(<*>) ((a -> b -> c) -> Stream f a -> Stream f (b -> c)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b -> c
f Stream f a
x)
#endif

    {-# INLINE (*>) #-}
    *> :: Stream f a -> Stream f b -> Stream f b
(*>) = Stream f a -> Stream f b -> Stream f b
forall (f :: * -> *) a b.
Functor f =>
Stream f a -> Stream f b -> Stream f b
apSequence

    {-# INLINE (<*) #-}
    <* :: Stream f a -> Stream f b -> Stream f a
(<*) = Stream f a -> Stream f b -> Stream f a
forall (f :: * -> *) a b.
Functor f =>
Stream f a -> Stream f b -> Stream f a
apDiscardSnd

------------------------------------------------------------------------------
-- Combine N Streams - unfoldMany
------------------------------------------------------------------------------

-- Define a unique structure to use in inspection testing
data ConcatMapUState o i =
      ConcatMapUOuter o
    | ConcatMapUInner o i

-- | @unfoldMany unfold stream@ uses @unfold@ to map the input stream elements
-- to streams and then flattens the generated streams into a single output
-- stream.

-- This is like 'concatMap' but uses an unfold with an explicit state to
-- generate the stream instead of a 'Stream' type generator. This allows better
-- optimization via fusion.  This can be many times more efficient than
-- 'concatMap'.

{-# INLINE_NORMAL unfoldMany #-}
unfoldMany :: Monad m => Unfold m a b -> Stream m a -> Stream m b
unfoldMany :: Unfold m a b -> Stream m a -> Stream m b
unfoldMany (Unfold s -> m (Step s b)
istep a -> m s
inject) (Stream State Stream m a -> s -> m (Step s a)
ostep s
ost) =
    (State Stream m b
 -> ConcatMapUState s s -> m (Step (ConcatMapUState s s) b))
-> ConcatMapUState s s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b
-> ConcatMapUState s s -> m (Step (ConcatMapUState s s) b)
forall (m :: * -> *) a.
State Stream m a
-> ConcatMapUState s s -> m (Step (ConcatMapUState s s) b)
step (s -> ConcatMapUState s s
forall o i. o -> ConcatMapUState o i
ConcatMapUOuter s
ost)
  where
    {-# INLINE_LATE step #-}
    step :: State Stream m a
-> ConcatMapUState s s -> m (Step (ConcatMapUState s s) b)
step State Stream m a
gst (ConcatMapUOuter s
o) = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
ostep (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
o
        case Step s a
r of
            Yield a
a s
o' -> do
                s
i <- a -> m s
inject a
a
                s
i s
-> m (Step (ConcatMapUState s s) b)
-> m (Step (ConcatMapUState s s) b)
`seq` Step (ConcatMapUState s s) b -> m (Step (ConcatMapUState s s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (ConcatMapUState s s -> Step (ConcatMapUState s s) b
forall s a. s -> Step s a
Skip (s -> s -> ConcatMapUState s s
forall o i. o -> i -> ConcatMapUState o i
ConcatMapUInner s
o' s
i))
            Skip s
o' -> Step (ConcatMapUState s s) b -> m (Step (ConcatMapUState s s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (ConcatMapUState s s) b -> m (Step (ConcatMapUState s s) b))
-> Step (ConcatMapUState s s) b -> m (Step (ConcatMapUState s s) b)
forall a b. (a -> b) -> a -> b
$ ConcatMapUState s s -> Step (ConcatMapUState s s) b
forall s a. s -> Step s a
Skip (s -> ConcatMapUState s s
forall o i. o -> ConcatMapUState o i
ConcatMapUOuter s
o')
            Step s a
Stop -> Step (ConcatMapUState s s) b -> m (Step (ConcatMapUState s s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (ConcatMapUState s s) b
forall s a. Step s a
Stop

    step State Stream m a
_ (ConcatMapUInner s
o s
i) = do
        Step s b
r <- s -> m (Step s b)
istep s
i
        Step (ConcatMapUState s s) b -> m (Step (ConcatMapUState s s) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (ConcatMapUState s s) b -> m (Step (ConcatMapUState s s) b))
-> Step (ConcatMapUState s s) b -> m (Step (ConcatMapUState s s) b)
forall a b. (a -> b) -> a -> b
$ case Step s b
r of
            Yield b
x s
i' -> b -> ConcatMapUState s s -> Step (ConcatMapUState s s) b
forall s a. a -> s -> Step s a
Yield b
x (s -> s -> ConcatMapUState s s
forall o i. o -> i -> ConcatMapUState o i
ConcatMapUInner s
o s
i')
            Skip s
i'    -> ConcatMapUState s s -> Step (ConcatMapUState s s) b
forall s a. s -> Step s a
Skip (s -> s -> ConcatMapUState s s
forall o i. o -> i -> ConcatMapUState o i
ConcatMapUInner s
o s
i')
            Step s b
Stop       -> ConcatMapUState s s -> Step (ConcatMapUState s s) b
forall s a. s -> Step s a
Skip (s -> ConcatMapUState s s
forall o i. o -> ConcatMapUState o i
ConcatMapUOuter s
o)

------------------------------------------------------------------------------
-- Combine N Streams - concatMap
------------------------------------------------------------------------------

-- Adapted from the vector package.
{-# INLINE_NORMAL concatMapM #-}
concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
concatMapM :: (a -> m (Stream m b)) -> Stream m a -> Stream m b
concatMapM a -> m (Stream m b)
f (Stream State Stream m a -> s -> m (Step s a)
step s
state) = (State Stream m b
 -> Either s (Stream m b, s)
 -> m (Step (Either s (Stream m b, s)) b))
-> Either s (Stream m b, s) -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b
-> Either s (Stream m b, s)
-> m (Step (Either s (Stream m b, s)) b)
forall (m :: * -> *) a.
State Stream m a
-> Either s (Stream m b, s)
-> m (Step (Either s (Stream m b, s)) b)
step' (s -> Either s (Stream m b, s)
forall a b. a -> Either a b
Left s
state)
  where
    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> Either s (Stream m b, s)
-> m (Step (Either s (Stream m b, s)) b)
step' State Stream m a
gst (Left s
st) = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s a
r of
            Yield a
a s
s -> do
                Stream m b
b_stream <- a -> m (Stream m b)
f a
a
                Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m b, s)) b
 -> m (Step (Either s (Stream m b, s)) b))
-> Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m b, s) -> Step (Either s (Stream m b, s)) b
forall s a. s -> Step s a
Skip ((Stream m b, s) -> Either s (Stream m b, s)
forall a b. b -> Either a b
Right (Stream m b
b_stream, s
s))
            Skip s
s -> Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m b, s)) b
 -> m (Step (Either s (Stream m b, s)) b))
-> Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m b, s) -> Step (Either s (Stream m b, s)) b
forall s a. s -> Step s a
Skip (s -> Either s (Stream m b, s)
forall a b. a -> Either a b
Left s
s)
            Step s a
Stop -> Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Either s (Stream m b, s)) b
forall s a. Step s a
Stop

    -- XXX flattenArrays is 5x faster than "concatMap fromArray". if somehow we
    -- can get inner_step to inline and fuse here we can perhaps get the same
    -- performance using "concatMap fromArray".
    --
    -- XXX using the pattern synonym "Stream" causes a major performance issue
    -- here even if the synonym does not include an adaptState call. Need to
    -- find out why. Is that something to be fixed in GHC?
    step' State Stream m a
gst (Right (UnStream State Stream m b -> s -> m (Step s b)
inner_step s
inner_st, s
st)) = do
        Step s b
r <- State Stream m b -> s -> m (Step s b)
inner_step (State Stream m a -> State Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
inner_st
        case Step s b
r of
            Yield b
b s
inner_s ->
                Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m b, s)) b
 -> m (Step (Either s (Stream m b, s)) b))
-> Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall a b. (a -> b) -> a -> b
$ b -> Either s (Stream m b, s) -> Step (Either s (Stream m b, s)) b
forall s a. a -> s -> Step s a
Yield b
b ((Stream m b, s) -> Either s (Stream m b, s)
forall a b. b -> Either a b
Right ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
inner_step s
inner_s, s
st))
            Skip s
inner_s ->
                Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m b, s)) b
 -> m (Step (Either s (Stream m b, s)) b))
-> Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m b, s) -> Step (Either s (Stream m b, s)) b
forall s a. s -> Step s a
Skip ((Stream m b, s) -> Either s (Stream m b, s)
forall a b. b -> Either a b
Right ((State Stream m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b -> s -> m (Step s b)
inner_step s
inner_s, s
st))
            Step s b
Stop -> Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Either s (Stream m b, s)) b
 -> m (Step (Either s (Stream m b, s)) b))
-> Step (Either s (Stream m b, s)) b
-> m (Step (Either s (Stream m b, s)) b)
forall a b. (a -> b) -> a -> b
$ Either s (Stream m b, s) -> Step (Either s (Stream m b, s)) b
forall s a. s -> Step s a
Skip (s -> Either s (Stream m b, s)
forall a b. a -> Either a b
Left s
st)

{-# INLINE concatMap #-}
concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
concatMap :: (a -> Stream m b) -> Stream m a -> Stream m b
concatMap a -> Stream m b
f = (a -> m (Stream m b)) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Stream m b)) -> Stream m a -> Stream m b
concatMapM (Stream m b -> m (Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m b -> m (Stream m b))
-> (a -> Stream m b) -> a -> m (Stream m b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m b
f)

-- XXX The idea behind this rule is to rewrite any calls to "concatMap
-- fromArray" automatically to flattenArrays which is much faster.  However, we
-- need an INLINE_EARLY on concatMap for this rule to fire. But if we use
-- INLINE_EARLY on concatMap or fromArray then direct uses of
-- "concatMap fromArray" (without the RULE) become much slower, this means
-- "concatMap f" in general would become slower. Need to find a solution to
-- this.
--
-- {-# RULES "concatMap Array.toStreamD"
--      concatMap Array.toStreamD = Array.flattenArray #-}

-- NOTE: even though concatMap for StreamD is 4x faster compared to StreamK,
-- the monad instance does not seem to be significantly faster.
instance Monad m => Monad (Stream m) where
    {-# INLINE return #-}
    return :: a -> Stream m a
return = a -> Stream m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure

    {-# INLINE (>>=) #-}
    >>= :: Stream m a -> (a -> Stream m b) -> Stream m b
(>>=) = ((a -> Stream m b) -> Stream m a -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall a b c. (a -> b -> c) -> b -> a -> c
flip (a -> Stream m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Stream m b) -> Stream m a -> Stream m b
concatMap

    {-# INLINE (>>) #-}
    >> :: Stream m a -> Stream m b -> Stream m b
(>>) = Stream m a -> Stream m b -> Stream m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
(*>)

------------------------------------------------------------------------------
-- Grouping/Splitting
------------------------------------------------------------------------------

-- s = stream state, fs = fold state
{-# ANN type FoldManyPost Fuse #-}
data FoldManyPost s fs b a
    = FoldManyPostStart s
    | FoldManyPostLoop s fs
    | FoldManyPostYield b (FoldManyPost s fs b a)
    | FoldManyPostDone

-- | Like foldMany but with the following differences:
--
-- * If the stream is empty the default value of the fold would still be
-- emitted in the output.
-- * At the end of the stream if the last application of the fold did not
-- receive any input it would still yield the default fold accumulator as the
-- last value.
--
{-# INLINE_NORMAL foldManyPost #-}
foldManyPost :: Monad m => Fold m a b -> Stream m a -> Stream m b
foldManyPost :: Fold m a b -> Stream m a -> Stream m b
foldManyPost (Fold s -> a -> m (Step s b)
fstep m (Step s b)
initial s -> m b
extract) (Stream State Stream m a -> s -> m (Step s a)
step s
state) =
    (State Stream m b
 -> FoldManyPost s s b Any -> m (Step (FoldManyPost s s b Any) b))
-> FoldManyPost s s b Any -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b
-> FoldManyPost s s b Any -> m (Step (FoldManyPost s s b Any) b)
forall (m :: * -> *) a a.
State Stream m a
-> FoldManyPost s s b a -> m (Step (FoldManyPost s s b a) b)
step' (s -> FoldManyPost s s b Any
forall s fs b a. s -> FoldManyPost s fs b a
FoldManyPostStart s
state)

    where

    {-# INLINE consume #-}
    consume :: a -> s -> s -> m (Step (FoldManyPost s s b a) a)
consume a
x s
s s
fs = do
        Step s b
res <- s -> a -> m (Step s b)
fstep s
fs a
x
        Step (FoldManyPost s s b a) a -> m (Step (FoldManyPost s s b a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (FoldManyPost s s b a) a
 -> m (Step (FoldManyPost s s b a) a))
-> Step (FoldManyPost s s b a) a
-> m (Step (FoldManyPost s s b a) a)
forall a b. (a -> b) -> a -> b
$ FoldManyPost s s b a -> Step (FoldManyPost s s b a) a
forall s a. s -> Step s a
Skip
            (FoldManyPost s s b a -> Step (FoldManyPost s s b a) a)
-> FoldManyPost s s b a -> Step (FoldManyPost s s b a) a
forall a b. (a -> b) -> a -> b
$ case Step s b
res of
                  FL.Done b
b -> b -> FoldManyPost s s b a -> FoldManyPost s s b a
forall s fs b a.
b -> FoldManyPost s fs b a -> FoldManyPost s fs b a
FoldManyPostYield b
b (s -> FoldManyPost s s b a
forall s fs b a. s -> FoldManyPost s fs b a
FoldManyPostStart s
s)
                  FL.Partial s
ps -> s -> s -> FoldManyPost s s b a
forall s fs b a. s -> fs -> FoldManyPost s fs b a
FoldManyPostLoop s
s s
ps

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> FoldManyPost s s b a -> m (Step (FoldManyPost s s b a) b)
step' State Stream m a
_ (FoldManyPostStart s
st) = do
        Step s b
r <- m (Step s b)
initial
        Step (FoldManyPost s s b a) b -> m (Step (FoldManyPost s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (FoldManyPost s s b a) b
 -> m (Step (FoldManyPost s s b a) b))
-> Step (FoldManyPost s s b a) b
-> m (Step (FoldManyPost s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldManyPost s s b a -> Step (FoldManyPost s s b a) b
forall s a. s -> Step s a
Skip
            (FoldManyPost s s b a -> Step (FoldManyPost s s b a) b)
-> FoldManyPost s s b a -> Step (FoldManyPost s s b a) b
forall a b. (a -> b) -> a -> b
$ case Step s b
r of
                  FL.Done b
b -> b -> FoldManyPost s s b a -> FoldManyPost s s b a
forall s fs b a.
b -> FoldManyPost s fs b a -> FoldManyPost s fs b a
FoldManyPostYield b
b (s -> FoldManyPost s s b a
forall s fs b a. s -> FoldManyPost s fs b a
FoldManyPostStart s
st)
                  FL.Partial s
fs -> s -> s -> FoldManyPost s s b a
forall s fs b a. s -> fs -> FoldManyPost s fs b a
FoldManyPostLoop s
st s
fs
    step' State Stream m a
gst (FoldManyPostLoop s
st s
fs) = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> a -> s -> s -> m (Step (FoldManyPost s s b a) b)
forall s a a. a -> s -> s -> m (Step (FoldManyPost s s b a) a)
consume a
x s
s s
fs
            Skip s
s -> Step (FoldManyPost s s b a) b -> m (Step (FoldManyPost s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldManyPost s s b a) b
 -> m (Step (FoldManyPost s s b a) b))
-> Step (FoldManyPost s s b a) b
-> m (Step (FoldManyPost s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldManyPost s s b a -> Step (FoldManyPost s s b a) b
forall s a. s -> Step s a
Skip (s -> s -> FoldManyPost s s b a
forall s fs b a. s -> fs -> FoldManyPost s fs b a
FoldManyPostLoop s
s s
fs)
            Step s a
Stop -> do
                b
b <- s -> m b
extract s
fs
                Step (FoldManyPost s s b a) b -> m (Step (FoldManyPost s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldManyPost s s b a) b
 -> m (Step (FoldManyPost s s b a) b))
-> Step (FoldManyPost s s b a) b
-> m (Step (FoldManyPost s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldManyPost s s b a -> Step (FoldManyPost s s b a) b
forall s a. s -> Step s a
Skip (b -> FoldManyPost s s b a -> FoldManyPost s s b a
forall s fs b a.
b -> FoldManyPost s fs b a -> FoldManyPost s fs b a
FoldManyPostYield b
b FoldManyPost s s b a
forall s fs b a. FoldManyPost s fs b a
FoldManyPostDone)
    step' State Stream m a
_ (FoldManyPostYield b
b FoldManyPost s s b a
next) = Step (FoldManyPost s s b a) b -> m (Step (FoldManyPost s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldManyPost s s b a) b
 -> m (Step (FoldManyPost s s b a) b))
-> Step (FoldManyPost s s b a) b
-> m (Step (FoldManyPost s s b a) b)
forall a b. (a -> b) -> a -> b
$ b -> FoldManyPost s s b a -> Step (FoldManyPost s s b a) b
forall s a. a -> s -> Step s a
Yield b
b FoldManyPost s s b a
next
    step' State Stream m a
_ FoldManyPost s s b a
FoldManyPostDone = Step (FoldManyPost s s b a) b -> m (Step (FoldManyPost s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FoldManyPost s s b a) b
forall s a. Step s a
Stop

{-# ANN type FoldMany Fuse #-}
data FoldMany s fs b a
    = FoldManyStart s
    | FoldManyFirst fs s
    | FoldManyLoop s fs
    | FoldManyYield b (FoldMany s fs b a)
    | FoldManyDone

-- | Apply a fold multiple times until the stream ends. If the stream is empty
-- the output would be empty.
--
-- @foldMany f = parseMany (fromFold f)@
--
-- A terminating fold may terminate even without accepting a single input. So
-- we run the fold's initial action before evaluating the stream. However, this
-- means that if later the stream does not yield anything we have to discard
-- the fold's initial result which could have generated an effect.
--
{-# INLINE_NORMAL foldMany #-}
foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
foldMany :: Fold m a b -> Stream m a -> Stream m b
foldMany (Fold s -> a -> m (Step s b)
fstep m (Step s b)
initial s -> m b
extract) (Stream State Stream m a -> s -> m (Step s a)
step s
state) =
    (State Stream m b
 -> FoldMany s s b Any -> m (Step (FoldMany s s b Any) b))
-> FoldMany s s b Any -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b
-> FoldMany s s b Any -> m (Step (FoldMany s s b Any) b)
forall (m :: * -> *) a a.
State Stream m a
-> FoldMany s s b a -> m (Step (FoldMany s s b a) b)
step' (s -> FoldMany s s b Any
forall s fs b a. s -> FoldMany s fs b a
FoldManyStart s
state)

    where

    {-# INLINE consume #-}
    consume :: a -> s -> s -> m (Step (FoldMany s s b a) a)
consume a
x s
s s
fs = do
        Step s b
res <- s -> a -> m (Step s b)
fstep s
fs a
x
        Step (FoldMany s s b a) a -> m (Step (FoldMany s s b a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (FoldMany s s b a) a -> m (Step (FoldMany s s b a) a))
-> Step (FoldMany s s b a) a -> m (Step (FoldMany s s b a) a)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) a
forall s a. s -> Step s a
Skip
            (FoldMany s s b a -> Step (FoldMany s s b a) a)
-> FoldMany s s b a -> Step (FoldMany s s b a) a
forall a b. (a -> b) -> a -> b
$ case Step s b
res of
                  FL.Done b
b -> b -> FoldMany s s b a -> FoldMany s s b a
forall s fs b a. b -> FoldMany s fs b a -> FoldMany s fs b a
FoldManyYield b
b (s -> FoldMany s s b a
forall s fs b a. s -> FoldMany s fs b a
FoldManyStart s
s)
                  FL.Partial s
ps -> s -> s -> FoldMany s s b a
forall s fs b a. s -> fs -> FoldMany s fs b a
FoldManyLoop s
s s
ps

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> FoldMany s s b a -> m (Step (FoldMany s s b a) b)
step' State Stream m a
_ (FoldManyStart s
st) = do
        Step s b
r <- m (Step s b)
initial
        Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. s -> Step s a
Skip
            (FoldMany s s b a -> Step (FoldMany s s b a) b)
-> FoldMany s s b a -> Step (FoldMany s s b a) b
forall a b. (a -> b) -> a -> b
$ case Step s b
r of
                  FL.Done b
b -> b -> FoldMany s s b a -> FoldMany s s b a
forall s fs b a. b -> FoldMany s fs b a -> FoldMany s fs b a
FoldManyYield b
b (s -> FoldMany s s b a
forall s fs b a. s -> FoldMany s fs b a
FoldManyStart s
st)
                  FL.Partial s
fs -> s -> s -> FoldMany s s b a
forall s fs b a. fs -> s -> FoldMany s fs b a
FoldManyFirst s
fs s
st
    step' State Stream m a
gst (FoldManyFirst s
fs s
st) = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> a -> s -> s -> m (Step (FoldMany s s b a) b)
forall s a a. a -> s -> s -> m (Step (FoldMany s s b a) a)
consume a
x s
s s
fs
            Skip s
s -> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. s -> Step s a
Skip (s -> s -> FoldMany s s b a
forall s fs b a. fs -> s -> FoldMany s fs b a
FoldManyFirst s
fs s
s)
            Step s a
Stop -> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FoldMany s s b a) b
forall s a. Step s a
Stop
    step' State Stream m a
gst (FoldManyLoop s
st s
fs) = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> a -> s -> s -> m (Step (FoldMany s s b a) b)
forall s a a. a -> s -> s -> m (Step (FoldMany s s b a) a)
consume a
x s
s s
fs
            Skip s
s -> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. s -> Step s a
Skip (s -> s -> FoldMany s s b a
forall s fs b a. s -> fs -> FoldMany s fs b a
FoldManyLoop s
s s
fs)
            Step s a
Stop -> do
                b
b <- s -> m b
extract s
fs
                Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. s -> Step s a
Skip (b -> FoldMany s s b a -> FoldMany s s b a
forall s fs b a. b -> FoldMany s fs b a -> FoldMany s fs b a
FoldManyYield b
b FoldMany s s b a
forall s fs b a. FoldMany s fs b a
FoldManyDone)
    step' State Stream m a
_ (FoldManyYield b
b FoldMany s s b a
next) = Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ b -> FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. a -> s -> Step s a
Yield b
b FoldMany s s b a
next
    step' State Stream m a
_ FoldMany s s b a
FoldManyDone = Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FoldMany s s b a) b
forall s a. Step s a
Stop

{-# INLINE chunksOf #-}
chunksOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
chunksOf :: Int -> Fold m a b -> Stream m a -> Stream m b
chunksOf Int
n Fold m a b
f = Fold m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
foldMany (Int -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Fold m a b
FL.take Int
n Fold m a b
f)

-- Keep the argument order consistent with consumeIterateM.
--
-- | Like 'foldMany' but for the 'Refold' type.  The supplied action is used as
-- the initial value for each refold.
--
-- /Internal/
{-# INLINE_NORMAL refoldMany #-}
refoldMany :: Monad m => Refold m x a b -> m x -> Stream m a -> Stream m b
refoldMany :: Refold m x a b -> m x -> Stream m a -> Stream m b
refoldMany (Refold s -> a -> m (Step s b)
fstep x -> m (Step s b)
inject s -> m b
extract) m x
action (Stream State Stream m a -> s -> m (Step s a)
step s
state) =
    (State Stream m b
 -> FoldMany s s b Any -> m (Step (FoldMany s s b Any) b))
-> FoldMany s s b Any -> Stream m b
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State Stream m b
-> FoldMany s s b Any -> m (Step (FoldMany s s b Any) b)
forall (m :: * -> *) a a.
State Stream m a
-> FoldMany s s b a -> m (Step (FoldMany s s b a) b)
step' (s -> FoldMany s s b Any
forall s fs b a. s -> FoldMany s fs b a
FoldManyStart s
state)

    where

    {-# INLINE consume #-}
    consume :: a -> s -> s -> m (Step (FoldMany s s b a) a)
consume a
x s
s s
fs = do
        Step s b
res <- s -> a -> m (Step s b)
fstep s
fs a
x
        Step (FoldMany s s b a) a -> m (Step (FoldMany s s b a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (FoldMany s s b a) a -> m (Step (FoldMany s s b a) a))
-> Step (FoldMany s s b a) a -> m (Step (FoldMany s s b a) a)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) a
forall s a. s -> Step s a
Skip
            (FoldMany s s b a -> Step (FoldMany s s b a) a)
-> FoldMany s s b a -> Step (FoldMany s s b a) a
forall a b. (a -> b) -> a -> b
$ case Step s b
res of
                  FL.Done b
b -> b -> FoldMany s s b a -> FoldMany s s b a
forall s fs b a. b -> FoldMany s fs b a -> FoldMany s fs b a
FoldManyYield b
b (s -> FoldMany s s b a
forall s fs b a. s -> FoldMany s fs b a
FoldManyStart s
s)
                  FL.Partial s
ps -> s -> s -> FoldMany s s b a
forall s fs b a. s -> fs -> FoldMany s fs b a
FoldManyLoop s
s s
ps

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> FoldMany s s b a -> m (Step (FoldMany s s b a) b)
step' State Stream m a
_ (FoldManyStart s
st) = do
        Step s b
r <- m x
action m x -> (x -> m (Step s b)) -> m (Step s b)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= x -> m (Step s b)
inject
        Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. s -> Step s a
Skip
            (FoldMany s s b a -> Step (FoldMany s s b a) b)
-> FoldMany s s b a -> Step (FoldMany s s b a) b
forall a b. (a -> b) -> a -> b
$ case Step s b
r of
                  FL.Done b
b -> b -> FoldMany s s b a -> FoldMany s s b a
forall s fs b a. b -> FoldMany s fs b a -> FoldMany s fs b a
FoldManyYield b
b (s -> FoldMany s s b a
forall s fs b a. s -> FoldMany s fs b a
FoldManyStart s
st)
                  FL.Partial s
fs -> s -> s -> FoldMany s s b a
forall s fs b a. fs -> s -> FoldMany s fs b a
FoldManyFirst s
fs s
st
    step' State Stream m a
gst (FoldManyFirst s
fs s
st) = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> a -> s -> s -> m (Step (FoldMany s s b a) b)
forall s a a. a -> s -> s -> m (Step (FoldMany s s b a) a)
consume a
x s
s s
fs
            Skip s
s -> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. s -> Step s a
Skip (s -> s -> FoldMany s s b a
forall s fs b a. fs -> s -> FoldMany s fs b a
FoldManyFirst s
fs s
s)
            Step s a
Stop -> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FoldMany s s b a) b
forall s a. Step s a
Stop
    step' State Stream m a
gst (FoldManyLoop s
st s
fs) = do
        Step s a
r <- State Stream m a -> s -> m (Step s a)
step (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> a -> s -> s -> m (Step (FoldMany s s b a) b)
forall s a a. a -> s -> s -> m (Step (FoldMany s s b a) a)
consume a
x s
s s
fs
            Skip s
s -> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. s -> Step s a
Skip (s -> s -> FoldMany s s b a
forall s fs b a. s -> fs -> FoldMany s fs b a
FoldManyLoop s
s s
fs)
            Step s a
Stop -> do
                b
b <- s -> m b
extract s
fs
                Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. s -> Step s a
Skip (b -> FoldMany s s b a -> FoldMany s s b a
forall s fs b a. b -> FoldMany s fs b a -> FoldMany s fs b a
FoldManyYield b
b FoldMany s s b a
forall s fs b a. FoldMany s fs b a
FoldManyDone)
    step' State Stream m a
_ (FoldManyYield b
b FoldMany s s b a
next) = Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b))
-> Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall a b. (a -> b) -> a -> b
$ b -> FoldMany s s b a -> Step (FoldMany s s b a) b
forall s a. a -> s -> Step s a
Yield b
b FoldMany s s b a
next
    step' State Stream m a
_ FoldMany s s b a
FoldManyDone = Step (FoldMany s s b a) b -> m (Step (FoldMany s s b a) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FoldMany s s b a) b
forall s a. Step s a
Stop

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

instance MonadTrans Stream where
    {-# INLINE lift #-}
    lift :: m a -> Stream m a
lift = m a -> Stream m a
forall (m :: * -> *) a. Applicative m => m a -> Stream m a
fromEffect

instance (MonadThrow m) => MonadThrow (Stream m) where
    throwM :: e -> Stream m a
throwM = m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m a -> Stream m a) -> (e -> m a) -> e -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> m a
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM