{-# LANGUAGE UndecidableInstances #-}

#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Data.Stream.StreamK.Type
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
-- Continuation passing style (CPS) stream implementation. The symbol 'K' below
-- denotes a function as well as a Kontinuation.
--
module Streamly.Internal.Data.Stream.StreamK.Type
    (
    -- * A class for streams
      IsStream (..)
    , adapt

    -- * The stream type
    , Stream (..)

    -- * Construction
    , mkStream
    , fromStopK
    , fromYieldK
    , consK

    -- * Elimination
    , foldStream
    , foldStreamShared
    , foldl'
    , foldlx'

    -- * foldr/build
    , foldrM
    , foldrS
    , foldrSShared
    , foldrSM
    , build
    , buildS
    , buildM
    , buildSM
    , sharedM
    , augmentS
    , augmentSM

    -- instances
    , cons
    , (.:)
    , consMStream
    , consMBy
    , fromEffect
    , fromPure

    , nil
    , nilM
    , conjoin
    , serial
    , map
    , mapM
    , mapMSerial
    , unShare
    , concatMapBy
    , concatMap
    , bindWith
    , concatPairsWith
    , apWith
    , apSerial
    , apSerialDiscardFst
    , apSerialDiscardSnd

    , Streaming   -- deprecated
    )
where

import Control.Monad (ap, (>=>))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Kind (Type)

#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (map, mapM, concatMap, foldr)

import Streamly.Internal.Data.SVar

-- $setup
-- >>> import Streamly.Prelude as Stream

------------------------------------------------------------------------------
-- Basic stream type
------------------------------------------------------------------------------

-- | The type @Stream m a@ represents a monadic stream of values of type 'a'
-- constructed using actions in monad 'm'. It uses stop, singleton and yield
-- continuations equivalent to the following direct style type:
--
-- @
-- data Stream m a = Stop | Singleton a | Yield a (Stream m a)
-- @
--
-- To facilitate parallel composition we maintain a local state in an 'SVar'
-- that is shared across and is used for synchronization of the streams being
-- composed.
--
-- The singleton case can be expressed in terms of stop and yield but we have
-- it as a separate case to optimize composition operations for streams with
-- single element.  We build singleton streams in the implementation of 'pure'
-- for Applicative and Monad, and in 'lift' for MonadTrans.

-- XXX remove the Stream type parameter from State as it is always constant.
-- We can remove it from SVar as well

newtype Stream m a =
    MkStream (forall r.
               State Stream m a         -- state threaded through the stream
            -> (a -> Stream m a -> m r) -- yield: an element and the rest of the stream
            -> (a -> m r)               -- singleton: the only remaining element
            -> m r                      -- stop: no more elements
            -> m r                      -- result of whichever continuation gets called
            )

------------------------------------------------------------------------------
-- Types that can behave as a Stream
------------------------------------------------------------------------------

infixr 5 `consM`
infixr 5 |:

-- XXX Use a different SVar based on the stream type. But we need to make sure
-- that we do not lose performance due to polymorphism.
--
-- | Class of types that can represent a stream of elements of some type 'a' in
-- some monad 'm'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
class
#if __GLASGOW_HASKELL__ >= 806
    -- Quantified superclass constraints; only compiled in for GHC >= 8.6.
    ( forall m a. MonadAsync m => Semigroup (t m a)
    , forall m a. MonadAsync m => Monoid (t m a)
    , forall m. Monad m => Functor (t m)
    , forall m. MonadAsync m => Applicative (t m)
    ) =>
#endif
      IsStream t where
    -- | Convert a stream of type @t@ to the underlying 'Stream' type.
    toStream :: t m a -> Stream m a
    -- | Convert the underlying 'Stream' type to a stream of type @t@.
    fromStream :: Stream m a -> t m a
    -- | Constructs a stream by adding a monadic action at the head of an
    -- existing stream. For example:
    --
    -- @
    -- > toList $ getLine \`consM` getLine \`consM` nil
    -- hello
    -- world
    -- ["hello","world"]
    -- @
    --
    -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
    --
    -- @since 0.2.0
    consM :: MonadAsync m => m a -> t m a -> t m a
    -- | Operator equivalent of 'consM'. We can read it as "@parallel colon@"
    -- to remember that @|@ comes before ':'.
    --
    -- @
    -- > toList $ getLine |: getLine |: nil
    -- hello
    -- world
    -- ["hello","world"]
    -- @
    --
    -- @
    -- let delay = threadDelay 1000000 >> print 1
    -- drain $ fromSerial  $ delay |: delay |: delay |: nil
    -- drain $ fromParallel $ delay |: delay |: delay |: nil
    -- @
    --
    -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
    --
    -- @since 0.2.0
    (|:) :: MonadAsync m => m a -> t m a -> t m a
    -- We can define (|:) just as 'consM' but it is defined explicitly for each
    -- type because we want to use SPECIALIZE pragma on the definition.

-- | Deprecated synonym for 'IsStream'; use 'IsStream' instead.
--
-- @since 0.1.0
{-# DEPRECATED Streaming "Please use IsStream instead." #-}
type Streaming = IsStream

-------------------------------------------------------------------------------
-- Type adapting combinators
-------------------------------------------------------------------------------

-- XXX Move/reset the State here by reconstructing the stream with cleared
-- state. Can we make sure we do not do that when t1 = t2? If we do this then
-- we do not need to do that explicitly using svarStyle.  It would act as
-- unShare when the stream type is the same.
--
-- | Adapt any specific stream type to any other specific stream type.
--
-- The conversion goes through the common 'Stream' representation: the source
-- is unwrapped with 'toStream' and rewrapped with 'fromStream'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
adapt = fromStream . toStream

------------------------------------------------------------------------------
-- Building a stream
------------------------------------------------------------------------------

-- XXX The State is always parameterized by "Stream" which means State is not
-- different for different stream types. So we have to manually make sure that
-- when converting from one stream to another we migrate the state correctly.
-- This can be fixed if we use a different SVar type for different streams.
-- Currently we always use "SVar Stream" and therefore a different State type
-- parameterized by that stream.
--
-- XXX Since t is coercible we should be able to coerce k
-- mkStream k = fromStream $ MkStream $ coerce k
--
-- | Build a stream from a function that receives, in this order, the 'State',
-- a yield continuation, a singleton continuation and a stop continuation.
--
-- The supplied function sees the tail of the stream at type @t@; internally
-- the tail is converted with 'toStream' before it is handed to the yield
-- continuation of the underlying 'Stream'.
{-# INLINE_EARLY mkStream #-}
mkStream :: IsStream t
    => (forall r. State Stream m a
        -> (a -> t m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> t m a
mkStream k = fromStream $ MkStream $ \st yld sng stp ->
    -- Bridge the caller's yield (tail of type @t@) to the raw yield (tail of
    -- type 'Stream').
    let yieldk a r = yld a (toStream r)
     in k st yieldk sng stp

{-# RULES "mkStream from stream" mkStream = mkStreamFromStream #-}
-- | Variant of 'mkStream' for a builder whose yield continuation already
-- takes the tail as a 'Stream'. No conversion of the tail is needed, so the
-- builder is wrapped with 'MkStream' directly and only the result is
-- converted with 'fromStream'.
--
-- Used as the right hand side of the "mkStream from stream" rewrite rule.
mkStreamFromStream :: IsStream t
    => (forall r. State Stream m a
        -> (a -> Stream m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> t m a
mkStreamFromStream k = fromStream $ MkStream k

{-# RULES "mkStream stream" mkStream = mkStreamStream #-}
-- | 'mkStream' specialized to the 'Stream' type itself, where it is just the
-- 'MkStream' constructor.
--
-- Used as the right hand side of the "mkStream stream" rewrite rule.
mkStreamStream
    :: (forall r. State Stream m a
        -> (a -> Stream m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> Stream m a
mkStreamStream = MkStream

-- | A terminal function that has no continuation to follow. It takes the
-- action to run on stop (@m r@) and returns an action of the same type.
type StopK m = forall r. m r -> m r

-- | A monadic continuation: a function that yields a value of type @a@ by
-- calling its argument @(a -> m r)@ as a continuation with that value. We can
-- also think of it as a callback with a handler @(a -> m r)@. Category
-- theorists call it a codensity type, a special type of right Kan extension.
type YieldK m a = forall r. (a -> m r) -> m r

-- | Wrap a monadic action as a 'YieldK': the result runs the action and
-- passes the value it produces to the supplied continuation.
_wrapM :: Monad m => m a -> YieldK m a
_wrapM m = \k -> m >>= k

-- | Make an empty stream from a stop function. The stop function is applied
-- to the stop continuation of the consumer; the state and the yield and
-- singleton continuations are ignored.
fromStopK :: IsStream t => StopK m -> t m a
fromStopK k = mkStream $ \_ _ _ stp -> k stp

-- | Make a singleton stream from a callback function. The callback function
-- calls the one-shot yield continuation to yield an element; here that
-- continuation is the singleton continuation of the consumer.
fromYieldK :: IsStream t => YieldK m a -> t m a
fromYieldK k = mkStream $ \_ _ sng _ -> k sng

-- | Add a yield function at the head of the stream. The element produced by
-- the yield function is passed to the consumer's yield continuation together
-- with the given stream as the tail.
consK :: IsStream t => YieldK m a -> t m a -> t m a
consK k r = mkStream $ \_ yld _ _ -> k (\x -> yld x r)

-- XXX Build a stream from a repeating callback function.

------------------------------------------------------------------------------
-- Construction
------------------------------------------------------------------------------

infixr 5 `cons`

-- faster than consM because there is no bind.
-- | Construct a stream by adding a pure value at the head of an existing
-- stream. For serial streams this is the same as @(return a) \`consM` r@ but
-- more efficient. For concurrent streams this is not concurrent whereas
-- 'consM' is concurrent. For example:
--
-- @
-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil
-- [1,2,3]
-- @
--
-- @since 0.1.0
{-# INLINE_NORMAL cons #-}
cons :: IsStream t => a -> t m a -> t m a
cons a r = mkStream $ \_ yld _ _ -> yld a r

infixr 5 .:

-- | Operator equivalent of 'cons'.
--
-- @
-- > toList $ 1 .: 2 .: 3 .: nil
-- [1,2,3]
-- @
--
-- @since 0.1.1
{-# INLINE (.:) #-}
(.:) :: IsStream t => a -> t m a -> t m a
(.:) = cons

-- | An empty stream. It immediately invokes the stop continuation.
--
-- @
-- > toList nil
-- []
-- @
--
-- @since 0.1.0
{-# INLINE_NORMAL nil #-}
nil :: IsStream t => t m a
nil = mkStream $ \_ _ _ stp -> stp

-- | An empty stream producing a side effect. The action is run, its result
-- is discarded, and then the stop continuation is invoked.
--
-- @
-- > toList (nilM (print "nil"))
-- "nil"
-- []
-- @
--
-- /Pre-release/
{-# INLINE_NORMAL nilM #-}
nilM :: (IsStream t, Monad m) => m b -> t m a
nilM m = mkStream $ \_ _ _ stp -> m >> stp

-- | A stream of exactly one pure value. The value is delivered through the
-- singleton continuation.
{-# INLINE_NORMAL fromPure #-}
fromPure :: IsStream t => a -> t m a
fromPure a = mkStream $ \_ _ single _ -> single a

-- | A stream of exactly one value, obtained by running a monadic action. The
-- result of the action is delivered through the singleton continuation.
--
-- The stream is built at the 'Stream' type first and then converted to the
-- requested type with 'fromStream'.
{-# INLINE_NORMAL fromEffect #-}
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect m = fromStream $ mkStream $ \_ _ single _ -> m >>= single

-- XXX specialize to IO?
-- | Add a monadic action at the head of a stream using the supplied function
-- to combine streams: the action is turned into a one-element stream with
-- 'fromEffect' and then combined with the tail using @f@.
{-# INLINE consMBy #-}
consMBy :: (IsStream t, MonadAsync m) => (t m a -> t m a -> t m a)
    -> m a -> t m a -> t m a
consMBy f m r = fromStream (fromEffect m) `f` r

------------------------------------------------------------------------------
-- Folding a stream
------------------------------------------------------------------------------

-- | Fold a stream by providing a 'State', a yield continuation, a singleton
-- continuation and a stop continuation, in that order. The state is passed
-- to the stream unchanged, so the stream shares the current SVar carried in
-- the State.
{-# INLINE_EARLY foldStreamShared #-}
foldStreamShared
    :: IsStream t
    => State Stream m a
    -> (a -> t m a -> m r)
    -> (a -> m r)
    -> m r
    -> t m a
    -> m r
foldStreamShared st yld sng stp m =
    -- The underlying stream yields its tail as a 'Stream'; convert it back to
    -- @t@ before calling the caller's yield continuation.
    let yieldk a x = yld a (fromStream x)
        MkStream k = toStream m
     in k st yieldk sng stp

-- XXX write a similar rule for foldStream as well?
{-# RULES "foldStreamShared from stream"
   foldStreamShared = foldStreamSharedStream #-}
-- | 'foldStreamShared' specialized to the 'Stream' type: the stream is
-- unwrapped and applied directly to the state and the continuations, with no
-- conversion of the tail.
--
-- Used as the right hand side of the "foldStreamShared from stream" rewrite
-- rule.
foldStreamSharedStream
    :: State Stream m a
    -> (a -> Stream m a -> m r)
    -> (a -> m r)
    -> m r
    -> Stream m a
    -> m r
-- 'toStream' is the identity at the 'Stream' type, so matching on the
-- constructor directly is equivalent.
foldStreamSharedStream st yld sng stp (MkStream k) = k st yld sng stp

-- | Fold a stream by providing a 'State', a yield continuation, a singleton
-- continuation and a stop continuation, in that order. The state is passed
-- through 'adaptState' before it is handed to the stream; the stream will
-- not use the SVar passed via State.
{-# INLINE foldStream #-}
foldStream
    :: IsStream t
    => State Stream m a
    -> (a -> t m a -> m r)
    -> (a -> m r)
    -> m r
    -> t m a
    -> m r
foldStream st yld sng stp m =
    -- The underlying stream yields its tail as a 'Stream'; convert it back to
    -- @t@ before calling the caller's yield continuation.
    let yieldk a x = yld a (fromStream x)
        MkStream k = toStream m
     in k (adaptState st) yieldk sng stp

-------------------------------------------------------------------------------
-- Instances
-------------------------------------------------------------------------------

-- NOTE: specializing the function outside the instance definition seems to
-- improve performance quite a bit at times, even if we have the same
-- SPECIALIZE in the instance definition.
-- | Add a monadic action at the head of a 'Stream'. When the stream is
-- consumed the action is run and its result is passed to the yield
-- continuation together with the given tail.
{-# INLINE consMStream #-}
{-# SPECIALIZE consMStream :: IO a -> Stream IO a -> Stream IO a #-}
consMStream :: (Monad m) => m a -> Stream m a -> Stream m a
consMStream m r = MkStream $ \_ yld _ _ -> m >>= \a -> yld a r

-------------------------------------------------------------------------------
-- IsStream Stream
-------------------------------------------------------------------------------

-- 'Stream' is the underlying representation itself, so both conversions are
-- the identity, and 'consM' and '|:' are both 'consMStream'.
instance IsStream Stream where
    toStream = id
    fromStream = id

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> Stream IO a -> Stream IO a #-}
    consM :: Monad m => m a -> Stream m a -> Stream m a
    consM = consMStream

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> Stream IO a -> Stream IO a #-}
    (|:) :: Monad m => m a -> Stream m a -> Stream m a
    (|:) = consMStream

-------------------------------------------------------------------------------
-- foldr/build fusion
-------------------------------------------------------------------------------

-- XXX perhaps we can just use foldrSM/buildM everywhere as they are more
-- general and cover foldrS/buildS as well.

-- | Right fold of a stream into another stream, parameterized by the function
-- @f@ used to run the reconstructed stream.
--
-- The function 'f' decides how to reconstruct the stream. We could
-- reconstruct using a shared state (SVar) or without sharing the state.
--
-- @step@ combines an element with the folded rest of the stream and @final@
-- is the stream used once the input ends.
{-# INLINE foldrSWith #-}
foldrSWith :: IsStream t
    => (forall r. State Stream m b
        -> (b -> t m b -> m r)
        -> (b -> m r)
        -> m r
        -> t m b
        -> m r)
    -> (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrSWith f step final m = go m
    where
    go m1 = mkStream $ \st yld sng stp ->
        let -- Run a reconstructed stream with the consumer's continuations.
            run x = f st yld sng stp x
            -- Input exhausted: continue with the final stream.
            stop = run final
            -- Last element: no further input to fold.
            single a = run $ step a final
            -- An element followed by more input: fold the rest lazily.
            yieldk a r = run $ step a (go r)
         -- XXX if type a and b are the same we do not need adaptState, can we
         -- save some perf with that?
         -- XXX since we are using adaptState anyway here we can use
         -- foldStreamShared instead, will that save some perf?
         in foldStream (adaptState st) yieldk single stop m1

-- XXX we can use rewrite rules just for foldrSWith, if the function f is the
-- same we can rewrite it.

-- | Fold sharing the SVar state within the reconstructed stream. This is
-- 'foldrSWith' using 'foldStreamShared' to run the reconstructed stream.
{-# INLINE_NORMAL foldrSShared #-}
foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrSShared = foldrSWith foldStreamShared

-- XXX consM is a typeclass method, therefore rewritten already. Instead maybe
-- we can make consM polymorphic using rewrite rules.
-- {-# RULES "foldrSShared/id"     foldrSShared consM nil = \x -> x #-}
{-# RULES "foldrSShared/nil"
    forall k z. foldrSShared k z nil = z #-}
{-# RULES "foldrSShared/single"
    forall k z x. foldrSShared k z (fromPure x) = k x z #-}
-- {-# RULES "foldrSShared/app" [1]
--     forall ys. foldrSShared consM ys = \xs -> xs `conjoin` ys #-}

-- | Lazy right associative fold to a stream. This is 'foldrSWith' using
-- 'foldStream' to run the reconstructed stream.
{-# INLINE_NORMAL foldrS #-}
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS = foldrSWith foldStream

{-# RULES "foldrS/id"     foldrS cons nil = \x -> x #-}
{-# RULES "foldrS/nil"    forall k z.   foldrS k z nil  = z #-}
-- See notes in GHC.Base about this rule
-- {-# RULES "foldr/cons"
--  forall k z x xs. foldrS k z (x `cons` xs) = k x (foldrS k z xs) #-}
{-# RULES "foldrS/single" forall k z x. foldrS k z (fromPure x) = k x z #-}
-- {-# RULES "foldrS/app" [1]
--  forall ys. foldrS cons ys = \xs -> xs `conjoin` ys #-}

-------------------------------------------------------------------------------
-- foldrS with monadic cons i.e. consM
-------------------------------------------------------------------------------

-- | Right fold of a stream into another stream where the step function
-- takes the element as a monadic action. Each element of the input is
-- passed to @step@ wrapped with 'return'.
--
-- The function @f@ decides how the reconstructed stream is run, @step@
-- combines an element with the folded rest of the stream and @final@ is the
-- stream used once the input ends.
{-# INLINE foldrSMWith #-}
foldrSMWith :: (IsStream t, Monad m)
    => (forall r. State Stream m b
        -> (b -> t m b -> m r)
        -> (b -> m r)
        -> m r
        -> t m b
        -> m r)
    -> (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrSMWith f step final m = go m
    where
    go m1 = mkStream $ \st yld sng stp ->
        let -- Run a reconstructed stream with the consumer's continuations.
            run x = f st yld sng stp x
            -- Input exhausted: continue with the final stream.
            stop = run final
            -- Last element: no further input to fold.
            single a = run $ step (return a) final
            -- An element followed by more input: fold the rest lazily.
            yieldk a r = run $ step (return a) (go r)
         in foldStream (adaptState st) yieldk single stop m1

-- Right fold with a monadic cons-like step function. This is 'foldrSMWith'
-- with 'foldStream' as the eliminator used to run the rebuilt stream.
{-# INLINE_NORMAL foldrSM #-}
foldrSM :: (IsStream t, Monad m)
    => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrSM = foldrSMWith foldStream

-- Folding 'nil' gives back the seed; folding a 'fromEffect' singleton applies
-- the step function once to the effect itself.
-- {-# RULES "foldrSM/id"     foldrSM consM nil = \x -> x #-}
{-# RULES "foldrSM/nil"    forall k z.   foldrSM k z nil  = z #-}
{-# RULES "foldrSM/single" forall k z x. foldrSM k z (fromEffect x) = k x z #-}
-- {-# RULES "foldrSM/app" [1]
--  forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}

-- Like foldrSM but sharing the SVar state within the reconstructed stream.
-- This is 'foldrSMWith' with 'foldStreamShared' as the eliminator used to run
-- the rebuilt stream.
{-# INLINE_NORMAL foldrSMShared #-}
foldrSMShared :: (IsStream t, Monad m)
    => (m a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrSMShared = foldrSMWith foldStreamShared

-- {-# RULES "foldrSM/id"     foldrSM consM nil = \x -> x #-}
{-# RULES "foldrSMShared/nil"
    forall k z. foldrSMShared k z nil = z #-}
{-# RULES "foldrSMShared/single"
    forall k z x. foldrSMShared k z (fromEffect x) = k x z #-}
-- {-# RULES "foldrSM/app" [1]
--  forall ys. foldrSM consM ys = \xs -> xs `conjoin` ys #-}

-------------------------------------------------------------------------------
-- build
-------------------------------------------------------------------------------

-- Build a stream from a list-style build function: @g@ abstracts over the
-- "cons" and "nil" of the structure and is instantiated here with the stream
-- 'cons' and 'nil'.
{-# INLINE_NORMAL build #-}
build :: IsStream t => forall a. (forall b. (a -> b -> b) -> b -> b) -> t m a
build g = g cons nil

-- The rules below fuse a right fold with a 'build' producer by applying the
-- build function directly to the fold's step function and seed, so that no
-- intermediate stream is constructed.
{-# RULES "foldrM/build"
    forall k z (g :: forall b. (a -> b -> b) -> b -> b).
    foldrM k z (build g) = g k z #-}

{-# RULES "foldrS/build"
      forall k z (g :: forall b. (a -> b -> b) -> b -> b).
      foldrS k z (build g) = g k z #-}

{-# RULES "foldrS/cons/build"
      forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
      foldrS k z (x `cons` build g) = k x (g k z) #-}

{-# RULES "foldrSShared/build"
      forall k z (g :: forall b. (a -> b -> b) -> b -> b).
      foldrSShared k z (build g) = g k z #-}

{-# RULES "foldrSShared/cons/build"
      forall k z x (g :: forall b. (a -> b -> b) -> b -> b).
      foldrSShared k z (x `cons` build g) = k x (g k z) #-}

-- build a stream by applying cons and nil to a build function
--
-- Unlike 'build', the build function here is specialized to the stream type:
-- it receives a stream cons function and a stream to use as the tail.
{-# INLINE_NORMAL buildS #-}
buildS :: IsStream t => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a
buildS g = g cons nil

-- Fuse 'foldrS' / 'foldrSShared' with a 'buildS' producer by applying the
-- build function directly to the fold's step function and seed.
{-# RULES "foldrS/buildS"
      forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a).
      foldrS k z (buildS g) = g k z #-}

{-# RULES "foldrS/cons/buildS"
      forall k z x (g :: (a -> t m a -> t m a) -> t m a -> t m a).
      foldrS k z (x `cons` buildS g) = k x (g k z) #-}

{-# RULES "foldrSShared/buildS"
      forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a).
      foldrSShared k z (buildS g) = g k z #-}

{-# RULES "foldrSShared/cons/buildS"
      forall k z x (g :: (a -> t m a -> t m a) -> t m a -> t m a).
      foldrSShared k z (x `cons` buildS g) = k x (g k z) #-}

-- build a stream by applying consM and nil to a build function
--
-- This is the monadic counterpart of 'buildS': the cons function handed to
-- the build function takes the element as an action.
{-# INLINE_NORMAL buildSM #-}
buildSM :: (IsStream t, MonadAsync m)
    => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a
buildSM g = g consM nil

-- Fuse 'foldrSM' / 'foldrSMShared' with a 'buildSM' producer by applying the
-- build function directly to the fold's step function and seed.
{-# RULES "foldrSM/buildSM"
     forall k z (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
     foldrSM k z (buildSM g) = g k z #-}

{-# RULES "foldrSMShared/buildSM"
     forall k z (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
     foldrSMShared k z (buildSM g) = g k z #-}

-- Disabled because this may not fire as consM is a class Op
{-
{-# RULES "foldrS/consM/buildSM"
      forall k z x (g :: (m a -> t m a -> t m a) -> t m a -> t m a)
    . foldrSM k z (x `consM` buildSM g)
    = k x (g k z)
#-}
-}

-- Build using monadic build functions (continuations) instead of
-- reconstructing a stream.
--
-- The build function @g@ receives a yield continuation, a singleton
-- continuation and a stop continuation. The singleton and stop continuations
-- of the resulting stream are passed through unchanged. The yield
-- continuation given to @g@ conses the yielded element (as a pure action)
-- onto the rest of the stream with 'consM' and runs the result with
-- 'foldStream'.
{-# INLINE_NORMAL buildM #-}
buildM :: (IsStream t, MonadAsync m)
    => (forall r. (a -> t m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )
    -> t m a
buildM g = mkStream $ \st yld sng stp ->
    g (\a r -> foldStream st yld sng stp (return a `consM` r)) sng stp

-- | Like 'buildM' but shares the SVar state across computations.
--
-- The only difference from 'buildM' is that the stream rebuilt inside the
-- yield continuation is run with 'foldStreamShared' instead of 'foldStream'.
{-# INLINE_NORMAL sharedM #-}
sharedM :: (IsStream t, MonadAsync m)
    => (forall r. (a -> t m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )
    -> t m a
sharedM g = mkStream $ \st yld sng stp ->
    g (\a r -> foldStreamShared st yld sng stp (return a `consM` r)) sng stp

-------------------------------------------------------------------------------
-- augment
-------------------------------------------------------------------------------

-- Like 'buildS', but the build function is given the stream @xs@ as its tail
-- instead of 'nil', i.e. whatever @g@ builds is placed in front of @xs@.
{-# INLINE_NORMAL augmentS #-}
augmentS :: IsStream t
    => ((a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
augmentS g xs = g cons xs

-- Augmenting 'nil' is the same as building.
{-# RULES "augmentS/nil"
    forall (g :: (a -> t m a -> t m a) -> t m a -> t m a).
    augmentS g nil = buildS g
    #-}

-- Folding an augmented stream: fold the tail first and use the result as the
-- seed for the build function.
{-# RULES "foldrS/augmentS"
    forall k z xs (g :: (a -> t m a -> t m a) -> t m a -> t m a).
    foldrS k z (augmentS g xs) = g k (foldrS k z xs)
    #-}

-- Augmenting a built stream is a single build of the composed functions.
{-# RULES "augmentS/buildS"
    forall (g :: (a -> t m a -> t m a) -> t m a -> t m a)
           (h :: (a -> t m a -> t m a) -> t m a -> t m a).
    augmentS g (buildS h) = buildS (\c n -> g c (h c n))
    #-}

-- Like 'buildSM', but the build function is given the stream @xs@ as its tail
-- instead of 'nil'. This is the 'consM' based counterpart of 'augmentS'.
{-# INLINE_NORMAL augmentSM #-}
augmentSM :: (IsStream t, MonadAsync m)
    => ((m a -> t m a -> t m a) -> t m a -> t m a) -> t m a -> t m a
augmentSM g xs = g consM xs

-- Augmenting 'nil' is the same as building.
{-# RULES "augmentSM/nil"
    forall (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
    augmentSM g nil = buildSM g
    #-}

-- Folding an augmented stream: fold the tail first and use the result as the
-- seed for the build function.
{-# RULES "foldrSM/augmentSM"
    forall k z xs (g :: (m a -> t m a -> t m a) -> t m a -> t m a).
    foldrSM k z (augmentSM g xs) = g k (foldrSM k z xs)
    #-}

-- Augmenting a built stream is a single build of the composed functions.
{-# RULES "augmentSM/buildSM"
    forall (g :: (m a -> t m a -> t m a) -> t m a -> t m a)
           (h :: (m a -> t m a -> t m a) -> t m a -> t m a).
    augmentSM g (buildSM h) = buildSM (\c n -> g c (h c n))
    #-}

-------------------------------------------------------------------------------
-- Experimental foldrM/buildM
-------------------------------------------------------------------------------

-- | Lazy right fold with a monadic step function.
--
-- The step function receives the current element and the action that folds
-- the rest of the stream; the rest is folded only if the step function runs
-- that action. @acc@ is the result used for the end of the stream.
{-# INLINE_NORMAL foldrM #-}
foldrM :: IsStream t => (a -> m b -> m b) -> m b -> t m a -> m b
foldrM step acc m = go m
    where
    go m1 =
        let stop = acc
            single a = step a acc
            -- The recursive fold is passed unevaluated to the step function.
            yieldk a r = step a (go r)
        in foldStream defState yieldk single stop m1

-- Right fold, with a monadic step function, over a stream that is described
-- by a continuation-style build function rather than by a stream value.
--
-- Arguments:
--
-- * @f@: the stream eliminator used to consume the remainder of the stream
--   once the build function has yielded an element together with a rest
--   stream (the rewrite rules in this module pass 'foldStream' or
--   'foldStreamShared').
-- * @step@, @acc@: step function and final result, as in 'foldrM'.
-- * @g@: the build function; it is applied to yield, singleton and stop
--   continuations.
{-# INLINE_NORMAL foldrMKWith #-}
foldrMKWith
    :: (State Stream m a
        -> (a -> t m a -> m b)
        -> (a -> m b)
        -> m b
        -> t m a
        -> m b)
    -> (a -> m b -> m b)
    -> m b
    -> ((a -> t m a -> m b) -> (a -> m b) -> m b -> m b)
    -> m b
foldrMKWith f step acc g = go g
    where
    go k =
        let stop = acc
            single a = step a acc
            -- Turn the rest stream back into a build function by partially
            -- applying the eliminator, then keep folding.
            yieldk a r =
                step a (go (\yld sng stp -> f defState yld sng stp r))
        in k yieldk single stop

{-
{-# RULES "foldrM/buildS"
      forall k z (g :: (a -> t m a -> t m a) -> t m a -> t m a)
    . foldrM k z (buildS g)
    = g k z
#-}
-}
-- Fusion of 'foldrM' with the 'buildM' and 'sharedM' producers: the build
-- function is handed directly to 'foldrMKWith' instead of being turned into a
-- stream first. The 'buildM' rule consumes the remainder of the stream with
-- 'foldStream', the 'sharedM' rule with 'foldStreamShared'.
-- XXX in which case will foldrM/buildM fusion be useful?
{-# RULES "foldrM/buildM"
    forall step acc (g :: (forall r.
           (a -> t m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )).
    foldrM step acc (buildM g) = foldrMKWith foldStream step acc g
    #-}

{-# RULES "foldrM/sharedM"
    forall step acc (g :: (forall r.
           (a -> t m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r
       )).
    foldrM step acc (sharedM g) = foldrMKWith foldStreamShared step acc g
    #-}

------------------------------------------------------------------------------
-- Left fold
------------------------------------------------------------------------------

-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
-- argument) to the folded value at the end. This is designed to work with the
-- @foldl@ library. The suffix @x@ is a mnemonic for extraction.
--
-- Note that the accumulator is always evaluated including the initial value.
--
-- Implementation: @go@ converts the input stream into a stream that produces
-- exactly one element, the final accumulator, through its singleton
-- continuation; @get@ runs that stream and applies the extraction function.
{-# INLINE foldlx' #-}
foldlx' :: forall t m a b x. (IsStream t, Monad m)
    => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
foldlx' step begin done m = get $ go m begin
    where
    {-# NOINLINE get #-}
    get :: t m x -> m b
    get m1 =
        -- XXX we are not strictly evaluating the accumulator here. Is this
        -- okay?
        let single = return . done
        -- XXX this is foldSingleton. why foldStreamShared?
        --
        -- The state, yield and stop arguments are 'undefined': the stream
        -- built by "go" ignores the state and the stop continuation it is
        -- given, and it only forwards the yield continuation to the next "go"
        -- stream without ever calling it.
         in foldStreamShared undefined undefined single undefined m1

    -- Note, this can be implemented by making a recursive call to "go",
    -- however that is more expensive because of unnecessary recursion
    -- that cannot be tail call optimized. Unfolding recursion explicitly via
    -- continuations is much more efficient.
    go :: t m a -> x -> t m x
    -- The bang pattern forces the accumulator at every step.
    go m1 !acc = mkStream $ \_ yld sng _ ->
        let -- End of input: deliver the accumulator as the only element.
            stop = sng acc
            single a = sng $ step acc a
            -- XXX this is foldNonEmptyStream
            yieldk a r = foldStream defState yld sng undefined $
                go r (step acc a)
        in foldStream defState yieldk single stop m1

-- | Strict left associative fold.
--
-- Implemented as 'foldlx'' with 'id' as the extraction function.
{-# INLINE foldl' #-}
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
foldl' step begin = foldlx' step begin id

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

infixr 6 `serial`

-- | Appends two streams sequentially, yielding all elements from the first
-- stream, and then all elements from the second stream.
--
-- >>> import Streamly.Prelude (serial)
-- >>> stream1 = Stream.fromList [1,2]
-- >>> stream2 = Stream.fromList [3,4]
-- >>> Stream.toList $ stream1 `serial` stream2
-- [1,2,3,4]
--
-- This operation can be used to fold an infinite lazy container of streams.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE serial #-}
serial :: IsStream t => t m a -> t m a -> t m a
-- XXX This doubles the time of toNullAp benchmark, may not be fusing properly
-- serial xs ys = augmentS (\c n -> foldrS c n xs) ys
serial m1 m2 = go m1
    where
    go m = mkStream $ \st yld sng stp ->
               let -- First stream exhausted: continue with the second one.
                   stop       = foldStream st yld sng stp m2
                   -- Last element of the first stream: the second stream
                   -- becomes the rest.
                   single a   = yld a m2
                   yieldk a r = yld a (go r)
               in foldStream st yieldk single stop m

-- join/merge/append streams depending on consM
--
-- The first stream is rebuilt with the stream type's 'consM' (via 'foldrSM')
-- in front of the second stream (via 'augmentSM'), so how the two streams are
-- combined is determined by the 'consM' implementation of the stream type.
{-# INLINE conjoin #-}
conjoin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
conjoin xs ys = augmentSM (\c n -> foldrSM c n xs) ys

-- Streams of the base 'Stream' type are combined by appending them with
-- 'serial'.
instance Semigroup (Stream m a) where
    (<>) = serial

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

-- The empty stream 'nil' is the identity; 'mappend' is the 'Semigroup'
-- operation.
instance Monoid (Stream m a) where
    mempty = nil
    mappend = (<>)

-------------------------------------------------------------------------------
-- Functor
-------------------------------------------------------------------------------

-- Compose a pure function with a cons-like function: the element is mapped
-- with @f@ before being passed on to @c@ together with the tail. This is the
-- step function 'map' hands to 'foldrS'.
--
-- Note eta expanded
{-# INLINE_LATE mapFB #-}
mapFB :: forall (t :: (Type -> Type) -> Type -> Type) b m a.
    (b -> t m b -> t m b) -> (a -> b) -> a -> t m b -> t m b
mapFB c f = \x ys -> c (f x) ys
#undef Type

-- Collapse nested 'mapFB' applications into a single one that uses the
-- composed function (the inner function @g@ is applied first), and remove a
-- 'mapFB' of the identity function.
{-# RULES
"mapFB/mapFB" forall c f g. mapFB (mapFB c f) g = mapFB c (f . g)
"mapFB/id"    forall c.     mapFB c (\x -> x)   = c
    #-}

-- Map a pure function over a stream. Expressed as a 'buildS' of a 'foldrS'
-- using 'mapFB' as the step, the form the fusion rules in this module match
-- on.
{-# INLINE map #-}
map :: IsStream t => (a -> b) -> t m a -> t m b
map f xs = buildS (\c n -> foldrS (mapFB c f) n xs)

-- XXX This definition might potentially be more efficient, but the cost in the
-- benchmark is dominated by unfoldrM cost so we cannot correctly determine
-- differences in the mapping cost. We should perhaps deduct the cost of
-- unfoldrM from the benchmarks and then compare.
{-
map f m = go m
    where
        go m1 =
            mkStream $ \st yld sng stp ->
            let single     = sng . f
                yieldk a r = yld (f a) (go r)
            in foldStream (adaptState st) yieldk single stp m1
-}

-- Monadic counterpart of 'mapFB': the element action is bound to the monadic
-- function @f@ and the resulting action is passed on to the cons-like
-- function @c@ together with the tail. Kept as an explicit lambda, like
-- 'mapFB'.
{-# INLINE_LATE mapMFB #-}
mapMFB :: Monad m => (m b -> t m b -> t m b) -> (a -> m b) -> m a -> t m b -> t m b
mapMFB c f = \x ys -> c (x >>= f) ys

-- Fuse two nested 'mapMFB' applications into one. Since
-- @mapMFB c f = \x ys -> c (x >>= f) ys@, the nested form
-- @mapMFB (mapMFB c f) g@ unfolds to @\x ys -> c ((x >>= g) >>= f) ys@, i.e.
-- @g@ runs first and @f@ runs on its result. The fused function is therefore
-- @g >=> f@ (compare @f . g@ in the pure "mapFB/mapFB" rule); @f >=> g@ would
-- run the two monadic functions in the wrong order.
{-# RULES
    "mapMFB/mapMFB" forall c f g. mapMFB (mapMFB c f) g = mapMFB c (g >=> f)
    #-}
-- XXX These rules may never fire because pure/return type class rules will
-- fire first.
{-
"mapMFB/pure"    forall c.     mapMFB c (\x -> pure x)   = c
"mapMFB/return"  forall c.     mapMFB c (\x -> return x) = c
-}

-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
--
-- Each element is turned into the action @f x@, which is consed onto the
-- mapped rest of the stream with the stream type's 'consM'; the input is
-- folded with 'foldrSShared'.
{-# INLINE mapM #-}
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
mapM f = foldrSShared (\x xs -> f x `consM` xs) nil
-- See note under map definition above.
{-
mapM f m = go m
    where
    go m1 = mkStream $ \st yld sng stp ->
        let single a  = f a >>= sng
            yieldk a r = foldStreamShared st yld sng stp $ f a |: go r
         in foldStream (adaptState st) yieldk single stp m1
         -}

-- This is experimental serial version supporting fusion.
--
-- XXX what if we do not want to fuse two concurrent mapMs?
-- XXX we can combine two concurrent mapM only if the SVar is of the same type
-- So for now we use it only for serial streams.
-- XXX fusion would be easier for monomoprhic stream types.
-- {-# RULES "mapM serial" mapM = mapMSerial #-}
-- Monadic map specialized to the base 'Stream' type. It has the same
-- build/fold shape as 'map', using 'buildSM', 'foldrSMShared' and 'mapMFB',
-- so that the fusion rules for those functions can apply.
{-# INLINE mapMSerial #-}
mapMSerial :: MonadAsync m => (a -> m b) -> Stream m a -> Stream m b
mapMSerial f xs = buildSM (\c n -> foldrSMShared (mapMFB c f) n xs)

-- XXX in fact use the Stream type everywhere and only use polymorphism in the
-- high level modules/prelude.
-- 'fmap' is the stream 'map' defined in this module (the Prelude 'map' is
-- hidden by the import list).
instance Monad m => Functor (Stream m) where
    fmap = map

-------------------------------------------------------------------------------
-- Transformers
-------------------------------------------------------------------------------

-- Lifting an action of the underlying monad into a stream is 'fromEffect'.
instance MonadTrans Stream where
    {-# INLINE lift #-}
    lift = fromEffect

-------------------------------------------------------------------------------
-- Nesting
-------------------------------------------------------------------------------

-- | Detach a stream from an SVar
--
-- The stream is wrapped in a new stream that runs it with 'foldStream',
-- passing the continuations through unchanged.
-- NOTE(review): presumably 'foldStream' (unlike 'foldStreamShared') does not
-- pass the SVar in the state on to the stream -- confirm against its
-- definition.
{-# INLINE unShare #-}
unShare :: IsStream t => t m a -> t m a
unShare x = mkStream $ \st yld sng stp ->
    foldStream st yld sng stp x

-- XXX the function stream and value stream can run in parallel

-- | Apply every function of a stream of functions to every element of a
-- stream of values. The mapped value streams, one per function, are merged
-- using the supplied combining function.
--
-- Arguments, in order: the function used to combine the result stream of one
-- function with the result streams of the remaining functions, the stream of
-- functions, and the stream of values.
{-# INLINE apWith #-}
apWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> t m (a -> b)
    -> t m a
    -> t m b
apWith par fstream stream = go1 fstream

    where

    -- Walk the function stream. For each function @f@ the value stream mapped
    -- with @f@ is combined, using @par@, with the result of walking the rest
    -- of the function stream. The last function needs no combining.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ unShare (go2 f stream)
                yieldk f r = foldShared $ unShare (go2 f stream) `par` go1 r
            in foldStream (adaptState st) yieldk single stp m

    -- Map @f@ over the value stream @m@.
    go2 f m =
        mkStream $ \st yld sng stp ->
            let single a   = sng (f a)
                yieldk a r = yld (f a) (go2 f r)
            in foldStream (adaptState st) yieldk single stp m

-- | Apply every function of a stream of functions to every element of a
-- stream of values, in order: all results of the first function come first,
-- followed by all results of the second function, and so on.
{-# INLINE apSerial #-}
apSerial
    :: IsStream t
    => t m (a -> b)
    -> t m a
    -> t m b
apSerial fstream stream = go1 fstream

    where

    -- Walk the function stream. The last function is simply mapped over the
    -- value stream (go3); any other function is handed to go2 together with
    -- the remaining functions.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ go3 f stream
                yieldk f r = foldShared $ go2 f r stream
            in foldStream (adaptState st) yieldk single stp m

    -- Map @f@ over the value stream @m@; once @m@ is exhausted continue with
    -- the remaining functions @r1@ instead of stopping.
    go2 f r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single a   = yld (f a) (go1 r1)
                yieldk a r = yld (f a) (go2 f r1 r)
            in foldStream (adaptState st) yieldk single stop m

    -- Map @f@ over the value stream @m@.
    go3 f m =
        mkStream $ \st yld sng stp ->
            let single a   = sng (f a)
                yieldk a r = yld (f a) (go3 f r)
            in foldStream (adaptState st) yieldk single stp m

-- | For every element of the first stream emit all the elements of the second
-- stream, serially. The values of the first stream's elements are ignored;
-- only their number matters.
{-# INLINE apSerialDiscardFst #-}
apSerialDiscardFst
    :: IsStream t
    => t m a
    -> t m b
    -> t m b
apSerialDiscardFst fstream stream = go1 fstream

    where

    -- Walk the first stream. Its last element is replaced by the second
    -- stream as is; any other element is replaced by the second stream
    -- followed by the walk over the remaining elements (go2).
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single _   = foldShared stream
                yieldk _ r = foldShared $ go2 r stream
            in foldStream (adaptState st) yieldk single stp m

    -- Emit the elements of @m@ unchanged; once @m@ is exhausted continue with
    -- the rest of the first stream @r1@ instead of stopping.
    go2 r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single a   = yld a (go1 r1)
                yieldk a r = yld a (go2 r1 r)
            in foldStream st yieldk single stop m

-- | For every element of the first stream emit that element once per element
-- of the second stream, serially. The values of the second stream's elements
-- are ignored; only their number matters.
{-# INLINE apSerialDiscardSnd #-}
apSerialDiscardSnd
    :: IsStream t
    => t m a
    -> t m b
    -> t m a
apSerialDiscardSnd fstream stream = go1 fstream

    where

    -- Walk the first stream. Its last element is handled by go3; any other
    -- element is handed to go2 together with the remaining elements.
    go1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                single f   = foldShared $ go3 f stream
                yieldk f r = foldShared $ go2 f r stream
            in foldStream st yieldk single stp m

    -- Emit @f@ for each element of @m@; once @m@ is exhausted continue with
    -- the rest of the first stream @r1@ instead of stopping.
    go2 f r1 m =
        mkStream $ \st yld sng stp ->
            let foldShared = foldStreamShared st yld sng stp
                stop = foldShared $ go1 r1
                single _   = yld f (go1 r1)
                yieldk _ r = yld f (go2 f r1 r)
            in foldStream (adaptState st) yieldk single stop m

    -- Emit @f@ for each element of @m@.
    go3 f m =
        mkStream $ \st yld sng stp ->
            let single _   = sng f
                yieldk _ r = yld f (go3 f r)
            in foldStream (adaptState st) yieldk single stp m

-- XXX This is just concatMapBy with arguments flipped. We need to keep this
-- instead of using a concatMap style definition because the bind
-- implementation in Async and WAsync streams show significant perf degradation
-- if the argument order is changed.

-- | Map a stream generating function over the elements of a stream and merge
-- the generated streams using the supplied combining function.
--
-- Arguments, in order: the function used to combine the stream generated for
-- one element with the result for the remaining elements, the input stream,
-- and the stream generating function.
{-# INLINE bindWith #-}
bindWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> t m a
    -> (a -> t m b)
    -> t m b
bindWith par m1 f = go m1
    where
        -- Walk the input stream. The stream generated for the last element
        -- is used as is; for any other element it is combined, using @par@,
        -- with the result of walking the rest of the input.
        go m =
            mkStream $ \st yld sng stp ->
                let foldShared = foldStreamShared st yld sng stp
                    single a   = foldShared $ unShare (f a)
                    yieldk a r = foldShared $ unShare (f a) `par` go r
                in foldStream (adaptState st) yieldk single stp m

-- XXX express in terms of foldrS?
-- XXX can we use a different stream type for the generated stream being
-- flattened so that we can combine them differently and keep the resulting
-- stream different?
-- XXX do we need specialize to IO?
-- XXX can we optimize when c and a are same, by removing the forall using
-- rewrite rules with type applications?

-- | Perform a 'concatMap' using a specified concat strategy. The first
-- argument specifies a merge or concat function that is used to merge the
-- streams generated by the map function. For example, the concat function
-- could be 'serial', 'parallel', 'async', 'ahead' or any other zip or merge
-- function.
--
-- This is 'bindWith' with the last two arguments flipped.
--
-- @since 0.7.0
{-# INLINE concatMapBy #-}
concatMapBy
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> (a -> t m b)
    -> t m a
    -> t m b
concatMapBy par f xs = bindWith par xs f

-- | Map a stream generating function over a stream and concatenate the
-- generated streams using 'serial'.
--
-- Both the input stream and each generated stream are converted to the
-- concrete 'Stream' type with 'toStream' before being combined by
-- 'concatMapBy'; the result is converted back with 'fromStream'.
{-# INLINE concatMap #-}
concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
concatMap f m = fromStream $
    concatMapBy serial
        (\a -> adapt $ toStream $ f a)
        (adapt $ toStream m)

{-
-- Fused version.
-- XXX This fuses but when the stream is nil this performs poorly.
-- The filterAllOut benchmark degrades. Need to investigate and fix that.
{-# INLINE concatMap #-}
concatMap :: IsStream t => (a -> t m b) -> t m a -> t m b
concatMap f xs = buildS
    (\c n -> foldrS (\x b -> foldrS c b (f x)) n xs)

-- Stream polymorphic concatMap implementation
-- XXX need to use buildSM/foldrSMShared for parallel behavior
-- XXX unShare seems to degrade the fused performance
{-# INLINE_EARLY concatMap_ #-}
concatMap_ :: IsStream t => (a -> t m b) -> t m a -> t m b
concatMap_ f xs = buildS
     (\c n -> foldrSShared (\x b -> foldrSShared c b (unShare $ f x)) n xs)
-}

-- | See 'Streamly.Internal.Data.Stream.IsStream.concatPairsWith' for
-- documentation.
--
{-# INLINE concatPairsWith #-}
concatPairsWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> (a -> t m b)
    -> t m a
    -> t m b
concatPairsWith :: (t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
concatPairsWith t m b -> t m b -> t m b
combine a -> t m b
f = Maybe a -> t m a -> t m b
forall (t :: (* -> *) -> * -> *).
IsStream t =>
Maybe a -> t m a -> t m b
go Maybe a
forall a. Maybe a
Nothing

    where

    go :: Maybe a -> t m a -> t m b
go Maybe a
Nothing t m a
stream =
        (forall r.
 State Stream m b
 -> (b -> t m b -> m r) -> (b -> m r) -> m r -> m r)
-> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m b
  -> (b -> t m b -> m r) -> (b -> m r) -> m r -> m r)
 -> t m b)
-> (forall r.
    State Stream m b
    -> (b -> t m b -> m r) -> (b -> m r) -> m r -> m r)
-> t m b
forall a b. (a -> b) -> a -> b
$ \State Stream m b
st b -> t m b -> m r
yld b -> m r
sng m r
stp ->
            let foldShared :: t m b -> m r
foldShared = State Stream m b
-> (b -> t m b -> m r) -> (b -> m r) -> m r -> t m b -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m b
st b -> t m b -> m r
yld b -> m r
sng m r
stp
                single :: a -> m r
single a
a   = t m b -> m r
foldShared (t m b -> m r) -> t m b -> m r
forall a b. (a -> b) -> a -> b
$ t m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> t m a
unShare (a -> t m b
f a
a)
                yieldk :: a -> t m a -> m r
yieldk a
a t m a
r = t m b -> m r
foldShared (t m b -> m r) -> t m b -> m r
forall a b. (a -> b) -> a -> b
$ Maybe a -> t m a -> t m b
go (a -> Maybe a
forall a. a -> Maybe a
Just a
a) t m a
r
            in State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream (State Stream m b -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m b
st) a -> t m a -> m r
yieldk a -> m r
single m r
stp t m a
stream
    go (Just a
a1) t m a
stream =
        (forall r.
 State Stream m b
 -> (b -> t m b -> m r) -> (b -> m r) -> m r -> m r)
-> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
 State Stream m a
 -> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
  State Stream m b
  -> (b -> t m b -> m r) -> (b -> m r) -> m r -> m r)
 -> t m b)
-> (forall r.
    State Stream m b
    -> (b -> t m b -> m r) -> (b -> m r) -> m r -> m r)
-> t m b
forall a b. (a -> b) -> a -> b
$ \State Stream m b
st b -> t m b -> m r
yld b -> m r
sng m r
stp ->
            let foldShared :: t m b -> m r
foldShared = State Stream m b
-> (b -> t m b -> m r) -> (b -> m r) -> m r -> t m b -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m b
st b -> t m b -> m r
yld b -> m r
sng m r
stp
                stop :: m r
stop = t m b -> m r
foldShared (t m b -> m r) -> t m b -> m r
forall a b. (a -> b) -> a -> b
$ t m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> t m a
unShare (a -> t m b
f a
a1)
                single :: a -> m r
single a
a = t m b -> m r
foldShared (t m b -> m r) -> t m b -> m r
forall a b. (a -> b) -> a -> b
$ t m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> t m a
unShare (a -> t m b
f a
a1) t m b -> t m b -> t m b
`combine` a -> t m b
f a
a
                yieldk :: a -> t m a -> m r
yieldk a
a t m a
r =
                    t m b -> m r
foldShared
                        (t m b -> m r) -> t m b -> m r
forall a b. (a -> b) -> a -> b
$ (t m b -> t m b -> t m b)
-> ((t m b, t m b) -> t m b) -> t m (t m b, t m b) -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
IsStream t =>
(t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
concatPairsWith t m b -> t m b -> t m b
combine
                            (\(t m b
x,t m b
y) -> t m b -> t m b -> t m b
combine (t m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> t m a
unShare t m b
x) t m b
y)
                        (t m (t m b, t m b) -> t m b) -> t m (t m b, t m b) -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> t m b
f a
a1, a -> t m b
f a
a) (t m b, t m b) -> t m (t m b, t m b) -> t m (t m b, t m b)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
`cons` Maybe a -> t m a -> t m (t m b, t m b)
forall (t :: (* -> *) -> * -> *) (t :: (* -> *) -> * -> *)
       (m :: * -> *).
(IsStream t, IsStream t) =>
Maybe a -> t m a -> t m (t m b, t m b)
makePairs Maybe a
forall a. Maybe a
Nothing t m a
r
            in State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream (State Stream m b -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m b
st) a -> t m a -> m r
forall (t :: (* -> *) -> * -> *). IsStream t => a -> t m a -> m r
yieldk a -> m r
single m r
stop t m a
stream

    -- Group the elements of the input stream two at a time, mapping each
    -- element through @f@ (bound by the enclosing definition, outside this
    -- view) and emitting a stream of pairs of the resulting streams.
    --
    -- The first argument is the element, if any, that has already been
    -- pulled from the input and is still waiting for its partner:
    --
    -- * 'Nothing'   - no element is pending.
    -- * @Just a1@   - @a1@ is pending; the next element completes the pair.
    --
    -- When the input ends while an element is pending, that element is paired
    -- with 'nil'.
    --
    -- Inferred type, kept as a comment because a real local signature would
    -- need the enclosing binding's type variables to be in scope:
    --
    -- makePairs :: Maybe a -> t m a -> t m (t m b, t m b)
    makePairs Nothing stream =
        mkStream $ \st yld sng stp ->
            let -- Nothing is emitted at this step, so the consumer's
                -- continuations are handed over unchanged to the stream that
                -- handles the pending element.
                foldShared = foldStreamShared st yld sng stp
                -- The input has exactly one element left: emit it paired
                -- with an empty stream.
                single a   = sng (f a, nil)
                -- Remember the element and go look for its partner.
                yieldk a r = foldShared $ makePairs (Just a) r
            -- An empty input simply stops (stp is passed through as is).
            in foldStream (adaptState st) yieldk single stp stream
    makePairs (Just a1) stream =
        mkStream $ \st yld sng _ ->
            let -- The input ended with a1 still pending: emit it paired with
                -- an empty stream as the final element.
                stop = sng (f a1, nil)
                -- The partner is the last element of the input.
                single a = sng (f a1, f a)
                -- Emit the completed pair and continue with nothing pending.
                yieldk a r = yld (f a1, f a) (makePairs Nothing r)
            -- The consumer's stop continuation is never used here because a
            -- pending element always produces at least one pair.
            in foldStream (adaptState st) yieldk single stop stream

-- | 'Applicative' instance for 'Stream'. 'pure' builds a stream from a pure
-- value using 'fromPure', and '<*>' is 'ap', i.e. it is expressed through the
-- 'Monad' instance of 'Stream' rather than implemented separately.
instance Monad m => Applicative (Stream m) where
    {-# INLINE pure #-}
    pure = fromPure
    {-# INLINE (<*>) #-}
    (<*>) = ap

-- NOTE: even though concatMap for StreamD is 3x faster compared to StreamK,
-- the monad instance of StreamD is slower than StreamK after foldr/build
-- fusion.
-- | 'Monad' instance for 'Stream'. 'return' is 'pure', and '>>=' is
-- 'concatMap' with its arguments flipped: the function is applied to every
-- element of the stream and the resulting streams are combined by
-- 'concatMap'.
instance Monad m => Monad (Stream m) where
    {-# INLINE return #-}
    return = pure
    {-# INLINE (>>=) #-}
    (>>=) = flip concatMap

{-
-- Like concatMap but generates stream using an unfold function. Similar to
-- unfoldMany but for StreamK.
concatUnfoldr :: IsStream t
    => (b -> t m (Maybe (a, b))) -> t m b -> t m a
concatUnfoldr = undefined
-}