-- |
-- Module      : Streamly.Internal.Data.Fold.Type
-- Copyright   : (c) 2019 Composewell Technologies
--               (c) 2013 Gabriel Gonzalez
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- = Stream Consumers
--
-- We can classify stream consumers in the following categories in order of
-- increasing complexity and power:
--
-- == Accumulators
--
-- These are the simplest folds: they never fail and never terminate. They
-- accumulate input values forever, can always accept new inputs, and always
-- have a valid result value.  A 'Streamly.Internal.Data.Fold.sum' operation
-- is an example of an accumulator.  Traditional Haskell left folds like
-- 'foldl' are accumulators.
--
-- We can distribute an input stream to two or more accumulators using a @tee@
-- style composition.  Accumulators cannot be applied on a stream one after the
-- other, which we call a @serial@ append style composition of folds. This is
-- because accumulators never terminate: since the first accumulator in a
-- series never terminates, the next one never gets to run.
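--
-- As a minimal sketch of @tee@ style composition, assuming the doctest setup
-- used in this module (@Stream@ for "Streamly.Prelude" and @Fold@ for
-- "Streamly.Data.Fold"), both accumulators below see every input element:
--
-- >>> Stream.fold (Fold.teeWith (,) Fold.sum Fold.length) (Stream.fromList [1..10])
-- (55,10)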
--
-- == Terminating Folds
--
-- Terminating folds are accumulators that can terminate. Once a fold
-- terminates it no longer accepts any more inputs.  Terminating folds can be
-- used in a @serial@ append style composition where one fold can be applied
-- after the other on an input stream. We can apply a terminating fold
-- repeatedly on an input stream, splitting the stream and consuming it in
-- fragments.  Terminating folds never fail, therefore, they do not need
-- backtracking.
--
-- The 'Streamly.Internal.Data.Fold.take' operation is an example of a
-- terminating fold.  It terminates after consuming @n@ items. Coupled with an
-- accumulator (e.g. sum) it can be used to split and process the stream into
-- chunks of fixed size.
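--
-- A sketch of this chunking pattern, assuming the doctest setup used in this
-- module and that @many@ takes the splitting fold first and the collecting
-- fold second (@sumPairs@ is a hypothetical name). It is intended to sum
-- every two consecutive elements and collect the sums in a list:
--
-- @
-- sumPairs :: Monad m => Fold.Fold m Int [Int]
-- sumPairs = Fold.many (Fold.take 2 Fold.sum) Fold.toList
-- @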
--
-- == Terminating Folds with Leftovers
--
-- The next upgrade after terminating folds is terminating folds with leftover
-- inputs.  Consider the @takeWhile@ operation: it needs to inspect an element
-- to decide whether to terminate. However, it does not consume the
-- element on which it terminates. To implement @takeWhile@ a terminating fold
-- will have to implement a way to return unconsumed input to the fold driver.
--
-- The single element leftover case is the most common, and it is easy to
-- implement in terminating folds using a @Done1@ constructor in the 'Step'
-- type which indicates that the last element was not consumed by the fold.
-- The following additional operations can be implemented as terminating folds
-- if we do that.
--
-- @
-- takeWhile
-- groupBy
-- wordBy
-- @
--
-- However, it creates several complications.  The 'many' combinator requires
-- a @Partial1@ ('Partial' with leftover) to handle a @Done1@ from the top
-- level fold, for efficient implementation.  If the collecting fold in @many@
-- returns a @Partial1@ or @Done1@ then what do we do with all the elements
-- that have been consumed?
--
-- Similarly, in distribute, if one fold consumes a value and others say it is
-- a leftover then what do we do?  Folds like @many@ require the leftover to be
-- fed to them again. So in a distribute operation those folds which gave a
-- leftover will have to be fed the leftover while the folds that consumed will
-- have to be fed the next input.  This is very complicated to implement. We
-- have the same issue in backtracking parsers being used in a distribute
-- operation.
--
-- To avoid these issues we want to enforce by typing that the collecting folds
-- can never return a leftover. So we need a fold type without @Done1@ or
-- @Partial1@. This leads us to design folds to never return a leftover and the
-- use cases of single leftover are transferred to parsers where we have a
-- general backtracking mechanism and single leftover is just a special case of
-- backtracking.
--
-- This means that @takeWhile@, @groupBy@ and @wordBy@ would be implemented as
-- parsers.  @take 0@ can be implemented as a fold if we make @initial@ return
-- a 'Step' type.  @takeInterval@ can be implemented without @Done1@.
--
-- == Parsers
--
-- The next upgrade after terminating folds with a leftover is parsers.
-- Parsers are terminating folds that can fail and backtrack. Parsers can be
-- composed using an @alternative@ style composition where they can backtrack
-- and apply another parser if one parser fails.
-- 'Streamly.Internal.Data.Parser.satisfy' is a simple example of a parser: it
-- succeeds if the condition is satisfied and fails otherwise. On failure an
-- alternative parser can be used on the same input.
--
-- = Types for Stream Consumers
--
-- In streamly, there is no separate type for accumulators. Terminating folds
-- are a superset of accumulators and to avoid too many types we represent both
-- using the same type, 'Fold'.
--
-- We do not club the leftovers functionality with terminating folds because of
-- the reasons explained earlier. Instead combinators that require leftovers
-- are implemented as the 'Streamly.Internal.Data.Parser.Parser' type.  This is
-- a sweet spot to balance ease of use, type safety and performance.  Using
-- separate Accumulator and terminating fold types would encode more
-- information in types but it would make ease of use, implementation and
-- maintenance worse. Combining Accumulator, terminating folds and Parser into
-- a single 'Streamly.Internal.Data.Parser.Parser' type would make ease of use
-- even better but type safety and performance worse.
--
-- One of the design requirements that we have placed for better ease of use
-- and code reuse is that 'Streamly.Internal.Data.Parser.Parser' type should be
-- a strict superset of the 'Fold' type i.e. it can do everything that a 'Fold'
-- can do and more. Therefore, folds can be easily upgraded to parsers and we
-- can use parser combinators on folds as well when needed.
--
-- = Fold Design
--
-- A fold is represented by a collection of "initial", "step" and "extract"
-- functions. The "initial" action generates the initial state of the fold. The
-- state is internal to the fold and maintains the accumulated output. The
-- "step" function is invoked using the current state and the next input value
-- and results in a @Partial@ or @Done@. A @Partial@ returns the next intermediate
-- state of the fold, a @Done@ indicates that the fold has terminated and
-- returns the final value of the accumulator.
--
-- Every @Partial@ indicates that a new accumulated output is available.  The
-- accumulated output can be extracted from the state at any point using
-- "extract". "extract" can never fail. A fold returns a valid output even
-- without any input i.e. even if you call "extract" on "initial" state it
-- provides an output. This is not true for parsers.
--
-- In general, "extract" is used in two cases:
--
-- * When the fold is used as a scan, @extract@ is called on the intermediate
-- state every time it is yielded by the fold, and the resulting value is
-- yielded as a stream.
-- * When the fold is used as a regular fold, @extract@ is called once when
-- we are done feeding input to the fold.
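--
-- As an illustration of the three functions, a summing accumulator could be
-- written directly with the 'Fold' constructor roughly as follows (@sumFold@
-- is a hypothetical name, not part of this module):
--
-- @
-- sumFold :: Monad m => Fold m Int Int
-- sumFold = Fold step initial extract
--
--     where
--
--     initial = return (Partial 0)         -- initial state, valid with no input
--     step s a = return (Partial (s + a))  -- an accumulator never returns Done
--     extract = return                     -- the state itself is the result
-- @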
--
-- = Alternate Designs
--
-- An alternate and simpler design would be to return the intermediate output
-- via @Partial@ along with the state, instead of using "extract" on the yielded
-- state, and to remove the extract function altogether.
--
-- This may even facilitate more efficient implementation.  Extracting from the
-- intermediate state after each yield may be more costly compared to the fold
-- step itself yielding the output. The fold may have more efficient ways to
-- retrieve the output rather than stuffing it in the state and using extract
-- on the state.
--
-- However, removing extract altogether may lead to less optimal code in some
-- cases because the driver of the fold needs to thread around the intermediate
-- output to return it if the stream stops before the fold returns @Done@.
-- When using this approach, the @parseMany (FL.take filesize)@ benchmark shows
-- 2x worse performance even after ensuring everything fuses.  So we keep the
-- "extract" approach to ensure better performance in all cases.
--
-- But we could still yield both state and the output in @Partial@, the output
-- can be used for the scan use case, instead of using extract. Extract would
-- then be used only for the case when the stream stops before the fold
-- completes.
--
-- = Accumulators and Terminating Folds
--
-- Folds in this module can be classified in two categories viz. accumulators
-- and terminating folds. Accumulators do not have a terminating condition,
-- they run forever and consume the entire stream, for example the 'length'
-- fold. Terminating folds have a terminating condition and can terminate
-- without consuming the entire stream, for example, the 'head' fold.
--
-- = Monoids
--
-- Monoids allow generalized, modular folding.  The accumulators in this module
-- can be expressed using 'mconcat' and a suitable 'Monoid'.  Instead of
-- writing folds we can write Monoids and turn them into folds.
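--
-- For instance, a sketch that folds with the 'Data.Monoid.Sum' monoid,
-- assuming the doctest setup used in this module:
--
-- >>> import Data.Monoid (Sum(..))
-- >>> getSum <$> Stream.fold (Fold.foldl' (<>) mempty) (Stream.fromList (fmap Sum [1..10]))
-- 55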
--
-- = Performance Notes
--
-- 'Streamly.Prelude' module provides fold functions to directly fold streams
-- e.g.  Streamly.Prelude/'Streamly.Prelude.sum' serves the same purpose as
-- Fold/'sum'.  However, the functions in Streamly.Prelude cannot be
-- efficiently combined together e.g. we cannot drive the input stream through
-- @sum@ and @length@ fold functions simultaneously.  Using the 'Fold' type we
-- can efficiently split the stream across multiple folds because it allows the
-- compiler to perform stream fusion optimizations.
--
module Streamly.Internal.Data.Fold.Type
    (
    -- * Types
      Step (..)
    , Fold (..)

    -- * Constructors
    , foldl'
    , foldlM'
    , foldl1'
    , foldr
    , foldrM
    , mkFold
    , mkFold_
    , mkFoldM
    , mkFoldM_

    -- * Folds
    , fromPure
    , fromEffect
    , drain
    , toList

    -- * Combinators

    -- ** Mapping output
    , rmapM

    -- ** Mapping Input
    , map
    , lmap
    , lmapM

    -- ** Filtering
    , filter
    , filterM
    , catMaybes

    -- ** Trimming
    , take
    , takeInterval

    -- ** Serial Append
    , serialWith
    , serial_

    -- ** Parallel Distribution
    , GenericRunner(..)
    , teeWith
    , teeWithFst
    , teeWithMin

    -- ** Parallel Alternative
    , shortest
    , longest

    -- ** Splitting
    , ManyState
    , many
    , manyPost
    , chunksOf
    , intervalsOf

    -- ** Nesting
    , concatMap

    -- * Running Partially
    , duplicate
    , initialize
    , runStep

    -- * Fold2
    , Fold2 (..)
    , simplify
    , chunksOf2
    )
where

import Control.Monad (void, (>=>))
import Control.Concurrent (threadDelay, forkIO, killThread)
import Control.Concurrent.MVar (MVar, newMVar, swapMVar, readMVar)
import Control.Exception (SomeException(..), catch, mask)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (control)
import Data.Bifunctor (Bifunctor(..))
import Data.Maybe (isJust, fromJust)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.SVar (MonadAsync)

import Prelude hiding (concatMap, filter, foldr, map, take)

-- $setup
-- >>> :m
-- >>> :set -XFlexibleContexts
-- >>> import Prelude hiding (concatMap, filter, map)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold as Fold

------------------------------------------------------------------------------
-- Step of a fold
------------------------------------------------------------------------------

-- The Step functor around b allows expressing early termination like a right
-- fold. Traditional list right folds use function composition and laziness to
-- terminate early whereas we use data constructors. It allows stream fusion in
-- contrast to the foldr/build fusion when composing with functions.

-- | Represents the result of the @step@ of a 'Fold'.  'Partial' returns an
-- intermediate state of the fold, the fold step can be called again with the
-- state or the driver can use @extract@ on the state to get the result out.
-- 'Done' returns the final result and the fold cannot be driven further.
--
-- /Pre-release/
--
{-# ANN type Step Fuse #-}
data Step s b
    = Partial !s
    | Done !b

-- | 'first' maps over 'Partial' and 'second' maps over 'Done'.
--
instance Bifunctor Step where
    {-# INLINE bimap #-}
    bimap f _ (Partial a) = Partial (f a)
    bimap _ g (Done b) = Done (g b)

    {-# INLINE first #-}
    first f (Partial a) = Partial (f a)
    first _ (Done x) = Done x

    {-# INLINE second #-}
    second _ (Partial x) = Partial x
    second f (Done a) = Done (f a)

-- | 'fmap' maps over 'Done'.
--
-- @
-- fmap = 'second'
-- @
--
instance Functor (Step s) where
    {-# INLINE fmap #-}
    fmap = second

-- | Map a monadic function over the result @b@ in @Step s b@.
--
-- /Internal/
{-# INLINE mapMStep #-}
mapMStep :: Applicative m => (a -> m b) -> Step s a -> m (Step s b)
mapMStep f res =
    case res of
        Partial s -> pure $ Partial s
        Done b -> Done <$> f b

------------------------------------------------------------------------------
-- The Fold type
------------------------------------------------------------------------------

-- | The type @Fold m a b@ having constructor @Fold step initial extract@
-- represents a fold over an input stream of values of type @a@ to a final
-- value of type @b@ in 'Monad' @m@.
--
-- The fold uses an intermediate state @s@ as accumulator, the type @s@ is
-- internal to the specific fold definition. The initial value of the fold
-- state @s@ is returned by @initial@. The @step@ function consumes an input
-- and either returns the final result @b@ if the fold is done or the next
-- intermediate state (see 'Step'). At any point the fold driver can extract
-- the result from the intermediate state using the @extract@ function.
--
-- NOTE: The constructor is not yet exposed via exposed modules, smart
-- constructors are provided to create folds.  If you think you need the
-- constructor of this type please consider using the smart constructors in
-- "Streamly.Internal.Data.Fold" instead.
--
-- /since 0.8.0 (type changed)/
--
-- @since 0.7.0

data Fold m a b =
  -- | @Fold @ @ step @ @ initial @ @ extract@
  forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b)

------------------------------------------------------------------------------
-- Mapping on the output
------------------------------------------------------------------------------

-- | Map a monadic function on the output of a fold.
--
-- @since 0.8.0
{-# INLINE rmapM #-}
rmapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c
rmapM f (Fold step initial extract) = Fold step1 initial1 (extract >=> f)

    where

    initial1 = initial >>= mapMStep f
    step1 s a = step s a >>= mapMStep f

------------------------------------------------------------------------------
-- Left fold constructors
------------------------------------------------------------------------------

-- | Make a fold from a left fold style pure step function and initial value of
-- the accumulator.
--
-- If your 'Fold' returns only 'Partial' (i.e. never returns a 'Done') then you
-- can use @foldl'*@ constructors.
--
-- A fold with an extract function can be expressed using fmap:
--
-- @
-- mkfoldlx :: Monad m => (s -> a -> s) -> s -> (s -> b) -> Fold m a b
-- mkfoldlx step initial extract = fmap extract (foldl' step initial)
-- @
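--
-- For example, assuming the doctest setup used in this module:
--
-- >>> Stream.fold (Fold.foldl' (+) 0) (Stream.fromList [1..10])
-- 55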
--
-- See also: @Streamly.Prelude.foldl'@
--
-- @since 0.8.0
--
{-# INLINE foldl' #-}
foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b
foldl' step initial =
    Fold
        (\s a -> return $ Partial $ step s a)
        (return (Partial initial))
        return

-- | Make a fold from a left fold style monadic step function and initial value
-- of the accumulator.
--
-- A fold with an extract function can be expressed using rmapM:
--
-- @
-- mkFoldlxM :: Functor m => (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
-- mkFoldlxM step initial extract = rmapM extract (foldlM' step initial)
-- @
--
-- See also: @Streamly.Prelude.foldlM'@
--
-- @since 0.8.0
--
{-# INLINE foldlM' #-}
foldlM' :: Monad m => (b -> a -> m b) -> m b -> Fold m a b
foldlM' step initial =
    Fold (\s a -> Partial <$> step s a) (Partial <$> initial) return

-- | Make a strict left fold, for non-empty streams, using first element as the
-- starting value. Returns Nothing if the stream is empty.
--
-- See also: @Streamly.Prelude.foldl1'@
--
-- /Pre-release/
{-# INLINE foldl1' #-}
foldl1' :: Monad m => (a -> a -> a) -> Fold m a (Maybe a)
foldl1' step = fmap toMaybe $ foldl' step1 Nothing'

    where

    step1 Nothing' a = Just' a
    step1 (Just' x) a = Just' $ step x a

------------------------------------------------------------------------------
-- Right fold constructors
------------------------------------------------------------------------------

-- | Make a fold using a right fold style step function and a terminal value.
-- It performs a strict right fold via a left fold using function composition.
-- Note that this is a strict fold, it can only be useful for constructing strict
-- structures in memory. For reductions this will be very inefficient.
--
-- For example,
--
-- > toList = foldr (:) []
--
-- See also: 'Streamly.Prelude.foldr'
--
-- @since 0.8.0
{-# INLINE foldr #-}
foldr :: Monad m => (a -> b -> b) -> b -> Fold m a b
foldr g z = fmap ($ z) $ foldl' (\f x -> f . g x) id

-- XXX we have not seen any use of this yet, not releasing until we have a use
-- case.
--
-- | Like 'foldr' but with a monadic step function.
--
-- For example,
--
-- > toList = foldrM (\a xs -> return $ a : xs) (return [])
--
-- See also: 'Streamly.Prelude.foldrM'
--
-- /Pre-release/
{-# INLINE foldrM #-}
foldrM :: Monad m => (a -> b -> m b) -> m b -> Fold m a b
foldrM g z =
    rmapM (z >>=) $ foldlM' (\f x -> return $ g x >=> f) (return return)

------------------------------------------------------------------------------
-- General fold constructors
------------------------------------------------------------------------------

-- XXX If the Step yield gives the result each time along with the state then
-- we can make the type of this as
--
-- mkFold :: Monad m => (s -> a -> Step s b) -> Step s b -> Fold m a b
--
-- Then similar to foldl' and foldr we can just fmap extract on it to extend
-- it to the version where an 'extract' function is required. Or do we even
-- need that?
--
-- Until we investigate this we are not releasing these.

-- | Make a terminating fold using a pure step function, a pure initial state
-- and a pure state extraction function.
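--
-- As a sketch, a fold that sums its input and terminates as soon as the
-- running total reaches a limit could be written as follows (@sumUpTo@ is a
-- hypothetical name, not part of this module):
--
-- @
-- sumUpTo :: Monad m => Int -> Fold m Int Int
-- sumUpTo limit = mkFold step (Partial 0) id
--
--     where
--
--     -- Return Done once the running total reaches the limit
--     step s a =
--         let s1 = s + a
--          in if s1 >= limit then Done s1 else Partial s1
-- @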
--
-- /Pre-release/
--
{-# INLINE mkFold #-}
mkFold :: Monad m => (s -> a -> Step s b) -> Step s b -> (s -> b) -> Fold m a b
mkFold step initial extract =
    Fold (\s a -> return $ step s a) (return initial) (return . extract)

-- | Similar to 'mkFold' but the final state extracted is identical to the
-- intermediate state.
--
-- @
-- mkFold_ step initial = mkFold step initial id
-- @
--
-- /Pre-release/
--
{-# INLINE mkFold_ #-}
mkFold_ :: Monad m => (b -> a -> Step b b) -> Step b b -> Fold m a b
mkFold_ step initial = mkFold step initial id

-- | Make a terminating fold with an effectful step function and initial state,
-- and a state extraction function.
--
-- > mkFoldM = Fold
--
--  We can just use 'Fold' but it is provided for completeness.
--
-- /Pre-release/
--
{-# INLINE mkFoldM #-}
mkFoldM :: (s -> a -> m (Step s b)) -> m (Step s b) -> (s -> m b) -> Fold m a b
mkFoldM = Fold

-- | Similar to 'mkFoldM' but the final state extracted is identical to the
-- intermediate state.
--
-- @
-- mkFoldM_ step initial = mkFoldM step initial return
-- @
--
-- /Pre-release/
--
{-# INLINE mkFoldM_ #-}
mkFoldM_ :: Monad m => (b -> a -> m (Step b b)) -> m (Step b b) -> Fold m a b
mkFoldM_ step initial = mkFoldM step initial return

------------------------------------------------------------------------------
-- Fold2
------------------------------------------------------------------------------

-- | Experimental type to provide a side input to the fold for generating the
-- initial state. For example, if we have to fold chunks of a stream and write
-- each chunk to a different file, then we can generate the file name using a
-- monadic action. This is a generalized version of 'Fold'.
--
-- /Internal/
data Fold2 m c a b =
  -- | @Fold2 @ @ step @ @ inject @ @ extract@
  forall s. Fold2 (s -> a -> m s) (c -> m s) (s -> m b)

-- | Convert more general type 'Fold2' into a simpler type 'Fold'
--
-- /Internal/
simplify :: Functor m => Fold2 m c a b -> c -> Fold m a b
simplify (Fold2 step inject extract) c =
    Fold (\x a -> Partial <$> step x a) (Partial <$> inject c) extract

------------------------------------------------------------------------------
-- Basic Folds
------------------------------------------------------------------------------

-- | A fold that drains all its input, running the effects and discarding the
-- results.
--
-- > drain = drainBy (const (return ()))
--
-- @since 0.7.0
{-# INLINABLE drain #-}
drain :: Monad m => Fold m a ()
drain = foldl' (\_ _ -> ()) ()

-- | Folds the input stream to a list.
--
-- /Warning!/ working on large lists accumulated as buffers in memory could be
-- very inefficient, consider using "Streamly.Data.Array.Foreign"
-- instead.
--
-- > toList = foldr (:) []
--
-- @since 0.7.0
{-# INLINABLE toList #-}
toList :: Monad m => Fold m a [a]
toList = foldr (:) []

------------------------------------------------------------------------------
-- Instances
------------------------------------------------------------------------------

-- | Maps a function on the output of the fold (the type @b@).
instance Functor m => Functor (Fold m a) where
    {-# INLINE fmap #-}
    fmap f (Fold step1 initial1 extract) = Fold step initial (fmap2 f extract)

        where

        initial = fmap2 f initial1
        step s b = fmap2 f (step1 s b)
        fmap2 g = fmap (fmap g)

-- This is the dual of stream "fromPure".
--
-- | A fold that always yields a pure value without consuming any input.
--
-- /Pre-release/
--
{-# INLINE fromPure #-}
fromPure :: Applicative m => b -> Fold m a b
fromPure b = Fold undefined (pure $ Done b) pure

-- This is the dual of stream "fromEffect".
--
-- | A fold that always yields the result of an effectful action without
-- consuming any input.
--
-- /Pre-release/
--
{-# INLINE fromEffect #-}
fromEffect :: Applicative m => m b -> Fold m a b
fromEffect b = Fold undefined (Done <$> b) pure

{-# ANN type Step Fuse #-}
data SeqFoldState sl f sr = SeqFoldL !sl | SeqFoldR !f !sr

-- | Sequential fold application. Apply two folds sequentially to an input
-- stream.  The input is provided to the first fold; when it is done, the
-- remaining input is provided to the second fold. When the second fold is
-- done, or when the input stream ends, the outputs of the two folds are
-- combined using the supplied function.
--
-- >>> f = Fold.serialWith (,) (Fold.take 8 Fold.toList) (Fold.takeEndBy (== '\n') Fold.toList)
-- >>> Stream.fold f $ Stream.fromList "header: hello\n"
-- ("header: ","hello\n")
--
-- Note: This is dual to appending streams using 'Streamly.Prelude.serial'.
--
-- Note: this implementation allows for stream fusion but has quadratic time
-- complexity, because each composition adds a new branch that every input
-- element of each subsequent fold has to traverse. Therefore, it cannot scale
-- to a large number of compositions. After around 100 compositions the
-- performance starts dipping rapidly compared to a CPS style implementation.
--
-- /Time: O(n^2) where n is the number of compositions./
--
-- @since 0.8.0
--
{-# INLINE serialWith #-}
serialWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
serialWith func (Fold stepL initialL extractL) (Fold stepR initialR extractR) =
    Fold step initial extract

    where

    initial = do
        resL <- initialL
        case resL of
            Partial sl -> return $ Partial $ SeqFoldL sl
            Done bl -> do
                resR <- initialR
                return $ bimap (SeqFoldR (func bl)) (func bl) resR

    step (SeqFoldL st) a = do
        r <- stepL st a
        case r of
            Partial s -> return $ Partial (SeqFoldL s)
            Done b -> do
                res <- initialR
                return $ bimap (SeqFoldR (func b)) (func b) res
    step (SeqFoldR f st) a = do
        r <- stepR st a
        return
            $ case r of
                  Partial s -> Partial (SeqFoldR f s)
                  Done b -> Done (f b)

    extract (SeqFoldR f sR) = fmap f (extractR sR)
    extract (SeqFoldL sL) = do
        rL <- extractL sL
        res <- initialR
        case res of
            Partial sR -> do
                rR <- extractR sR
                return $ func rL rR
            Done rR -> return $ func rL rR

-- | Same as applicative '*>'. Run two folds serially one after the other
-- discarding the result of the first.
--
-- /Unimplemented/
--
{-# INLINE serial_ #-}
serial_ :: -- Monad m =>
    Fold m x a -> Fold m x b -> Fold m x b
serial_ _f1 _f2 = undefined

{-# ANN type GenericRunner Fuse #-}
data GenericRunner sL sR bL bR
    = RunBoth !sL !sR
    | RunLeft !sL !bR
    | RunRight !bL !sR

-- | @teeWith k f1 f2@ distributes its input to both @f1@ and @f2@ until both
-- of them terminate and combines their output using @k@.
--
-- >>> avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
-- >>> Stream.fold avg $ Stream.fromList [1.0..100.0]
-- 50.5
--
-- > teeWith k f1 f2 = fmap (uncurry k) (Fold.tee f1 f2)
--
-- For applicative composition using this combinator see
-- "Streamly.Internal.Data.Fold.Tee".
--
-- @since 0.8.0
--
{-# INLINE teeWith #-}
teeWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
teeWith f (Fold stepL beginL doneL) (Fold stepR beginR doneR) =
    Fold step begin done

    where

    begin = do
        resL <- beginL
        resR <- beginR
        return
            $ case resL of
                  Partial sl ->
                      Partial
                          $ case resR of
                                Partial sr -> RunBoth sl sr
                                Done br -> RunLeft sl br
                  Done bl -> bimap (RunRight bl) (f bl) resR

    step (RunBoth sL sR) a = do
        resL <- stepL sL a
        resR <- stepR sR a
        case resL of
            Partial sL1 ->
                return
                    $ Partial
                    $ case resR of
                          Partial sR1 -> RunBoth sL1 sR1
                          Done bR -> RunLeft sL1 bR
            Done bL ->
                return
                    $ case resR of
                          Partial sR1 -> Partial $ RunRight bL sR1
                          Done bR -> Done $ f bL bR
    step (RunLeft sL bR) a = do
        resL <- stepL sL a
        return
            $ case resL of
                  Partial sL1 -> Partial $ RunLeft sL1 bR
                  Done bL -> Done $ f bL bR
    step (RunRight bL sR) a = do
        resR <- stepR sR a
        return
            $ case resR of
                  Partial sR1 -> Partial $ RunRight bL sR1
                  Done bR -> Done $ f bL bR

    done (RunBoth sL sR) = do
        bL <- doneL sL
        bR <- doneR sR
        return $ f bL bR
    done (RunLeft sL bR) = do
        bL <- doneL sL
        return $ f bL bR
    done (RunRight bL sR) = do
        bR <- doneR sR
        return $ f bL bR

-- | Like 'teeWith' but terminates as soon as the first fold terminates.
--
-- /Unimplemented/
--
{-# INLINE teeWithFst #-}
teeWithFst :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d
teeWithFst = undefined

-- | Like 'teeWith' but terminates as soon as any one of the two folds
-- terminates.
--
-- /Unimplemented/
--
{-# INLINE teeWithMin #-}
teeWithMin :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d
teeWithMin = undefined

-- | Shortest alternative. Apply both folds in parallel but choose the result
-- from the one which consumed least input i.e. take the shortest succeeding
-- fold.
--
-- /Unimplemented/
--
{-# INLINE shortest #-}
shortest :: -- Monad m =>
    Fold m x a -> Fold m x a -> Fold m x a
shortest _f1 _f2 = undefined

-- | Longest alternative. Apply both folds in parallel but choose the result
-- from the one which consumed more input i.e. take the longest succeeding
-- fold.
--
-- /Unimplemented/
--
{-# INLINE longest #-}
longest :: -- Monad m =>
    Fold m x a -> Fold m x a -> Fold m x a
longest _f1 _f2 = undefined

data ConcatMapState m sa a c
    = B !sa
    | forall s. C (s -> a -> m (Step s c)) !s (s -> m c)

-- Compare with foldIterate.
--
-- | Map a 'Fold' returning function on the result of a 'Fold' and run the
-- returned fold. This operation can be used to express data dependencies
-- between fold operations.
--
-- Let's say the first element in the stream is a count of the following
-- elements that we have to add, then:
--
-- >>> import Data.Maybe (fromJust)
-- >>> count = fmap fromJust Fold.head
-- >>> total n = Fold.take n Fold.sum
-- >>> Stream.fold (Fold.concatMap total count) $ Stream.fromList [10,9..1]
-- 45
--
-- /Time: O(n^2) where @n@ is the number of compositions./
--
-- See also: 'Streamly.Internal.Data.Stream.IsStream.foldIterateM'
--
-- @since 0.8.0
--
{-# INLINE concatMap #-}
concatMap :: Monad m => (b -> Fold m a c) -> Fold m a b -> Fold m a c
concatMap f (Fold stepa initiala extracta) = Fold stepc initialc extractc
  where
    initialc = do
        r <- initiala
        case r of
            Partial s -> return $ Partial (B s)
            Done b -> initInnerFold (f b)

    stepc (B s) a = do
        r <- stepa s a
        case r of
            Partial s1 -> return $ Partial (B s1)
            Done b -> initInnerFold (f b)

    stepc (C stepInner s extractInner) a = do
        r <- stepInner s a
        return $ case r of
            Partial sc -> Partial (C stepInner sc extractInner)
            Done c -> Done c

    extractc (B s) = do
        r <- extracta s
        initExtract (f r)
    extractc (C _ sInner extractInner) = extractInner sInner

    initInnerFold (Fold step i e) = do
        r <- i
        return $ case r of
            Partial s -> Partial (C step s e)
            Done c -> Done c

    initExtract (Fold _ i e) = do
        r <- i
        case r of
            Partial s -> e s
            Done c -> return c

------------------------------------------------------------------------------
-- Mapping on input
------------------------------------------------------------------------------

-- | @lmap f fold@ maps the function @f@ on the input of the fold.
--
-- >>> Stream.fold (Fold.lmap (\x -> x * x) Fold.sum) (Stream.enumerateFromTo 1 100)
-- 338350
--
-- > lmap = Fold.lmapM return
--
-- @since 0.8.0
{-# INLINABLE lmap #-}
lmap :: (a -> b) -> Fold m b r -> Fold m a r
lmap f (Fold step begin done) = Fold step' begin done
    where
    step' x a = step x (f a)

-- XXX should be removed
-- |
-- /Internal/
{-# INLINE map #-}
map :: (a -> b) -> Fold m b r -> Fold m a r
map = lmap

-- | @lmapM f fold@ maps the monadic function @f@ on the input of the fold.
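--
-- A minimal sketch mirroring the 'lmap' example, assuming the same qualified
-- @Stream@ and @Fold@ imports that the other examples in this module rely on:
--
-- >>> Stream.fold (Fold.lmapM (\x -> return (x * x)) Fold.sum) (Stream.enumerateFromTo 1 100)
-- 338350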
--
-- @since 0.8.0
{-# INLINABLE lmapM #-}
lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
lmapM f (Fold step begin done) = Fold step' begin done
    where
    step' x a = f a >>= step x

------------------------------------------------------------------------------
-- Filtering
------------------------------------------------------------------------------

-- | Include only those elements that pass a predicate.
--
-- >>> Stream.fold (Fold.filter (> 5) Fold.sum) $ Stream.fromList [1..10]
-- 40
--
-- > filter f = Fold.filterM (return . f)
--
-- @since 0.8.0
{-# INLINABLE filter #-}
filter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r
filter f (Fold step begin done) = Fold step' begin done
    where
    step' x a = if f a then step x a else return $ Partial x

-- | Like 'filter' but with a monadic predicate.
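--
-- A minimal sketch mirroring the 'filter' example, assuming the same
-- qualified @Stream@ and @Fold@ imports that the other examples in this
-- module rely on:
--
-- >>> Stream.fold (Fold.filterM (return . (> 5)) Fold.sum) $ Stream.fromList [1..10]
-- 40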
--
-- @since 0.8.0
{-# INLINABLE filterM #-}
filterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r
filterM f (Fold step begin done) = Fold step' begin done
    where
    step' x a = do
      use <- f a
      if use then step x a else return $ Partial x

-- | Modify a fold to receive a 'Maybe' input. The 'Just' values are unwrapped
-- and sent to the original fold; 'Nothing' values are discarded.
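--
-- A minimal sketch of the expected behaviour, assuming the same qualified
-- @Stream@ and @Fold@ imports that the other examples in this module rely on:
--
-- >>> Stream.fold (Fold.catMaybes Fold.sum) $ Stream.fromList [Just 1, Nothing, Just 3]
-- 4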
--
-- @since 0.8.0
{-# INLINE catMaybes #-}
catMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b
catMaybes = filter isJust . map fromJust

------------------------------------------------------------------------------
-- Parsing
------------------------------------------------------------------------------

-- Required to fuse "take" with "many" in "chunksOf", for ghc-9.x
{-# ANN type Tuple'Fused Fuse #-}
data Tuple'Fused a b = Tuple'Fused !a !b deriving Show

-- | Take at most @n@ input elements and fold them using the supplied fold. A
-- negative count is treated as 0.
--
-- >>> Stream.fold (Fold.take 2 Fold.toList) $ Stream.fromList [1..10]
-- [1,2]
--
-- @since 0.8.0
{-# INLINE take #-}
take :: Monad m => Int -> Fold m a b -> Fold m a b
take n (Fold fstep finitial fextract) = Fold step initial extract

    where

    initial = do
        res <- finitial
        case res of
            Partial s ->
                if n > 0
                then return $ Partial $ Tuple'Fused 0 s
                else Done <$> fextract s
            Done b -> return $ Done b

    step (Tuple'Fused i r) a = do
        res <- fstep r a
        case res of
            Partial sres -> do
                let i1 = i + 1
                    s1 = Tuple'Fused i1 sres
                if i1 < n
                then return $ Partial s1
                else Done <$> fextract sres
            Done bres -> return $ Done bres

    extract :: Tuple'Fused a s -> m b
extract (Tuple'Fused a
_ s
r) = s -> m b
fextract s
r

------------------------------------------------------------------------------
-- Nesting
------------------------------------------------------------------------------

-- | Modify the fold such that it returns a new 'Fold' instead of the output.
-- If the fold was already done, the returned fold always yields the same
-- result. If the fold was partial, the returned fold resumes from where we
-- left off, i.e. it uses the last accumulator value as the initial value of
-- the accumulator. Thus we can resume the fold later and feed it more input.
--
-- >>> :{
-- do
--  more <- Stream.fold (Fold.duplicate Fold.sum) (Stream.enumerateFromTo 1 10)
--  evenMore <- Stream.fold (Fold.duplicate more) (Stream.enumerateFromTo 11 20)
--  Stream.fold evenMore (Stream.enumerateFromTo 21 30)
-- :}
-- 465
--
-- /Pre-release/
{-# INLINABLE duplicate #-}
duplicate :: Monad m => Fold m a b -> Fold m a (Fold m a b)
duplicate (Fold step1 initial1 extract1) =
    Fold step initial (\s -> pure $ Fold step1 (pure $ Partial s) extract1)

    where

    initial = second fromPure <$> initial1

    step s a = second fromPure <$> step1 s a

-- | Run the initialization effect of a fold. The returned fold would use the
-- value returned by this effect as its initial value.
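--
-- A usage sketch, not a tested doctest: it assumes 'initialize' is reachable
-- through the same @Fold@ qualifier used by the other examples in this module.
-- The initialization effect runs once, up front, and the resulting fold is then
-- driven like any other fold:
--
-- > do
-- >     f <- Fold.initialize Fold.sum
-- >     Stream.fold f (Stream.fromList [1..10])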
--
-- /Pre-release/
{-# INLINE initialize #-}
initialize :: Monad m => Fold m a b -> m (Fold m a b)
initialize (Fold step initial extract) = do
    i <- initial
    return $ Fold step (return i) extract

-- | Run one step of a fold and store the accumulator as an initial value in
-- the returned fold.
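--
-- A usage sketch, not a tested doctest: it assumes 'runStep' is reachable
-- through the same @Fold@ qualifier used by the other examples in this module.
-- The first input is fed by hand and the rest of the stream is folded from the
-- resulting state:
--
-- > do
-- >     f <- Fold.runStep Fold.sum 5
-- >     Stream.fold f (Stream.fromList [1..10])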
--
-- /Pre-release/
{-# INLINE runStep #-}
runStep :: Monad m => Fold m a b -> a -> m (Fold m a b)
runStep (Fold step initial extract) a = do
    res <- initial
    r <- case res of
          Partial fs -> step fs a
          b@(Done _) -> return b
    return $ Fold step (return r) extract

------------------------------------------------------------------------------
-- Parsing
------------------------------------------------------------------------------

-- All the grouping transformations that we apply to a stream can also be
-- applied to a fold input stream. groupBy et al can be written as terminating
-- folds and then we can apply "many" to use those repeatedly on a stream.

{-# ANN type ManyState Fuse #-}
data ManyState s1 s2
    = ManyFirst !s1 !s2
    | ManyLoop !s1 !s2

-- | Collect zero or more applications of a fold.  @many split collect@ applies
-- the @split@ fold repeatedly on the input stream and accumulates zero or more
-- fold results using @collect@.
--
-- >>> two = Fold.take 2 Fold.toList
-- >>> twos = Fold.many two Fold.toList
-- >>> Stream.fold twos $ Stream.fromList [1..10]
-- [[1,2],[3,4],[5,6],[7,8],[9,10]]
--
-- Stops when @collect@ stops.
--
-- See also: 'Streamly.Prelude.concatMap', 'Streamly.Prelude.foldMany'
--
-- @since 0.8.0
--
{-# INLINE many #-}
many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
many (Fold sstep sinitial sextract) (Fold cstep cinitial cextract) =
    Fold step initial extract

    where

    -- cs = collect state
    -- ss = split state
    -- cres = collect state result
    -- sres = split state result
    -- cb = collect done
    -- sb = split done

    -- Caution! There is mutual recursion here, inlining the right functions is
    -- important.

    {-# INLINE handleSplitStep #-}
    handleSplitStep branch cs sres =
        case sres of
            Partial ss1 -> return $ Partial $ branch ss1 cs
            Done sb -> runCollector ManyFirst cs sb

    {-# INLINE handleCollectStep #-}
    handleCollectStep branch cres =
        case cres of
            Partial cs -> do
                sres <- sinitial
                handleSplitStep branch cs sres
            Done cb -> return $ Done cb

    -- Do not inline this
    runCollector branch cs sb = cstep cs sb >>= handleCollectStep branch

    initial = cinitial >>= handleCollectStep ManyFirst

    {-# INLINE step_ #-}
    step_ ss cs a = do
        sres <- sstep ss a
        handleSplitStep ManyLoop cs sres

    {-# INLINE step #-}
    step (ManyFirst ss cs) a = step_ ss cs a
    step (ManyLoop ss cs) a = step_ ss cs a

    extract (ManyFirst _ cs) = cextract cs
    extract (ManyLoop ss cs) = do
        sb <- sextract ss
        cres <- cstep cs sb
        case cres of
            Partial s -> cextract s
            Done b -> return b

-- | Like 'many', but the inner fold emits an output at the end even if no
-- input is received.
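--
-- A usage sketch, not a tested doctest: it assumes 'manyPost' is reachable
-- through the same @Fold@ qualifier used by the other examples in this module,
-- and @twosPost@ is a name made up for illustration. Going by the definition
-- of @extract@ below, the collected result is expected to end with one extra,
-- empty group compared to 'many':
--
-- > two = Fold.take 2 Fold.toList
-- > twosPost = Fold.manyPost two Fold.toList
-- > Stream.fold twosPost $ Stream.fromList [1..10]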
--
-- /Internal/
--
-- /See also: 'Streamly.Prelude.concatMap', 'Streamly.Prelude.foldMany'/
--
{-# INLINE manyPost #-}
manyPost :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
manyPost (Fold sstep sinitial sextract) (Fold cstep cinitial cextract) =
    Fold step initial extract

    where

    -- cs = collect state
    -- ss = split state
    -- cres = collect state result
    -- sres = split state result
    -- cb = collect done
    -- sb = split done

    -- Caution! There is mutual recursion here, inlining the right functions is
    -- important.

    {-# INLINE handleSplitStep #-}
    handleSplitStep cs sres =
        case sres of
            Partial ss1 -> return $ Partial $ Tuple' ss1 cs
            Done sb -> runCollector cs sb

    {-# INLINE handleCollectStep #-}
    handleCollectStep cres =
        case cres of
            Partial cs -> do
                sres <- sinitial
                handleSplitStep cs sres
            Done cb -> return $ Done cb

    -- Do not inline this
    runCollector cs sb = cstep cs sb >>= handleCollectStep

    initial = cinitial >>= handleCollectStep

    {-# INLINE step #-}
    step (Tuple' ss cs) a = do
        sres <- sstep ss a
        handleSplitStep cs sres

    extract (Tuple' ss cs) = do
        sb <- sextract ss
        cres <- cstep cs sb
        case cres of
            Partial s -> cextract s
            Done b -> return b

-- | @chunksOf n split collect@ repeatedly applies the @split@ fold to chunks
-- of @n@ items in the input stream and supplies the result to the @collect@
-- fold.
--
-- >>> twos = Fold.chunksOf 2 Fold.toList Fold.toList
-- >>> Stream.fold twos $ Stream.fromList [1..10]
-- [[1,2],[3,4],[5,6],[7,8],[9,10]]
--
-- > chunksOf n split = many (take n split)
--
-- Stops when @collect@ stops.
--
-- @since 0.8.0
--
{-# INLINE chunksOf #-}
chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
chunksOf n split = many (take n split)

-- | Like 'chunksOf' but the collecting fold is a 'Fold2'.
--
-- /Internal/
{-# INLINE chunksOf2 #-}
chunksOf2 :: Monad m => Int -> Fold m a b -> Fold2 m x b c -> Fold2 m x a c
chunksOf2 n (Fold step1 initial1 extract1) (Fold2 step2 inject2 extract2) =
    Fold2 step' inject' extract'

    where

    loopUntilPartial s = do
        res <- initial1
        case res of
            Partial fs -> return $ Tuple3' 0 fs s
            Done _ -> loopUntilPartial s

    inject' x = inject2 x >>= loopUntilPartial

    step' (Tuple3' i r1 r2) a =
        if i < n
        then do
            res <- step1 r1 a
            case res of
                Partial s -> return $ Tuple3' (i + 1) s r2
                Done b -> step2 r2 b >>= loopUntilPartial
        else extract1 r1 >>= step2 r2 >>= loopUntilPartial

    extract' (Tuple3' _ r1 r2) = extract1 r1 >>= step2 r2 >>= extract2

-- XXX We can use asyncClock here. A parser can be used to return an input that
-- arrives after the timeout.
-- XXX If n is 0 return immediately in initial.
-- XXX we should probably discard the input received after the timeout like
-- takeEndBy_.
--
-- | @takeInterval n fold@ uses @fold@ to fold the input items arriving within
-- a window of the first @n@ seconds.
--
-- >>> Stream.fold (Fold.takeInterval 1.0 Fold.toList) $ Stream.delay 0.1 $ Stream.fromList [1..]
-- [1,2,3,4,5,6,7,8,9,10,11]
--
-- Stops when @fold@ stops or when the timeout occurs. Note that the fold needs
-- an input after the timeout to stop. For example, if no input is pushed to
-- the fold until one hour after the timeout has occurred, then the fold will
-- be done only after consuming that input.
--
-- /Pre-release/
--
{-# INLINE takeInterval #-}
takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b
takeInterval n (Fold step initial done) = Fold step' initial' done'

    where

    initial' = do
        res <- initial
        case res of
            Partial s -> do
                mv <- liftIO $ newMVar False
                t <-
                    control $ \run ->
                        mask $ \restore -> do
                            tid <-
                                forkIO
                                  $ catch
                                        (restore $ void $ run (timerThread mv))
                                        (handleChildException mv)
                            run (return tid)
                return $ Partial $ Tuple3' s mv t
            Done b -> return $ Done b

    step' (Tuple3' s mv t) a = do
        val <- liftIO $ readMVar mv
        if val
        then do
            res <- step s a
            case res of
                Partial sres -> Done <$> done sres
                Done bres -> return $ Done bres
        else do
            res <- step s a
            case res of
                Partial fs -> return $ Partial $ Tuple3' fs mv t
                Done b -> liftIO (killThread t) >> return (Done b)

    done' (Tuple3' s _ _) = done s

    timerThread mv = do
        liftIO $ threadDelay (round $ n * 1000000)
        -- Use IORef + CAS? instead of MVar since its a Bool?
        liftIO $ void $ swapMVar mv True

    handleChildException :: MVar Bool -> SomeException -> IO ()
    handleChildException mv _ = void $ swapMVar mv True

-- For example, we can copy and distribute a stream to multiple folds where
-- each fold can group the input differently e.g. by one second, one minute and
-- one hour windows respectively and fold each resulting stream of folds.

-- | Group the input stream into windows of @n@ seconds each using the first
-- fold and then fold the resulting groups using the second fold.
--
-- >>> intervals = Fold.intervalsOf 0.5 Fold.toList Fold.toList
-- >>> Stream.fold intervals $ Stream.delay 0.2 $ Stream.fromList [1..10]
-- [[1,2,3,4],[5,6,7],[8,9,10]]
--
-- > intervalsOf n split = many (takeInterval n split)
--
-- /Pre-release/
--
{-# INLINE intervalsOf #-}
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
intervalsOf n split = many (takeInterval n split)