{-# OPTIONS_GHC -Wno-deprecations #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Type
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.IsStream.Type {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
    (
    -- * IsStream Type Class
      IsStream (..)
    , K.StreamK (..)

    -- * Type Conversion
    , fromStreamD
    , toStreamD
    , toStreamK
    , fromStreamK
    , adapt
    , toConsK

    -- * Building a stream
    , mkStream
    , foldStreamShared
    , foldStream

    -- * Stream Types
    , SerialT
    , Serial
    , fromSerial

    , WSerialT
    , WSerial
    , fromWSerial

    , AsyncT
    , Async
    , fromAsync

    , WAsyncT
    , WAsync
    , fromWAsync

    , AheadT
    , Ahead
    , fromAhead

    , ParallelT
    , Parallel
    , fromParallel

    , ZipSerialM
    , ZipSerial
    , fromZipSerial

    , ZipAsyncM
    , ZipAsync
    , fromZipAsync

    -- * Construction
    , cons
    , (.:)
    , nil
    , nilM
    , fromPure
    , fromEffect
    , repeat

    -- * Bind/Concat
    , bindWith
    , concatMapWith

    -- * Fold Utilities
    , concatFoldableWith
    , concatMapFoldableWith
    , concatForFoldableWith

    -- * Running Effects
    , drain

    -- * Conversion operations
    , fromList
    , toList

    -- * Fold operations
    , foldrMx

    , foldlx'
    , foldlMx'
    , foldl'
    , fold

    -- * Zip style operations
    , eqBy
    , cmpBy
    )
where

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold (Fold (..))
import Streamly.Internal.Data.Stream.Serial
    (SerialT, Serial, WSerialT(..), WSerial)
import Streamly.Internal.Data.Stream.Async
    (AsyncT(..), Async, WAsyncT(..), WAsync)
import Streamly.Internal.Data.Stream.Ahead (AheadT(..), Ahead)
import Streamly.Internal.Data.Stream.Parallel (ParallelT(..), Parallel)
import Streamly.Internal.Data.Stream.Zip (ZipSerialM, ZipSerial)
import Streamly.Internal.Data.Stream.ZipAsync (ZipAsyncM(..), ZipAsync)
import Streamly.Internal.Data.SVar.Type (State, adaptState)

import qualified Prelude
import qualified Streamly.Internal.Data.Stream.Ahead as Ahead
import qualified Streamly.Internal.Data.Stream.Async as Async
import qualified Streamly.Internal.Data.Stream.Parallel as Parallel
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream as D
    (Stream(..), toStreamK, fromStreamK
    , drain, eqBy, cmpBy, fromList, toList, foldrMx, foldlMx'
    , foldlx', foldl', fold)
import qualified Streamly.Internal.Data.StreamK as K
    (StreamK(..), cons, fromEffect
    , nil, fromPure, bindWith, drain
    , fromFoldable, nilM, repeat)
import qualified Streamly.Internal.Data.StreamK as StreamK
import qualified Streamly.Internal.Data.Stream.Serial as Stream
    (fromStreamK, toStreamK)
import qualified Streamly.Internal.Data.Stream.Zip as Zip
import qualified Streamly.Internal.Data.Stream.ZipAsync as ZipAsync

import Prelude hiding (Foldable(..), repeat)
import Data.Foldable (Foldable)

#include "inline.hs"

------------------------------------------------------------------------------
-- Types that can behave as a Stream
------------------------------------------------------------------------------

infixr 5 `consM`
infixr 5 |:

-- XXX Use a different SVar based on the stream type. But we need to make sure
-- that we do not lose performance due to polymorphism.
--
-- | Class of types that can represent a stream of elements of some type 'a' in
-- some monad 'm'.
--
-- Every instance is interconvertible with 'K.StreamK' through 'toStream' and
-- 'fromStream'; the superclass constraints additionally require 'Semigroup',
-- 'Monoid', 'Functor' and 'Applicative' instances for the stream type.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
class
    ( forall m a. MonadAsync m => Semigroup (t m a)
    , forall m a. MonadAsync m => Monoid (t m a)
    , forall m. Monad m => Functor (t m)
    , forall m. MonadAsync m => Applicative (t m)
    ) =>
      IsStream t where
    -- | Convert a stream of type @t@ to the 'K.StreamK' representation.
    toStream :: t m a -> K.StreamK m a
    -- | Convert a 'K.StreamK' to a stream of type @t@.
    fromStream :: K.StreamK m a -> t m a
    -- | Constructs a stream by adding a monadic action at the head of an
    -- existing stream. For example:
    --
    -- @
    -- > toList $ getLine \`consM` getLine \`consM` nil
    -- hello
    -- world
    -- ["hello","world"]
    -- @
    --
    -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
    --
    -- @since 0.2.0
    consM :: MonadAsync m => m a -> t m a -> t m a
    -- | Operator equivalent of 'consM'. We can read it as "@parallel colon@"
    -- to remember that @|@ comes before ':'.
    --
    -- @
    -- > toList $ getLine |: getLine |: nil
    -- hello
    -- world
    -- ["hello","world"]
    -- @
    --
    -- @
    -- let delay = threadDelay 1000000 >> print 1
    -- drain $ fromSerial  $ delay |: delay |: delay |: nil
    -- drain $ fromParallel $ delay |: delay |: delay |: nil
    -- @
    --
    -- /Concurrent (do not use 'fromParallel' to construct infinite streams)/
    --
    -- @since 0.2.0
    (|:) :: MonadAsync m => m a -> t m a -> t m a
    -- We can define (|:) just as 'consM' but it is defined explicitly for each
    -- type because we want to use SPECIALIZE pragma on the definition.

-------------------------------------------------------------------------------
-- Type adapting combinators
-------------------------------------------------------------------------------

-- | Convert any 'IsStream' type to its 'StreamK.StreamK' representation. This
-- is the class method 'toStream' under a top-level name.
{-# INLINE toStreamK #-}
toStreamK :: IsStream t => t m a -> StreamK.StreamK m a
toStreamK = toStream

-- | Convert a 'StreamK.StreamK' to any 'IsStream' type. This is the class
-- method 'fromStream' under a top-level name.
{-# INLINE fromStreamK #-}
fromStreamK :: IsStream t => StreamK.StreamK m a -> t m a
fromStreamK = fromStream

-- XXX Move/reset the State here by reconstructing the stream with cleared
-- state. Can we make sure we do not do that when t1 = t2? If we do this then
-- we do not need to do that explicitly using svarStyle.  It would act as
-- unShare when the stream type is the same.
--
-- | Adapt any specific stream type to any other specific stream type.
--
-- The conversion goes through 'K.StreamK': the source is converted with
-- 'toStream' and the result is rebuilt with 'fromStream'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
adapt = fromStream . toStream

-- | Convert a direct style 'D.Stream' to any 'IsStream' type, going through
-- 'K.StreamK' ('D.toStreamK' followed by 'fromStream').
{-# INLINE fromStreamD #-}
fromStreamD :: (IsStream t, Monad m) => D.Stream m a -> t m a
fromStreamD = fromStream . D.toStreamK

-- | Adapt a polymorphic consM operation to a StreamK cons operation.
--
-- The 'K.StreamK' tail is converted to the polymorphic type with
-- 'fromStream', the supplied cons operation is applied, and the result is
-- converted back with 'toStream'.
{-# INLINE toConsK #-}
toConsK :: IsStream t =>
    (m a -> t m a -> t m a) -> m a -> K.StreamK m a -> K.StreamK m a
toConsK cns x xs = toStream $ x `cns` fromStream xs

------------------------------------------------------------------------------
-- Conversion to and from direct style stream
------------------------------------------------------------------------------

-- | Convert any 'IsStream' type to a direct style 'D.Stream', going through
-- 'K.StreamK' ('toStream' followed by 'D.fromStreamK').
{-# INLINE toStreamD #-}
toStreamD :: (IsStream t, Monad m) => t m a -> D.Stream m a
toStreamD = D.fromStreamK . toStream

------------------------------------------------------------------------------
-- Elimination
------------------------------------------------------------------------------

-- | Run a stream to completion, discarding its elements.
--
-- The stream is converted to 'D.Stream' and drained with 'D.drain'. The
-- accompanying rewrite rule replaces a 'D.drain' applied directly to
-- 'D.fromStreamK' with 'K.drain', so the conversion can be skipped when the
-- rule fires.
{-# INLINE_EARLY drain #-}
drain :: (IsStream t, Monad m) => t m a -> m ()
drain m = D.drain $ D.fromStreamK (toStream m)
{-# RULES "drain fallback to CPS" [1]
    forall a. D.drain (D.fromStreamK a) = K.drain a #-}

------------------------------------------------------------------------------
-- Comparison
------------------------------------------------------------------------------

-- | Compare two streams for equality using the supplied element comparison
-- function. Both streams are converted with 'toStreamD' and the comparison is
-- delegated to 'D.eqBy'.
--
-- @since 0.5.3
{-# INLINE eqBy #-}
eqBy :: (IsStream t, Monad m) =>
    (a -> b -> Bool) -> t m a -> t m b -> m Bool
eqBy f m1 m2 = D.eqBy f (toStreamD m1) (toStreamD m2)

-- | Compare two streams using the supplied element comparison function. Both
-- streams are converted with 'toStreamD' and the comparison is delegated to
-- 'D.cmpBy'.
--
-- @since 0.5.3
{-# INLINE cmpBy #-}
cmpBy
    :: (IsStream t, Monad m)
    => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
cmpBy f m1 m2 = D.cmpBy f (toStreamD m1) (toStreamD m2)

------------------------------------------------------------------------------
-- List Conversions
------------------------------------------------------------------------------

-- |
-- @
-- fromList = 'Prelude.foldr' 'K.cons' 'K.nil'
-- @
--
-- Construct a stream from a list of pure values. This is more efficient than
-- 'K.fromFoldable' for serial streams.
--
-- The list is first turned into a 'D.Stream' with 'D.fromList'. The
-- accompanying rewrite rule replaces 'D.toStreamK' applied directly to
-- 'D.fromList' with 'K.fromFoldable'.
--
-- @since 0.4.0
{-# INLINE_EARLY fromList #-}
fromList :: (Monad m, IsStream t) => [a] -> t m a
fromList = fromStreamD . D.fromList
{-# RULES "fromList fallback to StreamK" [1]
    forall a. D.toStreamK (D.fromList a) = K.fromFoldable a #-}

-- | Convert a stream into a list in the underlying monad. The stream is
-- converted with 'toStreamD' and collected with 'D.toList'.
--
-- @since 0.1.0
{-# INLINE toList #-}
toList :: (IsStream t, Monad m) => t m a -> m [a]
toList m = D.toList $ toStreamD m

------------------------------------------------------------------------------
-- Building a stream
------------------------------------------------------------------------------

-- XXX The State is always parameterized by "Stream" which means State is not
-- different for different stream types. So we have to manually make sure that
-- when converting from one stream to another we migrate the state correctly.
-- This can be fixed if we use a different SVar type for different streams.
-- Currently we always use "SVar Stream" and therefore a different State type
-- parameterized by that stream.
--
-- XXX Since t is coercible we should be able to coerce k
-- mkStream k = fromStream $ MkStream $ coerce k
--
-- | Build a stream from an 'SVar', a stop continuation, a singleton stream
-- continuation and a yield continuation.
--
-- The supplied function @k@ expects a yield continuation whose tail has the
-- polymorphic type @t m a@, whereas 'K.MkStream' provides one whose tail is a
-- 'K.StreamK'. The local @yieldk@ bridges the two by converting the tail with
-- 'toStream' before calling the 'K.StreamK' yield continuation.
{-# INLINE_EARLY mkStream #-}
mkStream :: IsStream t
    => (forall r. State K.StreamK m a
        -> (a -> t m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> t m a
mkStream k = fromStream $ K.MkStream $ \st yld sng stp ->
    let yieldk a r = yld a (toStream r)
     in k st yieldk sng stp

-- Specialization of 'mkStream' for a builder whose yield continuation already
-- takes a 'K.StreamK' tail: no per-yield conversion is needed, so the builder
-- is wrapped with 'K.MkStream' directly. The rewrite rule below substitutes
-- it for 'mkStream' where the types match.
{-# RULES "mkStream from stream" mkStream = mkStreamFromStream #-}
mkStreamFromStream :: IsStream t
    => (forall r. State K.StreamK m a
        -> (a -> K.StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> t m a
mkStreamFromStream k = fromStream $ K.MkStream k

-- Specialization of 'mkStream' for the case where both the builder and the
-- result use 'K.StreamK': it is just the 'K.MkStream' constructor. The
-- rewrite rule below substitutes it for 'mkStream' where the types match.
{-# RULES "mkStream stream" mkStream = mkStreamStream #-}
mkStreamStream
    :: (forall r. State K.StreamK m a
        -> (a -> K.StreamK m a -> m r)
        -> (a -> m r)
        -> m r
        -> m r)
    -> K.StreamK m a
mkStreamStream = K.MkStream

------------------------------------------------------------------------------
-- Folds
------------------------------------------------------------------------------

-- | Right fold with a monadic step function, an initial (final) action and a
-- projection applied to the folded action. The stream is converted with
-- 'toStreamD' and the fold is delegated to 'D.foldrMx'.
{-# INLINE foldrMx #-}
foldrMx :: (IsStream t, Monad m)
    => (a -> m x -> m x) -> m x -> (m x -> m b) -> t m a -> m b
foldrMx step final project m = D.foldrMx step final project $ toStreamD m

-- | Like 'foldlx'', but with a monadic step function. The stream is converted
-- with 'toStreamD' and the fold is delegated to 'D.foldlMx''.
--
-- @since 0.7.0
{-# INLINE foldlMx' #-}
foldlMx' ::
    (IsStream t, Monad m)
    => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
foldlMx' step begin done m = D.foldlMx' step begin done $ toStreamD m

-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
-- argument) to the folded value at the end. This is designed to work with the
-- @foldl@ library. The suffix @x@ is a mnemonic for extraction.
--
-- The stream is converted with 'toStreamD' and the fold is delegated to
-- 'D.foldlx''.
--
-- @since 0.7.0
{-# INLINE foldlx' #-}
foldlx' ::
    (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
foldlx' step begin done m = D.foldlx' step begin done $ toStreamD m

-- | Strict left associative fold. The stream is converted with 'toStreamD'
-- and the fold is delegated to 'D.foldl''.
--
-- @since 0.2.0
{-# INLINE foldl' #-}
foldl' ::
    (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
foldl' step begin m = D.foldl' step begin $ toStreamD m


-- | Run a 'Fold' over a stream. The stream is converted with 'toStreamD' and
-- the fold is driven by 'D.fold'.
{-# INLINE fold #-}
fold :: (IsStream t, Monad m) => Fold m a b -> t m a -> m b
fold fld m = D.fold fld $ toStreamD m

------------------------------------------------------------------------------
-- Folding a stream
------------------------------------------------------------------------------

-- | Fold a stream by providing an SVar, a stop continuation, a singleton
-- continuation and a yield continuation. The stream would share the current
-- SVar passed via the State.
--
-- The stream is unwrapped to its 'K.StreamK' function, which is then called
-- with the state unchanged. The caller's yield continuation expects a tail of
-- type @t m a@, so the local @yieldk@ converts the 'K.StreamK' tail with
-- 'fromStream' before invoking it.
{-# INLINE_EARLY foldStreamShared #-}
foldStreamShared
    :: IsStream t
    => State K.StreamK m a
    -> (a -> t m a -> m r)
    -> (a -> m r)
    -> m r
    -> t m a
    -> m r
foldStreamShared st yld sng stp m =
    let yieldk a x = yld a (fromStream x)
        K.MkStream k = toStream m
     in k st yieldk sng stp

-- XXX write a similar rule for foldStream as well?
--
-- Specialization of 'foldStreamShared' for 'K.StreamK': the stream is
-- unwrapped and its function is applied directly to the given state and
-- continuations, with no tail conversion. The rewrite rule below substitutes
-- it for 'foldStreamShared' where the types match.
{-# RULES "foldStreamShared from stream"
   foldStreamShared = foldStreamSharedStream #-}
foldStreamSharedStream
    :: State K.StreamK m a
    -> (a -> K.StreamK m a -> m r)
    -> (a -> m r)
    -> m r
    -> K.StreamK m a
    -> m r
foldStreamSharedStream st yld sng stp m =
    let K.MkStream k = m
     in k st yld sng stp

-- | Fold a stream by providing a State, stop continuation, a singleton
-- continuation and a yield continuation. The stream will not use the SVar
-- passed via State.
--
-- Identical to 'foldStreamShared' except that the state is passed through
-- 'adaptState' before being handed to the stream.
{-# INLINE foldStream #-}
foldStream
    :: IsStream t
    => State K.StreamK m a
    -> (a -> t m a -> m r)
    -> (a -> m r)
    -> m r
    -> t m a
    -> m r
foldStream st yld sng stp m =
    let yieldk a x = yld a (fromStream x)
        K.MkStream k = toStream m
     in k (adaptState st) yieldk sng stp

-------------------------------------------------------------------------------
-- Serial
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'SerialT'.
--
-- Implemented as 'adapt' with the source type fixed to 'SerialT'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
fromSerial :: IsStream t => SerialT m a -> t m a
fromSerial = adapt

-- 'SerialT' converts to and from 'K.StreamK' using the conversion functions
-- exported by the Serial module; 'consM' and '|:' are both 'Serial.consM'.
instance IsStream SerialT where
    toStream = Stream.toStreamK
    fromStream = Stream.fromStreamK

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> SerialT IO a -> SerialT IO a #-}
    consM = Serial.consM

    -- Defined separately from 'consM' so that it can carry its own
    -- SPECIALIZE pragma.
    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> SerialT IO a -> SerialT IO a #-}
    (|:) = Serial.consM

-- | Fix the type of a polymorphic stream as 'WSerialT'.
--
-- Implemented as 'adapt' with the source type fixed to 'WSerialT'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromWSerial :: IsStream t => WSerialT m a -> t m a
fromWSerial = adapt

-- 'WSerialT' wraps a 'K.StreamK': conversion is the 'WSerialT' constructor
-- and its 'getWSerialT' accessor; 'consM' and '|:' are both
-- 'Serial.consMWSerial'.
instance IsStream WSerialT where
    toStream = getWSerialT
    fromStream = WSerialT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> WSerialT IO a -> WSerialT IO a #-}
    consM :: Monad m => m a -> WSerialT m a -> WSerialT m a
    consM = Serial.consMWSerial

    -- Defined separately from 'consM' so that it can carry its own
    -- SPECIALIZE pragma.
    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> WSerialT IO a -> WSerialT IO a #-}
    (|:) :: Monad m => m a -> WSerialT m a -> WSerialT m a
    (|:) = Serial.consMWSerial

-------------------------------------------------------------------------------
-- Async
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'AsyncT'.
--
-- Implemented as 'adapt' with the source type fixed to 'AsyncT'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
fromAsync :: IsStream t => AsyncT m a -> t m a
fromAsync = adapt

-- 'AsyncT' wraps a 'K.StreamK': conversion is the 'AsyncT' constructor and
-- its 'getAsyncT' accessor; 'consM' and '|:' are both 'Async.consMAsync'.
instance IsStream AsyncT where
    toStream = getAsyncT
    fromStream = AsyncT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> AsyncT IO a -> AsyncT IO a #-}
    consM = Async.consMAsync

    -- Defined separately from 'consM' so that it can carry its own
    -- SPECIALIZE pragma.
    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> AsyncT IO a -> AsyncT IO a #-}
    (|:) = Async.consMAsync

-- | Fix the type of a polymorphic stream as 'WAsyncT'.
--
-- Implemented as 'adapt' with the source type fixed to 'WAsyncT'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromWAsync :: IsStream t => WAsyncT m a -> t m a
fromWAsync = adapt

-- 'WAsyncT' wraps a 'K.StreamK': conversion is the 'WAsyncT' constructor and
-- its 'getWAsyncT' accessor; 'consM' and '|:' are both 'Async.consMWAsync'.
instance IsStream WAsyncT where
    toStream = getWAsyncT
    fromStream = WAsyncT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
    consM = Async.consMWAsync

    -- Defined separately from 'consM' so that it can carry its own
    -- SPECIALIZE pragma.
    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
    (|:) = Async.consMWAsync

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'AheadT'.
--
-- Implemented as 'adapt' with the source type fixed to 'AheadT'.
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
fromAhead :: IsStream t => AheadT m a -> t m a
fromAhead = adapt

-- | 'AheadT' is converted to and from 'K.StreamK' using 'getAheadT' and the
-- 'AheadT' constructor. 'consM' and '|:' are both 'Ahead.consM'.
instance IsStream AheadT where
    toStream = getAheadT
    fromStream = AheadT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
    consM = Ahead.consM

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> AheadT IO a -> AheadT IO a #-}
    (|:) = Ahead.consM

-------------------------------------------------------------------------------
-- Parallel
-------------------------------------------------------------------------------

-- | Fix the type of a polymorphic stream as 'ParallelT'.
--
-- This is 'adapt' with its argument type pinned to 'ParallelT'.
--
-- /Since: 0.1.0 ("Streamly")/
--
-- @since 0.8.0
fromParallel :: IsStream t => ParallelT m a -> t m a
fromParallel = adapt

-- | 'ParallelT' is converted to and from 'K.StreamK' using 'getParallelT' and
-- the 'ParallelT' constructor. 'consM' and '|:' are both 'Parallel.consM'.
instance IsStream ParallelT where
    toStream = getParallelT
    fromStream = ParallelT

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
    consM = Parallel.consM

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> ParallelT IO a -> ParallelT IO a #-}
    (|:) = Parallel.consM

-------------------------------------------------------------------------------
-- Zip
-------------------------------------------------------------------------------

-- Prepend a monadic action to a 'ZipSerialM' stream: unwrap the underlying
-- stream, cons the action onto it with 'StreamK.consM', and wrap the result
-- back in 'Zip.ZipSerialM'.
consMZip :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
consMZip m (Zip.ZipSerialM r) = Zip.ZipSerialM $ StreamK.consM m r

-- | Fix the type of a polymorphic stream as 'ZipSerialM'.
--
-- This is 'adapt' with its argument type pinned to 'ZipSerialM'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromZipSerial :: IsStream t => ZipSerialM m a -> t m a
fromZipSerial = adapt
-- | 'ZipSerialM' is converted to and from 'K.StreamK' using
-- 'Zip.getZipSerialM' and the 'Zip.ZipSerialM' constructor. 'consM' and '|:'
-- carry explicit signatures that only require 'Monad' and are both
-- implemented by 'consMZip'.
instance IsStream ZipSerialM where
    toStream = Zip.getZipSerialM
    fromStream = Zip.ZipSerialM

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> ZipSerialM IO a -> ZipSerialM IO a #-}
    consM :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
    consM = consMZip

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> ZipSerialM IO a -> ZipSerialM IO a #-}
    (|:) :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
    (|:) = consMZip

-- | Fix the type of a polymorphic stream as 'ZipAsyncM'.
--
-- This is 'adapt' with its argument type pinned to 'ZipAsyncM'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
fromZipAsync :: IsStream t => ZipAsyncM m a -> t m a
fromZipAsync = adapt

-- | 'ZipAsyncM' is converted to and from 'K.StreamK' using 'getZipAsyncM' and
-- the 'ZipAsyncM' constructor. 'consM' and '|:' carry explicit signatures
-- that only require 'Monad' and are both implemented by
-- 'ZipAsync.consMZipAsync'.
instance IsStream ZipAsyncM where
    toStream = getZipAsyncM
    fromStream = ZipAsyncM

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> ZipAsyncM IO a -> ZipAsyncM IO a #-}
    consM :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
    consM = ZipAsync.consMZipAsync

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> ZipAsyncM IO a -> ZipAsyncM IO a #-}
    (|:) :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
    (|:) = ZipAsync.consMZipAsync

-------------------------------------------------------------------------------
-- Construction
-------------------------------------------------------------------------------

infixr 5 `cons`

-- | Construct a stream by adding a pure value at the head of an existing
-- stream. For serial streams this is the same as @(return a) \`consM` r@ but
-- more efficient. For concurrent streams this is not concurrent whereas
-- 'consM' is concurrent. For example:
--
-- @
-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil
-- [1,2,3]
-- @
--
-- The tail is converted with 'toStream', the element is prepended with
-- 'K.cons', and the result is converted back with 'fromStream'.
--
-- @since 0.1.0
{-# INLINE_NORMAL cons #-}
cons :: IsStream t => a -> t m a -> t m a
cons a r = fromStream $ K.cons a (toStream r)

infixr 5 .:

-- | Operator equivalent of 'cons'.
--
-- @
-- > toList $ 1 .: 2 .: 3 .: nil
-- [1,2,3]
-- @
--
-- @since 0.1.1
{-# INLINE (.:) #-}
(.:) :: IsStream t => a -> t m a -> t m a
(.:) = cons

-- | 'K.nil' converted to any 'IsStream' type with 'fromStream'.
{-# INLINE_NORMAL nil #-}
nil :: IsStream t => t m a
nil = fromStream K.nil

-- | 'K.nilM' applied to the given action, with the result converted to any
-- 'IsStream' type with 'fromStream'.
{-# INLINE_NORMAL nilM #-}
nilM :: (IsStream t, Monad m) => m b -> t m a
nilM = fromStream . K.nilM

-- | 'K.fromPure' applied to the given value, with the result converted to any
-- 'IsStream' type with 'fromStream'.
{-# INLINE_NORMAL fromPure #-}
fromPure :: IsStream t => a -> t m a
fromPure = fromStream . K.fromPure

-- | 'K.fromEffect' applied to the given action, with the result converted to
-- any 'IsStream' type with 'fromStream'.
{-# INLINE_NORMAL fromEffect #-}
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect = fromStream . K.fromEffect

-- | 'K.repeat' applied to the given value, with the result converted to any
-- 'IsStream' type with 'fromStream'.
{-# INLINE repeat #-}
repeat :: IsStream t => a -> t m a
repeat = fromStream . K.repeat

-------------------------------------------------------------------------------
-- Bind/Concat
-------------------------------------------------------------------------------

-- | @bindWith par m1 f@ is 'K.bindWith' lifted to any 'IsStream' type.
--
-- The input stream @m1@ and the streams produced by @f@ are converted with
-- 'toStream'. The merge function @par@ works on @t m b@, so it is wrapped to
-- convert its two arguments with 'fromStream' and its result with 'toStream'.
-- The final result is converted back with 'fromStream'.
{-# INLINE bindWith #-}
bindWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> t m a
    -> (a -> t m b)
    -> t m b
bindWith par m1 f =
    fromStream
        $ K.bindWith
            (\s1 s2 -> toStream $ par (fromStream s1) (fromStream s2))
            (toStream m1)
            (toStream . f)

-- | @concatMapWith mixer generator stream@ is a two dimensional looping
-- combinator.  The @generator@ function is used to generate streams from the
-- elements in the input @stream@ and the @mixer@ function is used to merge
-- those streams.
--
-- Note we can merge streams concurrently by using a concurrent merge function.
--
-- This is 'bindWith' with the last two arguments swapped.
--
-- /Since: 0.7.0/
--
-- /Since: 0.8.0 (signature change)/
{-# INLINE concatMapWith #-}
concatMapWith
    :: IsStream t
    => (t m b -> t m b -> t m b)
    -> (a -> t m b)
    -> t m a
    -> t m b
concatMapWith par f xs = bindWith par xs f

-- | A variant of 'foldMap' that allows you to map a monadic streaming action
-- on a 'Foldable' container and then fold it using the specified stream merge
-- operation.
--
-- @concatMapFoldableWith 'async' return [1..3]@
--
-- Equivalent to:
--
-- @
-- concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
-- concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)
-- @
--
-- /Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)/
--
-- /Since: 0.1.0 ("Streamly")/
{-# INLINE concatMapFoldableWith #-}
concatMapFoldableWith :: (IsStream t, Foldable f)
    => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
concatMapFoldableWith f g = Prelude.foldr (f . g) nil

-- | Like 'concatMapFoldableWith' but with the last two arguments reversed i.e. the
-- monadic streaming function is the last argument.
--
-- Equivalent to:
--
-- @
-- concatForFoldableWith f xs g = Prelude.foldr (f . g) D.nil xs
-- concatForFoldableWith f = flip (D.concatMapFoldableWith f)
-- @
--
-- /Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)/
--
-- /Since: 0.1.0 ("Streamly")/
{-# INLINE concatForFoldableWith #-}
concatForFoldableWith :: (IsStream t, Foldable f)
    => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
concatForFoldableWith f = flip (concatMapFoldableWith f)

-- | A variant of 'Data.Foldable.fold' that allows you to fold a 'Foldable'
-- container of streams using the specified stream sum operation.
--
-- @concatFoldableWith 'async' $ map return [1..3]@
--
-- Equivalent to:
--
-- @
-- concatFoldableWith f = Prelude.foldr f D.nil
-- concatFoldableWith f = D.concatMapFoldableWith f id
-- @
--
-- /Since: 0.8.0 (Renamed foldWith to concatFoldableWith)/
--
-- /Since: 0.1.0 ("Streamly")/
{-# INLINE concatFoldableWith #-}
concatFoldableWith :: (IsStream t, Foldable f)
    => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
concatFoldableWith f = concatMapFoldableWith f id