{-# LANGUAGE BangPatterns              #-}
{-# LANGUAGE CPP                       #-}
{-# LANGUAGE ConstraintKinds           #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE PatternSynonyms           #-}
{-# LANGUAGE ViewPatterns              #-}
{-# LANGUAGE RankNTypes                #-}

#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Type
-- Copyright   : (c) 2018 Harendra Kumar
--               (c) Roman Leshchinskiy 2008-2010
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.StreamD.Type
    (
    -- * The stream type
      Step (..)
    -- XXX UnStream is exported to avoid a performance issue in concatMap if we
    -- use the pattern synonym "Stream".
#if __GLASGOW_HASKELL__ >= 800
    , Stream (Stream, UnStream)
#else
    , Stream (UnStream)
    , pattern Stream
#endif
    , fromStreamK
    , toStreamK
    , fromStreamD
    , map
    , mapM
    , yield
    , yieldM
    , concatMap
    , concatMapM

    , foldrT
    , foldrM
    , foldrMx
    , foldr
    , foldrS

    , foldl'
    , foldlM'
    , foldlx'
    , foldlMx'

    , toList
    , fromList

    , eqBy
    , cmpBy
    , take
    , GroupState (..) -- for inspection testing
    , groupsOf
    , groupsOf2
    )
where

import Control.Applicative (liftA2)
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.Trans (lift, MonadTrans)
import Data.Functor.Identity (Identity(..))
import GHC.Base (build)
import GHC.Types (SPEC(..))
import Prelude hiding (map, mapM, foldr, take, concatMap)
import Fusion.Plugin.Types (Fuse(..))

import Streamly.Internal.Data.SVar (State(..), adaptState, defState)
import Streamly.Internal.Data.Fold.Types (Fold(..), Fold2(..))

import qualified Streamly.Internal.Data.Stream.StreamK as K

------------------------------------------------------------------------------
-- The direct style stream type
------------------------------------------------------------------------------

-- | A stream is a succession of 'Step's. A 'Yield' produces a single value and
-- the next state of the stream. A 'Skip' produces no value, only the next
-- state of the stream. 'Stop' indicates there are no more values in
-- the stream.
{-# ANN type Step Fuse #-}
data Step s a = Yield a s | Skip s | Stop

-- | Maps over the value carried by a 'Yield'; the state carried by 'Yield' and
-- 'Skip' is left untouched and 'Stop' is preserved.
instance Functor (Step s) where
    {-# INLINE fmap #-}
    fmap f (Yield x s) = Yield (f x) s
    fmap _ (Skip s) = Skip s
    fmap _ Stop = Stop

-- gst = global state
-- | A stream consists of a step function that generates the next step given a
-- current state, and the current state.
--
-- The state type @s@ is existentially quantified, so it is not visible in the
-- type of the stream. In addition to the current state, the step function
-- receives a @State K.Stream m a@ argument, which the step functions in this
-- module name @gst@.
data Stream m a =
    forall s. UnStream (State K.Stream m a -> s -> m (Step s a)) s

-- | Rebuild a stream so that its step function passes the incoming global
-- state through 'adaptState' before handing it to the original step function.
-- The stream's own state is left unchanged.
--
-- NOTE(review): 'adaptState' is defined in "Streamly.Internal.Data.SVar";
-- presumably it clears the parts of the state that must not be shared with
-- another stream -- confirm there.
unShare :: Stream m a -> Stream m a
unShare (UnStream step state) = UnStream step' state
    where step' gst = step (adaptState gst)

-- | Bidirectional pattern synonym for the stream constructor.
--
-- When used as a pattern, the scrutinee first goes through 'unShare', so the
-- @step@ function bound by the match applies 'adaptState' to the state it
-- receives before running the underlying step function. When used as an
-- expression, @Stream@ is exactly 'UnStream'.
pattern Stream :: (State K.Stream m a -> s -> m (Step s a)) -> s -> Stream m a
pattern Stream step state <- (unShare -> UnStream step state)
    where Stream = UnStream

#if __GLASGOW_HASKELL__ >= 802
{-# COMPLETE Stream #-}
#endif

-- | Convert a CPS encoded stream into a direct style 'Stream'.
--
-- The state of the resulting stream is the remaining CPS stream. Each step
-- runs 'K.foldStreamShared' on it with three continuations:
--
-- * @yieldk@: an element followed by the rest of the stream; yields the
--   element and continues with the rest.
-- * @single@: a final element; yields it and continues with 'K.nil'.
-- * @stop@: no more elements; produces 'Stop'.
{-# INLINE_LATE fromStreamK #-}
fromStreamK :: Monad m => K.Stream m a -> Stream m a
fromStreamK = Stream step
    where
    step gst m1 =
        let stop       = return Stop
            single a   = return $ Yield a K.nil
            yieldk a r = return $ Yield a r
         in K.foldStreamShared gst yieldk single stop m1

-- Convert a direct stream to and from CPS encoded stream

-- | Convert a direct style 'Stream' into a CPS encoded stream.
--
-- @go st@ builds the CPS stream that starts at state @st@. When that stream is
-- run, the local loop @go'@ keeps stepping the direct stream, silently
-- consuming 'Skip's, until it either yields an element (calling the yield
-- continuation with the element and the CPS stream for the next state) or
-- stops (calling the stop continuation). The "single" continuation is never
-- used.
{-# INLINE_LATE toStreamK #-}
toStreamK :: Monad m => Stream m a -> K.Stream m a
toStreamK (Stream step state) = go state
    where
    go st = K.mkStream $ \gst yld _ stp ->
      let go' ss = do
           r <- step gst ss
           case r of
               Yield x s -> yld x (go s)
               Skip  s   -> go' s
               Stop      -> stp
      in go' st

#ifndef DISABLE_FUSION
{-# RULES "fromStreamK/toStreamK fusion"
    forall s. toStreamK (fromStreamK s) = s #-}
{-# RULES "toStreamK/fromStreamK fusion"
    forall s. fromStreamK (toStreamK s) = s #-}
#endif

------------------------------------------------------------------------------
-- Converting to other stream types
------------------------------------------------------------------------------

-- | Convert a direct style 'Stream' to any 'K.IsStream' type, by first
-- converting it to a CPS encoded stream with 'toStreamK' and then applying
-- 'K.fromStream'.
{-# INLINE fromStreamD #-}
fromStreamD :: (K.IsStream t, Monad m) => Stream m a -> t m a
fromStreamD = K.fromStream . toStreamK

------------------------------------------------------------------------------
-- Instances
------------------------------------------------------------------------------

-- | Map a monadic function over a 'Stream'
--
-- The state of the stream is unchanged; every 'Yield' of the source runs the
-- function on the yielded element, while 'Skip' and 'Stop' are passed through.
{-# INLINE_NORMAL mapM #-}
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
mapM f (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        -- The element type changes from a to b, so the state is adapted
        -- before being passed to the source step function.
        r <- step (adaptState gst) st
        case r of
            Yield x s -> f x >>= \a -> return $ Yield a s
            Skip s    -> return $ Skip s
            Stop      -> return Stop

-- | Map a pure function over a 'Stream'. Implemented as 'mapM' with the
-- function's result lifted into the monad using 'return'.
{-# INLINE map #-}
map :: Monad m => (a -> b) -> Stream m a -> Stream m b
map f = mapM (return . f)

-- | Maps a pure function over the elements of the stream. Only requires a
-- 'Functor' on the underlying type: the outer 'fmap' maps over @m@ and the
-- inner one over the 'Step' produced by the source.
instance Functor m => Functor (Stream m) where
    {-# INLINE fmap #-}
    fmap f (Stream step state) = Stream step' state
      where
        {-# INLINE_LATE step' #-}
        step' gst st = fmap (fmap f) (step (adaptState gst) st)

------------------------------------------------------------------------------
-- concatMap
------------------------------------------------------------------------------

-- | Map a stream-producing monadic function over a stream and flatten the
-- resulting streams into one.
--
-- The state of the result is an 'Either':
--
-- * @Left st@: pull the next element from the outer stream at state @st@. On
--   a 'Yield' the function is run and the state becomes @Right@ holding the
--   generated inner stream and the next outer state.
-- * @Right (inner, st)@: step the inner stream. Its elements are yielded
--   as-is; when it stops, switch back to @Left st@ to resume the outer stream.
--
-- The result stops only when the outer stream stops.
{-# INLINE_NORMAL concatMapM #-}
concatMapM :: Monad m => (a -> m (Stream m b)) -> Stream m a -> Stream m b
concatMapM f (Stream step state) = Stream step' (Left state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (Left st) = do
        r <- step (adaptState gst) st
        case r of
            Yield a s -> do
                b_stream <- f a
                return $ Skip (Right (b_stream, s))
            Skip s -> return $ Skip (Left s)
            Stop -> return Stop

    -- XXX flattenArrays is 5x faster than "concatMap fromArray". if somehow we
    -- can get inner_step to inline and fuse here we can perhaps get the same
    -- performance using "concatMap fromArray".
    --
    -- XXX using the pattern synonym "Stream" causes a major performance issue
    -- here even if the synonym does not include an adaptState call. Need to
    -- find out why. Is that something to be fixed in GHC?
    step' gst (Right (UnStream inner_step inner_st, st)) = do
        r <- inner_step (adaptState gst) inner_st
        case r of
            Yield b inner_s ->
                return $ Yield b (Right (Stream inner_step inner_s, st))
            Skip inner_s ->
                return $ Skip (Right (Stream inner_step inner_s, st))
            Stop -> return $ Skip (Left st)

-- | Map a stream-producing pure function over a stream and flatten the
-- resulting streams into one. Implemented as 'concatMapM' with the function's
-- result lifted into the monad using 'return'.
{-# INLINE concatMap #-}
concatMap :: Monad m => (a -> Stream m b) -> Stream m a -> Stream m b
concatMap f = concatMapM (return . f)

-- XXX The idea behind this rule is to rewrite any calls to "concatMap
-- fromArray" automatically to flattenArrays which is much faster.  However, we
-- need an INLINE_EARLY on concatMap for this rule to fire. But if we use
-- INLINE_EARLY on concatMap or fromArray then direct uses of
-- "concatMap fromArray" (without the RULE) become much slower, this means
-- "concatMap f" in general would become slower. Need to find a solution to
-- this.
--
-- {-# RULES "concatMap Array.toStreamD"
--      concatMap Array.toStreamD = Array.flattenArray #-}

-- | Create a singleton 'Stream' from a pure value.
--
-- The state is a 'Bool': 'True' means the value has not been yielded yet,
-- 'False' means the stream is finished.
{-# INLINE_NORMAL yield #-}
yield :: Applicative m => a -> Stream m a
yield x = Stream (\_ s -> pure $ step s) True
  where
    -- The global state is not needed to produce the value, so the local step
    -- function takes only the Bool state (no dummy argument is passed).
    {-# INLINE_LATE step #-}
    step True  = Yield x False
    step False = Stop

-- | Apply every function produced by the first stream to every element of
-- the second stream. For each function, the second stream is run from its
-- initial state.
--
-- The state of the result is an 'Either':
--
-- * @Left st@: pull the next function from the first stream at state @st@.
-- * @Right (f, os, st)@: @f@ is the current function, @os@ is the state at
--   which to resume the first stream, and @st@ is the current state of the
--   second stream. When the second stream stops, switch back to @Left os@.
{-# INLINE_NORMAL concatAp #-}
concatAp :: Functor f => Stream f (a -> b) -> Stream f a -> Stream f b
concatAp (Stream stepa statea) (Stream stepb stateb) = Stream step' (Left statea)
  where
    {-# INLINE_LATE step' #-}
    step' gst (Left st) = fmap
        (\r -> case r of
            Yield f s -> Skip (Right (f, s, stateb))
            Skip    s -> Skip (Left s)
            Stop      -> Stop)
        (stepa (adaptState gst) st)
    step' gst (Right (f, os, st)) = fmap
        (\r -> case r of
            Yield a s -> Yield (f a) (Right (f, os, s))
            Skip s    -> Skip (Right (f,os, s))
            Stop      -> Skip (Left os))
        (stepb (adaptState gst) st)

-- | For every element produced by the first stream, discard the element and
-- emit all the elements of the second stream, run from its initial state.
--
-- The state of the result is an 'Either':
--
-- * @Left st@: step the first stream at state @st@.
-- * @Right (ostate, st)@: step the second stream at state @st@; when it
--   stops, switch back to @Left ostate@ to resume the first stream.
{-# INLINE_NORMAL apSequence #-}
apSequence :: Functor f => Stream f a -> Stream f b -> Stream f b
apSequence (Stream stepa statea) (Stream stepb stateb) = Stream step (Left statea)
  where
    {-# INLINE_LATE step #-}
    step gst (Left st) =
        fmap
            (\r ->
                 case r of
                     Yield _ s -> Skip (Right (s, stateb))
                     Skip s -> Skip (Left s)
                     Stop -> Stop)
            (stepa (adaptState gst) st)
    step gst (Right (ostate, st)) =
        fmap
            (\r ->
                 case r of
                     Yield b s -> Yield b (Right (ostate, s))
                     Skip s -> Skip (Right (ostate, s))
                     Stop -> Skip (Left ostate))
            -- The second stream already has the result's element type, so
            -- the state is passed on without 'adaptState'.
            (stepb gst st)

-- | 'pure' creates a singleton stream; '<*>' applies every function of the
-- first stream to every element of the second ('concatAp'); '*>' runs the
-- second stream once per element of the first ('apSequence').
instance Applicative f => Applicative (Stream f) where
    {-# INLINE pure #-}
    pure = yield
    {-# INLINE (<*>) #-}
    (<*>) = concatAp
    {-# INLINE (*>) #-}
    (*>) = apSequence


-- NOTE: even though concatMap for StreamD is 4x faster compared to StreamK,
-- the monad instance does not seem to be significantly faster.

-- | Bind is 'concatMap' with its arguments flipped; '>>' reuses the
-- 'Applicative' '*>'.
instance Monad m => Monad (Stream m) where
    {-# INLINE return #-}
    return = pure
    {-# INLINE (>>=) #-}
    (>>=) = flip concatMap
    {-# INLINE (>>) #-}
    (>>) = (*>)

-- | Lifts a monadic action into a singleton stream using 'yieldM'.
instance MonadTrans Stream where
    lift = yieldM

-- | Throws the exception in the underlying monad and lifts that action into
-- the stream.
instance (MonadThrow m) => MonadThrow (Stream m) where
    throwM = lift . throwM

-- XXX Use of SPEC constructor in folds causes 2x performance degradation in
-- one shot operations, but helps immensely in operations composed of multiple
-- combinators or the same combinator many times. There seems to be an
-- opportunity to optimize here, can we get both, better perf for single ops
-- as well as composed ops? Without SPEC, all single operation benchmarks
-- become 2x faster.

-- The way we want a left fold to be strict, dually we want the right fold to
-- be lazy.  The correct signature of the fold function to keep it lazy must be
-- (a -> m b -> m b) instead of (a -> b -> m b). We were using the latter
-- earlier, which is incorrect. In the latter signature we have to feed the
-- value to the fold function after evaluating the monadic action, depending on
-- the bind behavior of the monad, the action may get evaluated immediately
-- introducing unnecessary strictness to the fold. If the implementation is
-- lazy the following example, must work:
--
-- S.foldrM (\x t -> if x then return t else return False) (return True)
--  (S.fromList [False,undefined] :: SerialT IO Bool)
--
-- | Lazy right fold with a monadic step function.
--
-- The step function receives the current element and the /action/ that folds
-- the rest of the stream, so it decides whether the rest of the stream is
-- evaluated at all. The second argument is the action used as the result when
-- the stream stops. The stream is stepped with 'defState'.
{-# INLINE_NORMAL foldrM #-}
foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b
foldrM f z (Stream step state) = go SPEC state
  where
    -- The strict SPEC argument is threaded through the loop unchanged; it
    -- carries no data.
    {-# INLINE_LATE go #-}
    go !_ st = do
          r <- step defState st
          case r of
            Yield x s -> f x (go SPEC s)
            Skip s    -> go SPEC s
            Stop      -> z

-- | Like 'foldrM', but with an extraction function: the lazy right fold
-- produces an action of type @m x@, which is then passed to @convert@ to
-- obtain the final @m b@.
--
-- @fstep@ receives the current element and the action folding the rest of the
-- stream; @final@ is the action used when the stream stops.
{-# INLINE_NORMAL foldrMx #-}
foldrMx :: Monad m
    => (a -> m x -> m x) -> m x -> (m x -> m b) -> Stream m a -> m b
foldrMx fstep final convert (Stream step state) = convert $ go SPEC state
  where
    {-# INLINE_LATE go #-}
    go !_ st = do
          r <- step defState st
          case r of
            Yield x s -> fstep x (go SPEC s)
            Skip s    -> go SPEC s
            Stop      -> final

-- Note that foldr works on pure values, therefore it becomes necessarily
-- strict when the monad m is strict. In that case it cannot terminate early,
-- it would evaluate all of its input.  Though, this should work fine with lazy
-- monads. For example, if "any" is implemented using "foldr" instead of
-- "foldrM" it performs the same with Identity monad but performs 1000x slower
-- with IO monad.
--
-- | Right fold with a pure step function, implemented on top of 'foldrM' by
-- combining the current element with the result of folding the rest of the
-- stream using 'liftA2'.
{-# INLINE_NORMAL foldr #-}
foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
foldr f z = foldrM (\a b -> liftA2 f (return a) b) (return z)

-- | Create a singleton 'Stream' from a monadic action.
--
-- The state is a 'Bool': 'True' means the action has not been run yet,
-- 'False' means the stream is finished. The action is run when the stream is
-- stepped in the 'True' state.
{-# INLINE_NORMAL yieldM #-}
yieldM :: Monad m => m a -> Stream m a
yieldM m = Stream step True
  where
    {-# INLINE_LATE step #-}
    step _ True  = m >>= \x -> return $ Yield x False
    step _ False = return Stop

-- this performs horribly, should not be used

-- | Right fold of a stream into another stream.
--
-- The loop runs in the 'Stream' monad: each step of the source is lifted into
-- a singleton stream with 'yieldM' and bound, then the step function combines
-- the element with the stream built from the rest of the source. @final@ is
-- the stream used when the source stops.
{-# INLINE_NORMAL foldrS #-}
foldrS
    :: Monad m
    => (a -> Stream m b -> Stream m b)
    -> Stream m b
    -> Stream m a
    -> Stream m b
foldrS f final (Stream step state) = go SPEC state
  where
    {-# INLINE_LATE go #-}
    go !_ st = do
        -- defState??
        r <- yieldM $ step defState st
        case r of
          Yield x s -> f x (go SPEC s)
          Skip s    -> go SPEC s
          Stop      -> final

-- Right fold to some transformer (T) monad.  This can be useful to implement
-- stateless combinators like map, filtering, insertions, takeWhile, dropWhile.
--
-- | Right fold of a stream into a monad transformer @t m@.
--
-- Each step of the source runs in @m@ and is lifted into @t m@ with 'lift';
-- the step function combines the element with the @t m@ action that folds the
-- rest of the stream. @final@ is the action used when the stream stops.
{-# INLINE_NORMAL foldrT #-}
foldrT :: (Monad m, Monad (t m), MonadTrans t)
    => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
foldrT f final (Stream step state) = go SPEC state
  where
    {-# INLINE_LATE go #-}
    go !_ st = do
          r <- lift $ step defState st
          case r of
            Yield x s -> f x (go SPEC s)
            Skip s    -> go SPEC s
            Stop      -> final

-- | Collect all the elements of a stream into a list, using the right fold
-- 'foldr' with @(:)@ and @[]@.
{-# INLINE_NORMAL toList #-}
toList :: Monad m => Stream m a -> m [a]
toList = foldr (:) []

-- Use foldr/build fusion to fuse with list consumers
-- This can be useful when using the IsList instance
--
-- Folds an 'Identity' stream with caller-supplied "cons" (c) and "nil" (n)
-- functions instead of the list constructors, which is the shape 'build'
-- expects.
{-# INLINE_LATE toListFB #-}
toListFB :: (a -> b -> b) -> b -> Stream Identity a -> b
toListFB c n (Stream step state) = go state
  where
    go st = case runIdentity (step defState st) of
             Yield x s -> x `c` go s
             Skip s    -> go s
             Stop      -> n

-- The rule rewrites 'toList' to 'toListId' where the types match, i.e. for
-- streams in the 'Identity' monad. 'toListId' produces the list via 'build'
-- so that it can take part in foldr/build fusion.
{-# RULES "toList Identity" toList = toListId #-}
{-# INLINE_EARLY toListId #-}
toListId :: Stream Identity a -> Identity [a]
toListId s = Identity $ build (\c n -> toListFB c n s)

-- XXX run begin action only if the stream is not empty.

-- | Strict left fold with a monadic step function, a monadic initial value
-- and a monadic extraction function.
--
-- @begin@ is run first to obtain the initial accumulator; @fstep@ combines
-- the accumulator with each element; @done@ converts the final accumulator
-- into the result when the stream stops. The accumulator is forced with
-- 'seq' at every iteration of the loop.
{-# INLINE_NORMAL foldlMx' #-}
foldlMx' :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
foldlMx' fstep begin done (Stream step state) =
    begin >>= \x -> go SPEC x state
  where
    -- XXX !acc?
    {-# INLINE_LATE go #-}
    go !_ acc st = acc `seq` do
        r <- step defState st
        case r of
            Yield x s -> do
                acc' <- fstep acc x
                go SPEC acc' s
            Skip s -> go SPEC acc s
            Stop   -> done acc

-- | Strict left fold with a pure step function, a pure initial value and a
-- pure extraction function. Implemented as 'foldlMx'' with all three lifted
-- into the monad using 'return'.
{-# INLINE foldlx' #-}
foldlx' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> m b
foldlx' fstep begin done m =
    foldlMx' (\b a -> return (fstep b a)) (return begin) (return . done) m

-- XXX implement in terms of foldlMx'?

-- | Strict left fold with a monadic step function and a pure initial value.
--
-- @fstep@ combines the accumulator with each element; the final accumulator
-- is returned when the stream stops. The accumulator is forced with 'seq' at
-- every iteration of the loop.
{-# INLINE_NORMAL foldlM' #-}
foldlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> m b
foldlM' fstep begin (Stream step state) = go SPEC begin state
  where
    {-# INLINE_LATE go #-}
    go !_ acc st = acc `seq` do
        r <- step defState st
        case r of
            Yield x s -> do
                acc' <- fstep acc x
                go SPEC acc' s
            Skip s -> go SPEC acc s
            Stop   -> return acc

-- | Left fold over a stream with a pure step function.
--
-- Defined by lifting the step function into the monad with 'return' and
-- delegating to 'foldlM''; the initial accumulator and the stream are passed
-- through unchanged.
{-# INLINE foldl' #-}
foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
foldl' fstep = foldlM' (\b a -> return (fstep b a))

-- | Convert a list of pure values to a 'Stream'.
--
-- The remaining list itself is used as the stream state: each step yields the
-- head of the list and continues with the tail, and the stream stops when the
-- list is empty.
{-# INLINE_LATE fromList #-}
fromList :: Applicative m => [a] -> Stream m a
fromList = Stream step
  where
    -- The first argument (the 'State' passed to every step) is not used.
    {-# INLINE_LATE step #-}
    step _ (x:xs) = pure $ Yield x xs
    step _ []     = pure Stop

------------------------------------------------------------------------------
-- Comparisons
------------------------------------------------------------------------------

-- | Compare two streams for equality using the supplied element comparison
-- function.
--
-- Returns 'True' only if both streams stop after the same number of yielded
-- elements and every corresponding pair of elements satisfies the predicate.
-- The comparison stops at the first pair that does not satisfy it.
{-# INLINE_NORMAL eqBy #-}
eqBy :: Monad m => (a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
eqBy eq (Stream step1 t1) (Stream step2 t2) = eq_loop0 SPEC t1 t2
  where
    -- Pull the next element from the first stream.
    eq_loop0 !_ s1 s2 = do
      r <- step1 defState s1
      case r of
        Yield x s1' -> eq_loop1 SPEC x s1' s2
        Skip    s1' -> eq_loop0 SPEC   s1' s2
        -- First stream is done: equal only if the second one is done too.
        Stop        -> eq_null s2

    -- We hold an element @x@ of the first stream; pull its counterpart from
    -- the second stream and compare.
    eq_loop1 !_ x s1 s2 = do
      r <- step2 defState s2
      case r of
        Yield y s2'
          | eq x y    -> eq_loop0 SPEC   s1 s2'
          | otherwise -> return False
        Skip    s2'   -> eq_loop1 SPEC x s1 s2'
        -- Second stream ended while the first still had an element.
        Stop          -> return False

    -- Check that the second stream yields no further elements.
    eq_null s2 = do
      r <- step2 defState s2
      case r of
        Yield _ _ -> return False
        Skip s2'  -> eq_null s2'
        Stop      -> return True

-- | Compare two streams lexicographically using the supplied element
-- comparison function.
--
-- Elements are compared pairwise in order; the first result other than 'EQ'
-- is returned. If one stream stops first, the shorter stream is the smaller
-- one ('LT' if the first stream is shorter, 'GT' if the second is). Two
-- streams that stop together after only 'EQ' comparisons compare as 'EQ'.
{-# INLINE_NORMAL cmpBy #-}
cmpBy
    :: Monad m
    => (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
cmpBy cmp (Stream step1 t1) (Stream step2 t2) = cmp_loop0 SPEC t1 t2
  where
    -- Pull the next element from the first stream.
    cmp_loop0 !_ s1 s2 = do
      r <- step1 defState s1
      case r of
        Yield x s1' -> cmp_loop1 SPEC x s1' s2
        Skip    s1' -> cmp_loop0 SPEC   s1' s2
        -- First stream is done: the result depends on whether the second
        -- stream still has elements.
        Stop        -> cmp_null s2

    -- We hold an element @x@ of the first stream; pull its counterpart from
    -- the second stream and compare.
    cmp_loop1 !_ x s1 s2 = do
      r <- step2 defState s2
      case r of
        Yield y s2' -> case x `cmp` y of
                         EQ -> cmp_loop0 SPEC s1 s2'
                         c  -> return c
        Skip    s2' -> cmp_loop1 SPEC x s1 s2'
        -- Second stream ended while the first still had an element.
        Stop        -> return GT

    -- The first stream has ended: 'LT' if the second stream yields anything
    -- more, 'EQ' otherwise.
    cmp_null s2 = do
      r <- step2 defState s2
      case r of
        Yield _ _ -> return LT
        Skip s2'  -> cmp_null s2'
        Stop      -> return EQ

-- | Take at most the first @n@ elements of a stream.
--
-- The stream state is paired with a count of the elements yielded so far.
-- The underlying stream is stepped only while that count is below @n@;
-- afterwards the result stream stops without running the source again. For
-- @n <= 0@ the result is therefore an empty stream and the source is never
-- stepped.
{-# INLINE_NORMAL take #-}
take :: Monad m => Int -> Stream m a -> Stream m a
take n (Stream step state) = n `seq` Stream step' (state, 0)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, i) | i < n = do
        r <- step gst st
        return $ case r of
            Yield x s -> Yield x (s, i + 1)
            -- A skipped step does not count towards the limit.
            Skip s    -> Skip (s, i)
            Stop      -> Stop
    step' _ (_, _) = return Stop

------------------------------------------------------------------------------
-- Grouping/Splitting
------------------------------------------------------------------------------

-- State machine used by the step functions of 'groupsOf' and 'groupsOf2'.
-- s = stream state, fs = fold state
data GroupState s fs
    -- Start of a new group: the group size is validated and a fresh fold
    -- state is created before moving on to GroupBuffer with a count of 0.
    = GroupStart s
    -- Inside a group: the source stream state, the fold state accumulated so
    -- far and the number of elements folded into the current group.
    | GroupBuffer s fs Int
    -- A group is complete: extract and yield the result of the fold state,
    -- then continue from the given next state.
    | GroupYield fs (GroupState s fs)
    -- The source stream has ended and the last group has been yielded; the
    -- output stream stops.
    | GroupFinish

-- | @groupsOf n f s@ splits the stream @s@ into consecutive groups of @n@
-- elements, folds each group with the fold @f@ and yields one fold result per
-- group. The last group may contain fewer than @n@ elements.
--
-- The group size is checked when a group is started (i.e. when the resulting
-- stream is first stepped), not when the stream is constructed: if @n <= 0@
-- an 'error' is raised at that point.
--
-- Note: whenever the source stops while a group is being buffered, the
-- current fold state is extracted and yielded even if no element has been
-- folded into it. So an empty source, or a source whose length is an exact
-- multiple of @n@, produces a final result built from zero elements.
{-# INLINE_NORMAL groupsOf #-}
groupsOf
    :: Monad m
    => Int
    -> Fold m a b
    -> Stream m a
    -> Stream m b
groupsOf n (Fold fstep initial extract) (Stream step state) =
    n `seq` Stream step' (GroupStart state)

    where

    {-# INLINE_LATE step' #-}
    step' _ (GroupStart st) = do
        -- XXX shall we use the Natural type instead? Need to check performance
        -- implications.
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Data.Stream.StreamD.Type.groupsOf: the size of "
                 ++ "groups [" ++ show n ++ "] must be a natural number"
        -- fs = fold state
        fs <- initial
        return $ Skip (GroupBuffer st fs 0)

    step' gst (GroupBuffer st fs i) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                -- Force the new fold state before storing it in the next
                -- stream state.
                !fs' <- fstep fs x
                let i' = i + 1
                return $
                    if i' >= n
                    then Skip (GroupYield fs' (GroupStart s))
                    else Skip (GroupBuffer s fs' i')
            Skip s -> return $ Skip (GroupBuffer s fs i)
            -- Source exhausted: flush whatever has been accumulated.
            Stop -> return $ Skip (GroupYield fs GroupFinish)

    step' _ (GroupYield fs next) = do
        r <- extract fs
        return $ Yield r next

    step' _ GroupFinish = return Stop

-- | Like 'groupsOf' but for a 'Fold2': @groupsOf2 n input f s@ splits the
-- stream @s@ into consecutive groups of @n@ elements and folds each group
-- with @f@. At the start of every group the @input@ action is run and its
-- result is injected into the fold to create the initial fold state for that
-- group. The last group may contain fewer than @n@ elements.
--
-- The group size is checked when a group is started (i.e. when the resulting
-- stream is first stepped), not when the stream is constructed: if @n <= 0@
-- an 'error' is raised at that point.
--
-- Note: whenever the source stops while a group is being buffered, the
-- current fold state is extracted and yielded even if no element has been
-- folded into it. So an empty source, or a source whose length is an exact
-- multiple of @n@, produces a final result built from zero elements.
{-# INLINE_NORMAL groupsOf2 #-}
groupsOf2
    :: Monad m
    => Int
    -> m c
    -> Fold2 m c a b
    -> Stream m a
    -> Stream m b
groupsOf2 n input (Fold2 fstep inject extract) (Stream step state) =
    n `seq` Stream step' (GroupStart state)

    where

    {-# INLINE_LATE step' #-}
    step' _ (GroupStart st) = do
        -- XXX shall we use the Natural type instead? Need to check performance
        -- implications.
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Data.Stream.StreamD.Type.groupsOf2: the size of "
                 ++ "groups [" ++ show n ++ "] must be a natural number"
        -- fs = fold state
        fs <- input >>= inject
        return $ Skip (GroupBuffer st fs 0)

    step' gst (GroupBuffer st fs i) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                -- Force the new fold state before storing it in the next
                -- stream state.
                !fs' <- fstep fs x
                let i' = i + 1
                return $
                    if i' >= n
                    then Skip (GroupYield fs' (GroupStart s))
                    else Skip (GroupBuffer s fs' i')
            Skip s -> return $ Skip (GroupBuffer s fs i)
            -- Source exhausted: flush whatever has been accumulated.
            Stop -> return $ Skip (GroupYield fs GroupFinish)

    step' _ (GroupYield fs next) = do
        r <- extract fs
        return $ Yield r next

    step' _ GroupFinish = return Stop