-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Transform
-- Copyright   : (c) 2018 Composewell Technologies
--               (c) Roman Leshchinskiy 2008-2010
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- "Streamly.Internal.Data.Pipe" might ultimately replace this module.

-- A few functions in this module have been adapted from the vector package
-- (c) Roman Leshchinskiy. See the notes in specific combinators.

module Streamly.Internal.Data.Stream.StreamD.Transform
    (
    -- * Piping
    -- | Pass through a 'Pipe'.
      transform

    -- * Mapping
    -- | Stateless one-to-one maps.
    , map
    , mapM
    , sequence

    -- * Mapping Effects
    , tap
    , tapOffsetEvery
    , tapRate
    , pollCounts

    -- * Folding
    , foldrS
    , foldrT
    , foldlS
    , foldlT

    -- * Scanning By 'Fold'
    , postscanOnce -- XXX rename to postscan
    , scanOnce     -- XXX rename to scan

    -- * Scanning
    -- | Left scans. Stateful, mostly one-to-one maps.
    , scanlM'
    , scanlMAfter'
    , scanl'
    , scanlM
    , scanl
    , scanl1M'
    , scanl1'
    , scanl1M
    , scanl1

    , prescanl'
    , prescanlM'

    , postscanl
    , postscanlM
    , postscanl'
    , postscanlM'
    , postscanlMAfter'

    , postscanlx'
    , postscanlMx'
    , scanlMx'
    , scanlx'

    -- * Filtering
    -- | Produce a subset of the stream.
    , filter
    , filterM
    , deleteBy
    , uniq

    -- * Trimming
    -- | Produce a subset of the stream trimmed at ends.
    , take
    , takeByTime
    , takeWhile
    , takeWhileM
    , drop
    , dropByTime
    , dropWhile
    , dropWhileM

    -- * Inserting Elements
    -- | Produce a superset of the stream.
    , insertBy
    , intersperse
    , intersperseM
    , intersperseSuffix
    , intersperseSuffixBySpan

    -- * Inserting Side Effects
    , intersperseM_
    , intersperseSuffix_

    -- * Reordering
    -- | Produce strictly the same set but reordered.
    , reverse
    , reverse'

    -- * Position Indexing
    , indexed
    , indexedR

    -- * Searching
    , findIndices

    -- * Rolling map
    -- | Map using the previous element.
    , rollingMap
    , rollingMapM

    -- * Maybe Streams
    , mapMaybe
    , mapMaybeM
    )
where

#include "inline.hs"

import Control.Concurrent (killThread, threadDelay)
import Control.Exception (AsyncException)
import Control.Monad (void, when)
import Control.Monad.Catch (MonadCatch, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.IORef (newIORef, mkWeakIORef)
import Data.Maybe (fromJust, isJust)
import Foreign.Storable (Storable(..))
import GHC.Types (SPEC(..))
import qualified Control.Monad.Catch as MC

import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Pipe.Type (Pipe(..), PipeState(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
       (TimeUnit64, toRelTime64, diffAbsTime64)

import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.IORef.Prim as Prim
import qualified Streamly.Internal.Data.Pipe.Type as Pipe
import qualified Streamly.Internal.Data.Stream.StreamK as K

import Prelude hiding
       ( drop, dropWhile, filter, map, mapM, reverse
       , scanl, scanl1, sequence, take, takeWhile)

import Streamly.Internal.Data.Stream.StreamD.Type
import Streamly.Internal.Data.SVar

------------------------------------------------------------------------------
-- Piping
------------------------------------------------------------------------------

-- | Run every element of a stream through a 'Pipe'.
--
-- The resulting stream state pairs the pipe state with the source stream
-- state. The pipe state is either in 'Consume' mode, in which case the next
-- source element is pulled and fed to the pipe's consume step, or in 'Produce'
-- mode, in which case the pipe's produce step is run without touching the
-- source stream.
--
-- The stream ends when the source stream stops while the pipe is in
-- 'Consume' mode.
{-# INLINE_NORMAL transform #-}
transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b
transform (Pipe pstep1 pstep2 pstate) (Stream step state) =
    Stream step' (Consume pstate, state)

  where

    {-# INLINE_LATE step' #-}

    -- Consume mode: pull one element from the source and hand it to the
    -- pipe. The pipe state is forced with 'seq' before the step runs.
    step' gst (Consume pst, st) = pst `seq` do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                res <- pstep1 pst x
                case res of
                    Pipe.Yield b pst' -> return $ Yield b (pst', s)
                    Pipe.Continue pst' -> return $ Skip (pst', s)
            -- No element yet: stay in consume mode with the same pipe state.
            Skip s -> return $ Skip (Consume pst, s)
            Stop   -> return Stop

    -- Produce mode: the pipe emits output on its own; the source stream
    -- state is carried along unchanged.
    step' _ (Produce pst, st) = pst `seq` do
        res <- pstep2 pst
        case res of
            Pipe.Yield b pst' -> return $ Yield b (pst', st)
            Pipe.Continue pst' -> return $ Skip (pst', st)

------------------------------------------------------------------------------
-- Transformation Folds
------------------------------------------------------------------------------

-- | Left fold of a stream into a computation in a transformer monad @s m@.
--
-- The accumulator is itself an @s m b@ action. For every element yielded by
-- the source the accumulator is replaced by @fstep acc x@; when the source
-- stops the accumulated action is returned as the result. Each source step
-- is run in the base monad and lifted with 'lift'.
{-# INLINE_NORMAL foldlT #-}
foldlT :: (Monad m, Monad (s m), MonadTrans s)
    => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
foldlT fstep begin (Stream step state) = go SPEC begin state
  where
    -- The strict 'SPEC' argument is threaded through the loop as a hint to
    -- GHC's SpecConstr pass.
    go !_ acc st = do
        r <- lift $ step defState st
        case r of
            Yield x s -> go SPEC (fstep acc x) s
            Skip s -> go SPEC acc s
            Stop   -> acc

-- Note, this is going to have horrible performance, because of the nature of
-- the stream type (i.e. direct stream vs CPS). It is only for reference; it is
-- likely to be practically unusable.

-- | Left fold of a stream where the accumulator is itself a stream.
--
-- The fold runs in two phases, encoded in the state as an 'Either':
--
-- * @Left (st, acc)@: the source is still being consumed. Every yielded
--   element updates the accumulator stream with @fstep acc x@; nothing is
--   emitted downstream during this phase (only 'Skip' steps).
-- * @Right acc@: the source has stopped and the accumulated stream is now
--   replayed step by step as the output.
{-# INLINE_NORMAL foldlS #-}
foldlS :: Monad m
    => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
foldlS fstep begin (Stream step state) = Stream step' (Left (state, begin))
  where
    -- Phase 1: drain the source, building up the accumulator stream.
    step' gst (Left (st, acc)) = do
        r <- step (adaptState gst) st
        return $ case r of
            Yield x s -> Skip (Left (s, fstep acc x))
            Skip s -> Skip (Left (s, acc))
            Stop   -> Skip (Right acc)

    -- Phase 2: run the accumulated stream, re-wrapping its step function
    -- with each new state.
    step' gst (Right (Stream stp stt)) = do
        r <- stp (adaptState gst) stt
        return $ case r of
            Yield x s -> Yield x (Right (Stream stp s))
            Skip s -> Skip (Right (Stream stp s))
            Stop   -> Stop

------------------------------------------------------------------------------
-- Transformation by Mapping
------------------------------------------------------------------------------

-- | Turn a stream of monadic actions into a stream of their results.
--
-- Each action yielded by the source is executed when it is pulled, and its
-- result is yielded in its place. 'Skip' and 'Stop' steps of the source are
-- passed through as they are.
{-# INLINE_NORMAL sequence #-}
sequence :: Monad m => Stream m (m a) -> Stream m a
sequence (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
         r <- step (adaptState gst) st
         case r of
             -- Run the yielded action and emit its result.
             Yield x s -> x >>= \a -> return (Yield a s)
             Skip s    -> return $ Skip s
             Stop      -> return Stop

------------------------------------------------------------------------------
-- Mapping side effects
------------------------------------------------------------------------------

-- State of the stream produced by 'tap'. The type parameter @a@ is not used
-- by any constructor.
data TapState fs st a =
      TapInit
      -- The fold has not been initialized yet.
    | Tapping !fs st
      -- The fold is running: its (strict) state and the source stream state.
    | TapDone st
      -- The fold has finished; only the source stream state remains.

-- XXX Multiple yield points

-- | Feed every element of a stream to a 'Fold' as a side effect while passing
-- the element through unchanged.
--
-- The fold is initialized on the first step. While the fold reports
-- 'FL.Partial', each yielded element is pushed into it before being emitted.
-- Once the fold reports 'FL.Done', its result is discarded and the remaining
-- elements are passed through without being tapped. If the source stops while
-- the fold is still partial, the fold's @extract@ action is run and its
-- result is discarded.
{-# INLINE tap #-}
tap :: Monad m => Fold m a b -> Stream m a -> Stream m a
tap (Fold fstep initial extract) (Stream step state) = Stream step' TapInit

    where

    -- Initialize the fold; no element is consumed from the source here.
    step' _ TapInit = do
        res <- initial
        return
            $ Skip
            $ case res of
                  FL.Partial s -> Tapping s state
                  FL.Done _ -> TapDone state
    -- Fold is still accepting input: tap the element, then emit it.
    step' gst (Tapping acc st) = do
        r <- step gst st
        case r of
            Yield x s -> do
                res <- fstep acc x
                return
                    $ Yield x
                    $ case res of
                          FL.Partial fs -> Tapping fs s
                          FL.Done _ -> TapDone s
            Skip s -> return $ Skip (Tapping acc s)
            Stop -> do
                -- Finalize the fold; the extracted value is not used.
                void $ extract acc
                return Stop
    -- Fold has terminated: behave like the source stream.
    step' gst (TapDone st) = do
        r <- step gst st
        return
            $ case r of
                  Yield x s -> Yield x (TapDone s)
                  Skip s -> Skip (TapDone s)
                  Stop -> Stop

-- State of the stream produced by 'tapOffsetEvery'. The type parameter @a@ is
-- not used by any constructor.
data TapOffState fs s a =
      TapOffInit
      -- The fold has not been initialized yet.
    | TapOffTapping !fs s Int
      -- The fold is running: its (strict) state, the source stream state and
      -- the countdown until the next element is tapped.
    | TapOffDone s
      -- The fold has finished; only the source stream state remains.

-- XXX Multiple yield points

-- | Like 'tap', but only a subset of the elements is fed to the 'Fold'.
--
-- @tapOffsetEvery offset n fld@ keeps a countdown that starts at
-- @offset \`mod\` n@. An element is pushed into the fold when the countdown is
-- @<= 0@, after which the countdown is reset to @n - 1@; otherwise the
-- countdown is decremented and the element is not tapped. Every element is
-- emitted downstream unchanged in either case.
--
-- Once the fold reports 'FL.Done' the remaining elements are passed through
-- untouched. If the source stops while the fold is still partial, the fold's
-- @extract@ action is run and its result is discarded.
--
-- Note: @n@ must be non-zero, because the initial countdown is computed with
-- @offset \`mod\` n@.
{-# INLINE_NORMAL tapOffsetEvery #-}
tapOffsetEvery :: Monad m
    => Int -> Int -> Fold m a b -> Stream m a -> Stream m a
tapOffsetEvery offset n (Fold fstep initial extract) (Stream step state) =
    Stream step' TapOffInit

    where

    {-# INLINE_LATE step' #-}
    -- Initialize the fold and the countdown; no element is consumed here.
    step' _ TapOffInit = do
        res <- initial
        return
            $ Skip
            $ case res of
                  FL.Partial s -> TapOffTapping s state (offset `mod` n)
                  FL.Done _ -> TapOffDone state
    step' gst (TapOffTapping acc st count) = do
        r <- step gst st
        case r of
            Yield x s -> do
                next <-
                    if count <= 0
                    then do
                        -- Countdown expired: tap this element and restart
                        -- the countdown.
                        res <- fstep acc x
                        return
                            $ case res of
                                  FL.Partial sres ->
                                    TapOffTapping sres s (n - 1)
                                  FL.Done _ -> TapOffDone s
                    else return $ TapOffTapping acc s (count - 1)
                return $ Yield x next
            Skip s -> return $ Skip (TapOffTapping acc s count)
            Stop -> do
                -- Finalize the fold; the extracted value is not used.
                void $ extract acc
                return Stop
    -- Fold has terminated: behave like the source stream.
    step' gst (TapOffDone st) = do
        r <- step gst st
        return
            $ case r of
                  Yield x s -> Yield x (TapOffDone s)
                  Skip s -> Skip (TapOffDone s)
                  Stop -> Stop

-- | Count the elements of a stream that satisfy a predicate and expose the
-- running count to a 'Fold' that runs in a separate thread.
--
-- On the first step a counter is created and a thread is forked with
-- 'forkManaged'. That thread folds, using @fld@, the stream obtained by
-- applying @transf@ to @Prim.toStreamD countVar@ (presumably a stream of
-- successive reads of the counter -- confirm against
-- "Streamly.Internal.Data.IORef.Prim"). The result of the fold is discarded.
--
-- After that, every element of the source is passed through unchanged; the
-- counter is incremented for each element for which @predicate@ returns
-- 'True'. When the source stops, the forked thread is killed with
-- 'killThread'.
{-# INLINE_NORMAL pollCounts #-}
pollCounts
    :: MonadAsync m
    => (a -> Bool)
    -> (Stream m Int -> Stream m Int)
    -> Fold m Int b
    -> Stream m a
    -> Stream m a
pollCounts predicate transf fld (Stream step state) = Stream step' Nothing
  where

    {-# INLINE_LATE step' #-}
    -- First step: set up the counter and the polling thread. No element is
    -- consumed from the source here.
    step' _ Nothing = do
        -- As long as we are using an "Int" for counts lockfree reads from
        -- Var should work correctly on both 32-bit and 64-bit machines.
        -- However, an Int on a 32-bit machine may overflow quickly.
        countVar <- liftIO $ Prim.newIORef (0 :: Int)
        tid <- forkManaged
            $ void $ fold fld
            $ transf $ Prim.toStreamD countVar
        return $ Skip (Just (countVar, tid, state))

    step' gst (Just (countVar, tid, st)) = do
        r <- step gst st
        case r of
            Yield x s -> do
                when (predicate x) $ liftIO $ Prim.modifyIORef' countVar (+ 1)
                return $ Yield x (Just (countVar, tid, s))
            Skip s -> return $ Skip (Just (countVar, tid, s))
            Stop -> do
                -- The source is exhausted: stop the polling thread.
                liftIO $ killThread tid
                return Stop

-- Pass the stream through unchanged while periodically reporting how many
-- elements it has yielded.
--
-- Arguments:
--
-- * the reporting interval in seconds (it is multiplied by 1000000 and handed
--   to 'threadDelay', which takes microseconds),
-- * the action to run on every report; it receives the number of elements
--   yielded since the previous report and its result is discarded,
-- * the stream to observe.
--
-- On the first step a counter is allocated and a reporter thread is forked.
-- Every 'Yield' of the source stream increments the counter. The reporter
-- thread is killed when the source stream stops; a weak reference finalizer
-- is also installed so that the thread is killed if the stream state is
-- dropped without being run to completion.
{-# INLINE_NORMAL tapRate #-}
tapRate ::
       (MonadAsync m, MonadCatch m)
    => Double
    -> (Int -> m b)
    -> Stream m a
    -> Stream m a
tapRate samplingRate action (Stream step state) = Stream step' Nothing
  where
    -- Body of the reporter thread. @prev@ is the counter value seen at the
    -- previous report. If an asynchronous exception (e.g. the 'killThread'
    -- issued below) arrives, one final report is made before the exception is
    -- rethrown, so the counts accumulated since the last tick are not lost.
    {-# NOINLINE loop #-}
    loop countVar prev = do
        i <-
            MC.catch
                (do liftIO $ threadDelay (round $ samplingRate * 1000000)
                    report)
                (\(e :: AsyncException) ->
                     report >> throwM (MC.toException e))
        loop countVar i

      where

        -- Read the counter, hand the delta since @prev@ to the user action
        -- and return the counter value that was read.
        report = do
            cur <- liftIO $ Prim.readIORef countVar
            let !diff = cur - prev
            void $ action diff
            return cur

    {-# INLINE_LATE step' #-}
    -- Initial state: set up the counter, the reporter thread and the
    -- finalizer, then start stepping the source stream.
    step' _ Nothing = do
        countVar <- liftIO $ Prim.newIORef 0
        tid <- fork $ loop countVar 0
        -- @ref@ is carried in the stream state only to keep it alive; its
        -- finalizer kills the reporter thread.
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref (killThread tid)
        return $ Skip (Just (countVar, tid, state, ref))

    step' gst (Just (countVar, tid, st, ref)) = do
        r <- step gst st
        case r of
            Yield x s -> do
                liftIO $ Prim.modifyIORef' countVar (+ 1)
                return $ Yield x (Just (countVar, tid, s, ref))
            Skip s -> return $ Skip (Just (countVar, tid, s, ref))
            Stop -> do
                liftIO $ killThread tid
                return Stop

------------------------------------------------------------------------------
-- Scanning with a Fold
------------------------------------------------------------------------------

-- State of a stream that is being scanned with a fold: @s@ is the state of
-- the source stream and @f@ is the (strict) state of the fold.
data ScanState s f
    = ScanInit s   -- fold not initialized yet; only the stream state is known
    | ScanDo s !f  -- scanning: stream state plus current fold state
    | ScanDone     -- the fold has terminated; nothing more to emit

-- Postscan a stream with a 'FL.Fold': for every input element the fold is
-- stepped and the extracted accumulator is emitted. Nothing is emitted for
-- the initial fold state unless the fold is already done on initialization,
-- in which case its result is the only output. The output stream ends as
-- soon as the fold reports 'FL.Done' or the input stream stops.
{-# INLINE_NORMAL postscanOnce #-}
postscanOnce :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b
postscanOnce (FL.Fold fstep initial extract) (Stream sstep state) =
    Stream step (ScanInit state)

    where

    {-# INLINE_LATE step #-}
    -- Initialize the fold; no output unless it terminates right away.
    step _ (ScanInit srcSt) = do
        ini <- initial
        return
            $ case ini of
                  FL.Partial foldSt -> Skip (ScanDo srcSt foldSt)
                  FL.Done out -> Yield out ScanDone
    -- Pull one step from the source and feed any element to the fold.
    step gst (ScanDo srcSt foldSt) = do
        srcStep <- sstep (adaptState gst) srcSt
        case srcStep of
            Yield x srcSt1 -> do
                folded <- fstep foldSt x
                case folded of
                    FL.Partial foldSt1 -> do
                        -- Force the extracted value before emitting it.
                        !out <- extract foldSt1
                        return (Yield out (ScanDo srcSt1 foldSt1))
                    FL.Done out -> return (Yield out ScanDone)
            Skip srcSt1 -> return (Skip (ScanDo srcSt1 foldSt))
            Stop -> return Stop
    step _ ScanDone = return Stop

-- Scan a stream with a 'FL.Fold'. Unlike 'postscanOnce', the value extracted
-- from the initial fold state is emitted first; after that the extracted
-- accumulator is emitted for every input element. The output stream ends as
-- soon as the fold reports 'FL.Done' or the input stream stops.
{-# INLINE scanOnce #-}
scanOnce :: Monad m
    => FL.Fold m a b -> Stream m a -> Stream m b
scanOnce (FL.Fold fstep initial extract) (Stream sstep state) =
    Stream step (ScanInit state)

    where

    {-# INLINE_LATE step #-}
    -- Initialize the fold and emit the value of its initial state.
    step _ (ScanInit srcSt) = do
        ini <- initial
        case ini of
            FL.Partial foldSt -> do
                !out <- extract foldSt
                return (Yield out (ScanDo srcSt foldSt))
            FL.Done out -> return (Yield out ScanDone)
    -- Pull one step from the source and feed any element to the fold.
    step gst (ScanDo srcSt foldSt) = do
        srcStep <- sstep (adaptState gst) srcSt
        case srcStep of
            Yield x srcSt1 -> do
                folded <- fstep foldSt x
                case folded of
                    FL.Partial foldSt1 -> do
                        -- Force the extracted value before emitting it.
                        !out <- extract foldSt1
                        return (Yield out (ScanDo srcSt1 foldSt1))
                    FL.Done out -> return (Yield out ScanDone)
            Skip srcSt1 -> return (Skip (ScanDo srcSt1 foldSt))
            Stop -> return Stop
    step _ ScanDone = return Stop

------------------------------------------------------------------------------
-- Scanning - Prescans
------------------------------------------------------------------------------

-- Adapted from the vector package.
--
-- XXX Is a prescan useful, discarding the last step does not sound useful?  I
-- am not sure about the utility of this function, so this is implemented but
-- not exposed. We can expose it if someone provides good reasons why this is
-- useful.
--
-- XXX We have to execute the stream one step ahead to know that we are at the
-- last step.  The vector implementation of prescan executes the last fold step
-- but does not yield the result. This means we have executed the effect but
-- discarded the value. This does not sound right. In this implementation we
-- are not executing the last fold step.
--
-- Strict monadic prescan: for every input element the accumulator computed
-- from the elements /before/ it is emitted (starting with the result of the
-- initial action). The fold step for an element is stored as a pending action
-- and only run when the next element arrives, so the step for the last
-- element is never executed.
--
-- The accumulator is forced to WHNF before it is emitted, in line with the
-- other primed scans in this module.
{-# INLINE_NORMAL prescanlM' #-}
prescanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
prescanlM' f mz (Stream step state) = Stream step' (state, mz)
  where
    {-# INLINE_LATE step' #-}
    -- The second component of the state is the action producing the
    -- accumulator that is to be emitted for the next input element.
    step' gst (st, prev) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                !acc <- prev
                return $ Yield acc (s, f acc x)
            Skip s -> return $ Skip (s, prev)
            Stop   -> return Stop

-- Pure version of prescanlM': the step function and the initial accumulator
-- are lifted into the monad with 'return'.
{-# INLINE prescanl' #-}
prescanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
prescanl' f z = prescanlM' liftedStep (return z)
  where
    liftedStep acc x = return (f acc x)

------------------------------------------------------------------------------
-- Monolithic postscans (postscan followed by a map)
------------------------------------------------------------------------------

-- The performance of a modular postscan followed by a map seems to be
-- equivalent to this monolithic scan followed by map therefore we may not need
-- this implementation. We just have it for performance comparison and in case
-- modular version does not perform well in some situation.
--
-- Strict monadic postscan with a separate extraction function. For every
-- input element the accumulator is stepped with the first argument and the
-- result of applying the third argument (the "done" function) to the new
-- accumulator is emitted. The second argument produces the initial
-- accumulator; it is run when the first element arrives. Both the new
-- accumulator and the emitted value are forced to WHNF.
{-# INLINE_NORMAL postscanlMx' #-}
postscanlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
postscanlMx' fstep begin done (Stream step state) =
    Stream step' (state, begin)
  where
    {-# INLINE_LATE step' #-}
    -- The accumulator is kept in the state as an action: @begin@ initially,
    -- @return y@ after the first element.
    step' gst (st, acc) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                old <- acc
                y <- fstep old x
                v <- done y
                v `seq` y `seq` return (Yield v (s, return y))
            Skip s -> return $ Skip (s, acc)
            Stop   -> return Stop

-- Pure version of postscanlMx': the step function, the initial accumulator
-- and the extraction function are lifted into the monad with 'return'.
{-# INLINE_NORMAL postscanlx' #-}
postscanlx' :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
postscanlx' fstep begin done s =
    postscanlMx' liftedStep (return begin) (return . done) s
  where
    liftedStep acc x = return (fstep acc x)

-- Strict monadic scan with a separate extraction function. First emits the
-- extraction function applied to the initial accumulator, then, for every
-- input element, the extraction function applied to the stepped accumulator.
--
-- The initial action is run exactly once and its (forced) result is used both
-- for the first output and as the starting accumulator. Prepending the first
-- output with consM to a postscan that is given the same initial action
-- would execute the effects of that action twice.
{-# INLINE scanlMx' #-}
scanlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
scanlMx' fstep begin done (Stream step state) = Stream step' Nothing
  where
    {-# INLINE_LATE step' #-}
    -- Not started yet: run the initial action and emit its extraction.
    step' _ Nothing = do
        !x <- begin
        v <- done x
        return $ Yield v (Just (state, x))
    -- Running: same stepping and forcing as in the monolithic postscan.
    step' gst (Just (st, acc)) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                y <- fstep acc x
                v <- done y
                v `seq` y `seq` return (Yield v (Just (s, y)))
            Skip s -> return $ Skip (Just (s, acc))
            Stop -> return Stop

-- Pure version of scanlMx': the step function, the initial accumulator and
-- the extraction function are lifted into the monad with 'return'.
{-# INLINE scanlx' #-}
scanlx' :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
scanlx' fstep begin done s =
    scanlMx' liftedStep (return begin) (return . done) s
  where
    liftedStep acc x = return (fstep acc x)

------------------------------------------------------------------------------
-- postscans
------------------------------------------------------------------------------

-- Adapted from the vector package.
--
-- Strict monadic postscan: the initial action is run once on the first step
-- (without emitting anything), then for every input element the accumulator
-- is stepped and the new accumulator is emitted. The initial and every
-- stepped accumulator are forced to WHNF.
{-# INLINE_NORMAL postscanlM' #-}
postscanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM' fstep begin (Stream step state) = Stream step' Nothing
  where
    {-# INLINE_LATE step' #-}
    -- Not started yet: evaluate the initial accumulator.
    step' _ Nothing = do
        !z <- begin
        return (Skip (Just (state, z)))

    step' gst (Just (st, acc)) = do
        res <- step (adaptState gst) st
        case res of
            Yield x st1 -> do
                !acc1 <- fstep acc x
                return (Yield acc1 (Just (st1, acc1)))
            Skip st1 -> return (Skip (Just (st1, acc)))
            Stop -> return Stop

-- Pure version of postscanlM': the step function and the seed are lifted
-- into the monad with 'return'.
{-# INLINE_NORMAL postscanl' #-}
postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl' f seed = postscanlM' liftedStep (return seed)
  where
    liftedStep acc x = return (f acc x)

-- We can possibly have the "done" function as a Maybe to provide an option to
-- emit or not emit the accumulator when the stream stops.
--
-- TBD: use a single Yield point
--
-- Strict monadic postscan that emits one extra element when the input stream
-- stops: the third argument (the "done" function) is applied to the final
-- accumulator and its result is yielded as the last element. If the input is
-- empty, the final accumulator is the result of the initial action.
{-# INLINE_NORMAL postscanlMAfter' #-}
postscanlMAfter' :: Monad m
    => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
postscanlMAfter' fstep initial done (Stream step1 state1) =
    Stream step (Just (state1, initial))

    where

    {-# INLINE_LATE step #-}
    -- The accumulator is kept in the state as an action: @initial@ at first,
    -- @return y@ once an element has been folded. 'Nothing' marks that the
    -- final element has been emitted.
    step gst (Just (st, acc)) = do
        r <- step1 (adaptState gst) st
        case r of
            Yield x s -> do
                old <- acc
                y <- fstep old x
                y `seq` return (Yield y (Just (s, return y)))
            Skip s -> return $ Skip $ Just (s, acc)
            Stop -> acc >>= done >>= \res -> return (Yield res Nothing)
    step _ Nothing = return Stop

-- Lazy monadic postscan: the initial action is run once on the first step
-- (without emitting anything), then for every input element the accumulator
-- is stepped and the new accumulator is emitted. Unlike the primed variant,
-- the accumulator is never forced here.
{-# INLINE_NORMAL postscanlM #-}
postscanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM fstep begin (Stream step state) = Stream step' Nothing
  where
    {-# INLINE_LATE step' #-}
    -- Not started yet: obtain the initial accumulator.
    step' _ Nothing = do
        z <- begin
        return (Skip (Just (state, z)))

    step' gst (Just (st, acc)) = do
        res <- step (adaptState gst) st
        case res of
            Yield x st1 -> do
                acc1 <- fstep acc x
                return (Yield acc1 (Just (st1, acc1)))
            Skip st1 -> return (Skip (Just (st1, acc)))
            Stop -> return Stop

-- Pure version of postscanlM: the step function and the seed are lifted into
-- the monad with 'return'.
{-# INLINE_NORMAL postscanl #-}
postscanl :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl f seed = postscanlM liftedStep (return seed)
  where
    liftedStep acc x = return (f acc x)

-- | Strict monadic left scan.
--
-- The stream state is @Maybe (s, b)@:
--
-- * @Nothing@: the scan has not started. The @begin@ action is run, its result
--   is forced to WHNF and emitted as the first element.
-- * @Just (st, acc)@: @st@ is the state of the input stream and @acc@ the
--   accumulator emitted last.
--
-- Every input element @x@ produces one output element, the WHNF-forced result
-- of @fstep acc x@. 'Skip' steps of the input are passed through without
-- touching the accumulator.
{-# INLINE_NORMAL scanlM' #-}
scanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM' fstep begin (Stream step state) = Stream step' Nothing
  where
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        -- The bang forces the initial accumulator before it is emitted.
        !x <- begin
        return $ Yield x (Just (state, x))
    step' gst (Just (st, acc)) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                -- Force the new accumulator so that no thunk chain builds up.
                !y <- fstep acc x
                return $ Yield y (Just (s, y))
            Skip s -> return $ Skip (Just (s, acc))
            Stop   -> return Stop

-- | Like 'postscanlMAfter'' but additionally emits the (WHNF-forced) result of
-- @initial@ as the first element of the output stream.
--
-- Implemented by prepending the forced @initial@ action with 'consM' to the
-- stream produced by 'postscanlMAfter''.
--
-- NOTE(review): @initial@ is passed both to 'consM' and to
-- 'postscanlMAfter''. If the latter also executes it (as 'postscanlM' does
-- with its @begin@ action), the effects of @initial@ are performed twice --
-- confirm whether that is intended.
{-# INLINE scanlMAfter' #-}
scanlMAfter' :: Monad m
    => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
scanlMAfter' fstep initial done s =
    (initial >>= \x -> x `seq` return x) `consM`
        postscanlMAfter' fstep initial done s

-- | Strict pure left scan: the step function @f@ and the @seed@ are lifted
-- into the monad with 'return' and handed to 'scanlM''.
{-# INLINE scanl' #-}
scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl' f seed = scanlM' (\a b -> return (f a b)) (return seed)

-- | Lazy monadic left scan.
--
-- Same state machine as 'scanlM'' but without forcing the accumulator:
--
-- * @Nothing@: run @begin@ and emit its result as the first element.
-- * @Just (st, acc)@: step the input stream from @st@; on an element @x@ emit
--   the result of @fstep acc x@ and carry it forward as the new accumulator.
--
-- 'Skip' steps of the input are passed through with the accumulator unchanged.
{-# INLINE_NORMAL scanlM #-}
scanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM fstep begin (Stream step state) = Stream step' Nothing
  where
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        x <- begin
        return $ Yield x (Just (state, x))
    step' gst (Just (st, acc)) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                y <- fstep acc x
                return $ Yield y (Just (s, y))
            Skip s -> return $ Skip (Just (s, acc))
            Stop   -> return $ Stop

-- | Lazy pure left scan: the step function @f@ and the @seed@ are lifted into
-- the monad with 'return' and handed to 'scanlM'.
{-# INLINE scanl #-}
scanl :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl f seed = scanlM (\a b -> return (f a b)) (return seed)

-- Adapted from the vector package
-- | Lazy monadic left scan that uses the first element of the stream as the
-- initial accumulator.
--
-- The stream state is @(s, Maybe a)@:
--
-- * @(st, Nothing)@: no element seen yet. The first element is emitted
--   unchanged and becomes the accumulator.
-- * @(st, Just acc)@: every further element @y@ emits the result of
--   @fstep acc y@, which also becomes the new accumulator.
--
-- An empty input yields an empty output.
{-# INLINE_NORMAL scanl1M #-}
scanl1M :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
scanl1M fstep (Stream step state) = Stream step' (state, Nothing)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, Nothing) = do
        r <- step gst st
        case r of
            Yield x s -> return $ Yield x (s, Just x)
            Skip s -> return $ Skip (s, Nothing)
            Stop   -> return Stop

    step' gst (st, Just acc) = do
        r <- step gst st
        case r of
            Yield y s -> do
                z <- fstep acc y
                return $ Yield z (s, Just z)
            Skip s -> return $ Skip (s, Just acc)
            Stop   -> return Stop

-- | Lazy pure variant of 'scanl1M': the combining function @f@ is lifted into
-- the monad with 'return'.
{-# INLINE scanl1 #-}
scanl1 :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1 f = scanl1M (\x y -> return (f x y))

-- Adapted from the vector package
-- | Strict monadic left scan that uses the first element of the stream as the
-- initial accumulator.
--
-- Same state machine as 'scanl1M', with the accumulator forced to WHNF
-- (via 'seq') at three points: when the first element is adopted, whenever a
-- state carrying an accumulator is entered, and after each @fstep@ call.
{-# INLINE_NORMAL scanl1M' #-}
scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
scanl1M' fstep (Stream step state) = Stream step' (state, Nothing)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, Nothing) = do
        r <- step gst st
        case r of
            -- Force the first element before it becomes the accumulator.
            Yield x s -> x `seq` return $ Yield x (s, Just x)
            Skip s -> return $ Skip (s, Nothing)
            Stop   -> return Stop

    step' gst (st, Just acc) = acc `seq` do
        r <- step gst st
        case r of
            Yield y s -> do
                z <- fstep acc y
                z `seq` return $ Yield z (s, Just z)
            Skip s -> return $ Skip (s, Just acc)
            Stop   -> return Stop

-- | Strict pure variant of 'scanl1M'': the combining function @f@ is lifted
-- into the monad with 'return'.
{-# INLINE scanl1' #-}
scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1' f = scanl1M' (\x y -> return (f x y))

-------------------------------------------------------------------------------
-- Filtering
-------------------------------------------------------------------------------

-- Adapted from the vector package
-- | Keep only those elements for which the monadic predicate @f@ returns
-- 'True'.
--
-- The predicate is run once for every element produced by the input stream.
-- A rejected element turns into a 'Skip' step, so the stream state type is the
-- same as that of the input stream.
{-# INLINE_NORMAL filterM #-}
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
filterM f (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        r <- step gst st
        case r of
            Yield x s -> do
                b <- f x
                return $ if b
                         then Yield x s
                         else Skip s
            Skip s -> return $ Skip s
            Stop   -> return Stop

-- | Pure variant of 'filterM': keep only the elements satisfying the
-- predicate @f@.
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
filter f = filterM (return . f)

-- Adapted from the vector package
-- | Drop every element that is equal (by '==') to the element emitted
-- immediately before it, i.e. collapse runs of consecutive duplicates.
--
-- The stream state is @(Maybe a, s)@ where the first component remembers the
-- element emitted last (@Nothing@ until the first element has been seen).
{-# INLINE_NORMAL uniq #-}
uniq :: (Eq a, Monad m) => Stream m a -> Stream m a
uniq (Stream step state) = Stream step' (Nothing, state)
  where
    {-# INLINE_LATE step' #-}
    -- Nothing emitted so far: the first element always passes.
    step' gst (Nothing, st) = do
        r <- step gst st
        case r of
            Yield x s -> return $ Yield x (Just x, s)
            Skip  s   -> return $ Skip  (Nothing, s)
            Stop      -> return Stop
    -- @x@ is the element emitted last: suppress @y@ when it equals @x@.
    step' gst (Just x, st)  = do
         r <- step gst st
         case r of
             Yield y s | x == y   -> return $ Skip (Just x, s)
                       | otherwise -> return $ Yield y (Just y, s)
             Skip  s   -> return $ Skip (Just x, s)
             Stop      -> return Stop

-- | Remove the first element @y@ of the stream for which @eq x y@ holds; all
-- other elements, including later matches, are passed through.
--
-- The stream state is @(s, Bool)@ where the flag records whether the deletion
-- has already happened. Once it is 'True' the comparison function is no
-- longer called.
{-# INLINE_NORMAL deleteBy #-}
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a
deleteBy eq x (Stream step state) = Stream step' (state, False)
  where
    {-# INLINE_LATE step' #-}
    -- Still looking for the element to delete.
    step' gst (st, False) = do
        r <- step gst st
        case r of
            Yield y s -> return $
                if eq x y then Skip (s, True) else Yield y (s, False)
            Skip s -> return $ Skip (s, False)
            Stop   -> return Stop

    -- Element already deleted: behave like the input stream.
    step' gst (st, True) = do
        r <- step gst st
        case r of
            Yield y s -> return $ Yield y (s, True)
            Skip s -> return $ Skip (s, True)
            Stop   -> return Stop

------------------------------------------------------------------------------
-- Trimming
------------------------------------------------------------------------------

-- XXX using getTime in the loop can be pretty expensive especially for
-- computations where iterations are lightweight. We have the following
-- options:
--
-- 1) Run a timeout thread updating a flag asynchronously and check that
-- flag here, that way we can have a cheap termination check.
--
-- 2) Use COARSE clock to get time with lower resolution but more efficiently.
--
-- 3) Use rdtscp/rdtsc to get time directly from the processor, compute the
-- termination value of rdtsc in the beginning and then in each iteration just
-- get rdtsc and check if we should terminate.
--
-- | State of the 'takeByTime' stepper. @s@ is the state of the underlying
-- stream and @t@ the timestamp taken when the stream was started.
data TakeByTime s t
    = TakeByTimeInit s      -- ^ Start state: the clock has not been read yet.
    | TakeByTimeCheck s t   -- ^ Compare the elapsed time against the limit.
    | TakeByTimeYield s t   -- ^ Run one step of the underlying stream.

-- | Pass elements of the stream through until more than @duration@ has
-- elapsed on the 'Monotonic' clock, then stop.
--
-- State machine:
--
-- * 'TakeByTimeInit': stop immediately when the duration is exactly zero,
--   otherwise record the start time @t0@.
-- * 'TakeByTimeYield': run one step of the input stream, then move to
--   'TakeByTimeCheck' (for both 'Yield' and 'Skip').
-- * 'TakeByTimeCheck': read the clock; stop when the time elapsed since @t0@
--   exceeds the limit, otherwise go back to 'TakeByTimeYield'.
--
-- The clock is checked only after a step of the input has completed, so an
-- element that is already being produced is still emitted.
{-# INLINE_NORMAL takeByTime #-}
takeByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
takeByTime duration (Stream step1 state1) = Stream step (TakeByTimeInit state1)
    where

    -- The limit converted once to the unit used for comparisons.
    lim = toRelTime64 duration

    {-# INLINE_LATE step #-}
    step _ (TakeByTimeInit _) | lim == 0 = return Stop
    step _ (TakeByTimeInit st) = do
        t0 <- liftIO $ getTime Monotonic
        return $ Skip (TakeByTimeYield st t0)
    step _ (TakeByTimeCheck st t0) = do
        t <- liftIO $ getTime Monotonic
        return $
            if diffAbsTime64 t t0 > lim
            then Stop
            else Skip (TakeByTimeYield st t0)
    step gst (TakeByTimeYield st t0) = do
        r <- step1 gst st
        return $ case r of
             Yield x s -> Yield x (TakeByTimeCheck s t0)
             Skip s -> Skip (TakeByTimeCheck s t0)
             Stop -> Stop

-- | State of the 'dropByTime' stepper. @s@ is the state of the underlying
-- stream, @t@ the timestamp taken when the stream was started and @a@ the
-- element type.
data DropByTime s t a
    = DropByTimeInit s        -- ^ Start state: the clock has not been read yet.
    | DropByTimeGen s t       -- ^ Still dropping: run one step of the input.
    | DropByTimeCheck s t a   -- ^ An element is pending the elapsed-time check.
    | DropByTimeYield s       -- ^ Dropping is over: pass everything through.

-- | Discard elements of the stream until more than @duration@ has elapsed on
-- the 'Monotonic' clock, then pass the remaining elements through.
--
-- State machine:
--
-- * 'DropByTimeInit': record the start time @t0@.
-- * 'DropByTimeGen': run one step of the input; an element @x@ is held back
--   and handed to 'DropByTimeCheck'.
-- * 'DropByTimeCheck': read the clock; when the time elapsed since @t0@ is
--   still within the limit the held element is discarded and generation
--   continues, otherwise it is emitted and the machine switches to
--   'DropByTimeYield'.
-- * 'DropByTimeYield': behave like the input stream; the clock is no longer
--   consulted.
{-# INLINE_NORMAL dropByTime #-}
dropByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
dropByTime duration (Stream step1 state1) = Stream step (DropByTimeInit state1)
    where

    -- The limit converted once to the unit used for comparisons.
    lim = toRelTime64 duration

    {-# INLINE_LATE step #-}
    step _ (DropByTimeInit st) = do
        t0 <- liftIO $ getTime Monotonic
        return $ Skip (DropByTimeGen st t0)
    step gst (DropByTimeGen st t0) = do
        r <- step1 gst st
        return $ case r of
             Yield x s -> Skip (DropByTimeCheck s t0 x)
             Skip s -> Skip (DropByTimeGen s t0)
             Stop -> Stop
    step _ (DropByTimeCheck st t0 x) = do
        t <- liftIO $ getTime Monotonic
        if diffAbsTime64 t t0 <= lim
            then return $ Skip $ DropByTimeGen st t0
            else return $ Yield x $ DropByTimeYield st
    step gst (DropByTimeYield st) = do
        r <- step1 gst st
        return $ case r of
             Yield x s -> Yield x (DropByTimeYield s)
             Skip s -> Skip (DropByTimeYield s)
             Stop -> Stop

-- Adapted from the vector package
-- | @drop n@ discards the first @n@ elements of the stream and passes the
-- rest through unchanged. The source is still stepped for the discarded
-- elements, only their values are thrown away. When @n <= 0@ nothing is
-- discarded.
{-# INLINE_NORMAL drop #-}
drop :: Monad m => Int -> Stream m a -> Stream m a
drop n (Stream step state) = Stream step' (state, Just n)
  where
    -- The state pairs the source state with @Just i@ while @i@ elements
    -- remain to be discarded, and with @Nothing@ once dropping is over.
    {-# INLINE_LATE step' #-}
    step' gst (st, Just i)
      | i > 0 = do
          r <- step gst st
          return $
            case r of
              Yield _ s -> Skip (s, Just (i - 1))
              Skip s    -> Skip (s, Just i)
              Stop      -> Stop
      -- Counter exhausted (or never positive): switch to pass-through.
      | otherwise = return $ Skip (st, Nothing)

    step' gst (st, Nothing) = do
      r <- step gst st
      return $
        case r of
          Yield x s -> Yield x (s, Nothing)
          Skip  s   -> Skip (s, Nothing)
          Stop      -> Stop

-- Adapted from the vector package
--
-- State of the 'dropWhileM' state machine; @s@ is the state of the source
-- stream and @a@ the element type.
--
-- * DropWhileDrop: still dropping, each element pulled from the source is
--   tested with the predicate.
-- * DropWhileYield: holds an element that is emitted on the next step,
--   together with the source state to continue from.
-- * DropWhileNext: dropping is over, elements are forwarded without
--   consulting the predicate.
data DropWhileState s a
    = DropWhileDrop s
    | DropWhileYield a s
    | DropWhileNext s

-- | Discard elements from the front of the stream for as long as the monadic
-- predicate returns 'True'. The first element for which it returns 'False'
-- and every element after it are emitted; the predicate is not run again
-- after that point.
{-# INLINE_NORMAL dropWhileM #-}
dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
dropWhileM f (Stream step state) = Stream step' (DropWhileDrop state)
  where
    {-# INLINE_LATE step' #-}
    -- Dropping phase: test each element, stay here while the predicate holds.
    step' gst (DropWhileDrop st) = do
        r <- step gst st
        case r of
            Yield x s -> do
                b <- f x
                if b
                then return $ Skip (DropWhileDrop s)
                else return $ Skip (DropWhileYield x s)
            Skip s -> return $ Skip (DropWhileDrop s)
            Stop -> return Stop

    -- Pass-through phase: forward elements via DropWhileYield without
    -- running the predicate.
    step' gst (DropWhileNext st) = do
        r <- step gst st
        case r of
            Yield x s -> return $ Skip (DropWhileYield x s)
            Skip s    -> return $ Skip (DropWhileNext s)
            Stop      -> return Stop

    -- Emit the held element and continue in the pass-through phase.
    step' _ (DropWhileYield x st) = return $ Yield x (DropWhileNext st)

-- | Pure-predicate variant of 'dropWhileM': the predicate result is lifted
-- into the monad with 'return'.
{-# INLINE dropWhile #-}
dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
dropWhile f = dropWhileM (return . f)

------------------------------------------------------------------------------
-- Inserting Elements
------------------------------------------------------------------------------

-- | @insertBy cmp a@ emits @a@ immediately before the first stream element
-- @x@ for which @cmp a x@ is not 'GT'. If there is no such element (including
-- when the stream is empty) @a@ is emitted after the last element.
{-# INLINE_NORMAL insertBy #-}
insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
insertBy cmp a (Stream step state) = Stream step' (state, False, Nothing)
  where
    -- State is (source state, inserted?, pending element). After the
    -- insertion the output lags the source by one element, which is carried
    -- in the third component.
    {-# INLINE_LATE step' #-}
    step' gst (st, False, _) = do
        r <- step gst st
        case r of
            Yield x s -> case cmp a x of
                GT -> return $ Yield x (s, False, Nothing)
                -- Insertion point found: emit @a@ now and hold @x@ back.
                _  -> return $ Yield a (s, True, Just x)
            Skip s -> return $ Skip (s, False, Nothing)
            -- Source ended before an insertion point: append @a@.
            Stop   -> return $ Yield a (st, True, Nothing)

    -- Inserted and nothing pending: the stream is finished.
    step' _ (_, True, Nothing) = return Stop

    -- Inserted with a pending element: emit it and hold the next one.
    step' gst (st, True, Just prev) = do
        r <- step gst st
        case r of
            Yield x s -> return $ Yield prev (s, True, Just x)
            Skip s    -> return $ Skip (s, True, Just prev)
            Stop      -> return $ Yield prev (st, True, Nothing)

-- State of the 'intersperseM' state machine; @x@ is the element type and @s@
-- the state of the source stream.
--
-- * FirstYield: no element has been produced yet, so no separator is
--   generated before the next element.
-- * InterspersingYield: at least one element has been emitted; a separator
--   is generated when the source yields another element.
-- * YieldAndCarry: holds a source element that is emitted on the next step.
data LoopState x s = FirstYield s
                   | InterspersingYield s
                   | YieldAndCarry x s

-- | Insert the result of running the action @m@ between every two
-- consecutive elements of the stream. The action is run only after the
-- source has produced the following element, so it is never run for an empty
-- or single-element stream and never after the last element.
{-# INLINE_NORMAL intersperseM #-}
intersperseM :: Monad m => m a -> Stream m a -> Stream m a
intersperseM m (Stream step state) = Stream step' (FirstYield state)
  where
    {-# INLINE_LATE step' #-}
    -- Before the first element: no separator is generated.
    step' gst (FirstYield st) = do
        r <- step gst st
        return $
            case r of
                Yield x s -> Skip (YieldAndCarry x s)
                Skip s -> Skip (FirstYield s)
                Stop -> Stop

    -- After an element: once the next element arrives, run the action, emit
    -- its result, and hold the element for the following step.
    step' gst (InterspersingYield st) = do
        r <- step gst st
        case r of
            Yield x s -> do
                a <- m
                return $ Yield a (YieldAndCarry x s)
            Skip s -> return $ Skip $ InterspersingYield s
            Stop -> return Stop

    -- Emit the held element.
    step' _ (YieldAndCarry x st) = return $ Yield x (InterspersingYield st)

-- | Insert the pure value @a@ between every two consecutive elements of the
-- stream. Implemented as 'intersperseM' with the value lifted by 'return'.
{-# INLINE intersperse #-}
intersperse :: Monad m => a -> Stream m a -> Stream m a
intersperse a = intersperseM (return a)

-- | Run the action @m@ between every two consecutive elements of the stream,
-- discarding its result; the elements themselves are passed through
-- unchanged. The action runs after the source has produced the following
-- element and before that element is emitted, so it does not run before the
-- first element or after the last one.
{-# INLINE_NORMAL intersperseM_ #-}
intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a
intersperseM_ m (Stream step1 state1) = Stream step (Left (pure (), state1))
  where
    -- @Left (eff, st)@: pull the next element and run @eff@ just before
    -- emitting it. The initial @eff@ is @pure ()@, so nothing runs before the
    -- first element.
    {-# INLINE_LATE step #-}
    step gst (Left (eff, st)) = do
        r <- step1 gst st
        case r of
            Yield x s -> eff >> return (Yield x (Right s))
            Skip s -> return $ Skip (Left (eff, s))
            Stop -> return Stop

    -- @Right st@: an element was just emitted; arm the real effect for the
    -- next one.
    step _ (Right st) = return $ Skip $ Left (void m, st)

-- State of the 'intersperseSuffix' state machine; @s@ is the state of the
-- source stream and @a@ the element type.
--
-- * SuffixElem: pull the next element from the source.
-- * SuffixSuffix: an element was just scheduled; run the action to produce
--   its suffix.
-- * SuffixYield: emit the held value, then continue in the given state.
data SuffixState s a
    = SuffixElem s
    | SuffixSuffix s
    | SuffixYield a (SuffixState s a)

-- | After every element of the stream, run @action@ and emit its result. An
-- empty stream produces no output and does not run the action.
{-# INLINE_NORMAL intersperseSuffix #-}
intersperseSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a
intersperseSuffix action (Stream step state) = Stream step' (SuffixElem state)
    where
    {-# INLINE_LATE step' #-}
    -- Pull an element; schedule it for emission followed by its suffix.
    step' gst (SuffixElem st) = do
        r <- step gst st
        return $ case r of
            Yield x s -> Skip (SuffixYield x (SuffixSuffix s))
            Skip s -> Skip (SuffixElem s)
            Stop -> Stop

    -- Run the action and schedule its result, then go back to pulling.
    step' _ (SuffixSuffix st) =
        action >>= \r -> return $ Skip (SuffixYield r (SuffixElem st))

    -- Emit a scheduled value and move to the recorded continuation state.
    step' _ (SuffixYield x next) = return $ Yield x next

-- | Run the action @m@ after every element of the stream, discarding its
-- result; the elements themselves are passed through unchanged. The action
-- runs on the step following the emission of an element, before the source is
-- pulled again.
{-# INLINE_NORMAL intersperseSuffix_ #-}
intersperseSuffix_ :: Monad m => m b -> Stream m a -> Stream m a
intersperseSuffix_ m (Stream step1 state1) = Stream step (Left state1)
  where
    -- @Left st@: pull and emit the next element.
    {-# INLINE_LATE step #-}
    step gst (Left st) = do
        r <- step1 gst st
        case r of
            Yield x s -> return $ Yield x (Right s)
            Skip s -> return $ Skip $ Left s
            Stop -> return Stop

    -- @Right st@: an element was just emitted; run the suffix effect.
    step _ (Right st) = m >> return (Skip (Left st))

-- State of the 'intersperseSuffixBySpan' state machine; @s@ is the state of
-- the source stream and @a@ the element type.
--
-- * SuffixSpanElem: pull from the source; the 'Int' is the number of
--   elements still to be taken in the current span.
-- * SuffixSpanSuffix: the current span is complete; run the action to
--   produce its suffix and start a new span.
-- * SuffixSpanYield: emit the held value, then continue in the given state.
-- * SuffixSpanLast: the source ended part-way through a span; produce one
--   final suffix.
-- * SuffixSpanStop: terminate the stream.
data SuffixSpanState s a
    = SuffixSpanElem s Int
    | SuffixSpanSuffix s
    | SuffixSpanYield a (SuffixSpanState s a)
    | SuffixSpanLast
    | SuffixSpanStop

-- | Intersperse after every @n@ items: after each run of @n@ elements, run
-- @action@ and emit its result. If the source ends part-way through a run, a
-- final suffix is still emitted after the partial run. If the source ends
-- exactly on a run boundary (including an empty source) no extra suffix is
-- produced.
--
-- NOTE(review): for @n <= 0@ the state machine never pulls from the source;
-- it alternates between running @action@ and emitting its result forever.
-- Confirm whether callers rely on this before changing it.
{-# INLINE_NORMAL intersperseSuffixBySpan #-}
intersperseSuffixBySpan :: forall m a. Monad m
    => Int -> m a -> Stream m a -> Stream m a
intersperseSuffixBySpan n action (Stream step state) =
    Stream step' (SuffixSpanElem state n)
    where
    {-# INLINE_LATE step' #-}
    -- Inside a span: @i@ elements remain before the next suffix.
    step' gst (SuffixSpanElem st i) | i > 0 = do
        r <- step gst st
        return $ case r of
            Yield x s -> Skip (SuffixSpanYield x (SuffixSpanElem s (i - 1)))
            Skip s -> Skip (SuffixSpanElem s i)
            -- @i == n@ means no element of the current span was taken, so
            -- there is nothing left to suffix.
            Stop -> if i == n then Stop else Skip SuffixSpanLast
    -- Span complete: produce the suffix next.
    step' _ (SuffixSpanElem st _) = return $ Skip (SuffixSpanSuffix st)

    -- Run the action, schedule its result, and start a fresh span.
    step' _ (SuffixSpanSuffix st) =
        action >>= \r -> return $ Skip (SuffixSpanYield r (SuffixSpanElem st n))

    -- Source ended mid-span: one last suffix, then stop.
    step' _ SuffixSpanLast =
        action >>= \r -> return $ Skip (SuffixSpanYield r SuffixSpanStop)

    -- Emit a scheduled value and move to the recorded continuation state.
    step' _ (SuffixSpanYield x next) = return $ Yield x next

    step' _ SuffixSpanStop = return Stop

------------------------------------------------------------------------------
-- Reordering
------------------------------------------------------------------------------

-- We can implement reverse as:
--
-- > reverse = foldlS (flip cons) nil
--
-- However, this implementation is unusable because of the horrible performance
-- of cons. So we just convert it to a list first and then stream from the
-- list.
--
-- XXX Maybe we can use an Array instead of a list here?
-- | Emit the elements of the stream in reverse order. The whole input is
-- first folded into a list (consing each element onto the front, which
-- reverses it), so nothing is emitted until the source has ended and all
-- elements are held in memory at once.
{-# INLINE_NORMAL reverse #-}
reverse :: Monad m => Stream m a -> Stream m a
reverse m = Stream step Nothing
    where
    -- @Nothing@: the input has not been consumed yet.
    -- @Just xs@: the remaining reversed elements still to be emitted.
    {-# INLINE_LATE step #-}
    step _ Nothing = do
        xs <- foldl' (flip (:)) [] m
        return $ Skip (Just xs)
    step _ (Just (x:xs)) = return $ Yield x (Just xs)
    step _ (Just []) = return Stop

-- Much faster reverse for Storables
--
-- The input is chunked into arrays of 'A.defaultChunkSize' via 'A.arraysOf',
-- the resulting stream of arrays is reversed by round-tripping through
-- StreamK ('toStreamK', 'K.reverse', 'fromStreamK'), and finally the arrays
-- are flattened back into elements with 'A.flattenArraysRev'.
{-# INLINE_NORMAL reverse' #-}
reverse' :: forall m a. (MonadIO m, Storable a) => Stream m a -> Stream m a
{-
-- This commented implementation copies the whole stream into one single array
-- and then streams from that array, this has exactly the same performance as
-- the chunked code that follows.  Though this could be problematic due to
-- unbounded large allocations. However, if we use an idiomatic implementation
-- of arraysOf instead of the custom implementation then the chunked code
-- becomes worse by 6 times. Need to investigate if that can be improved.
import Foreign.ForeignPtr (touchForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (Ptr, plusPtr)
reverse' m = Stream step Nothing
    where
    {-# INLINE_LATE step #-}
    step _ Nothing = do
        arr <- A.fromStreamD m
        let p = A.aEnd arr `plusPtr` negate (sizeOf (undefined :: a))
        return $ Skip $ Just (A.aStart arr, p)

    step _ (Just (start, p)) | p < unsafeForeignPtrToPtr start = return Stop

    step _ (Just (start, p)) = do
        let !x = A.unsafeInlineIO $ do
                    r <- peek p
                    touchForeignPtr start
                    return r
            next = p `plusPtr` negate (sizeOf (undefined :: a))
        return $ Yield x (Just (start, next))
-}
reverse' =
          A.flattenArraysRev -- unfoldMany A.readRev
        . fromStreamK
        . K.reverse
        . toStreamK
        . A.arraysOf A.defaultChunkSize

------------------------------------------------------------------------------
-- Position Indexing
------------------------------------------------------------------------------

-- Adapted from the vector package
--
-- Pair every element of the stream with its position. The counter starts at
-- 0 and is incremented only when the source yields an element; a 'Skip' from
-- the source leaves the counter unchanged.
{-# INLINE_NORMAL indexed #-}
indexed :: Monad m => Stream m a -> Stream m (Int, a)
indexed (Stream step state) = Stream step' (state, 0)
  where
    {-# INLINE_LATE step' #-}
    -- The counter is forced on every step so that it does not accumulate as
    -- a chain of unevaluated increments.
    step' gst (st, i) = i `seq` do
         r <- step (adaptState gst) st
         case r of
             Yield x s -> return $ Yield (i, x) (s, i+1)
             Skip    s -> return $ Skip (s, i)
             Stop      -> return Stop

-- Adapted from the vector package
--
-- Pair every element of the stream with a decreasing counter. The counter
-- starts at the supplied value @m@ and is decremented by 1 after each
-- element yielded by the source; a 'Skip' from the source leaves it
-- unchanged.
{-# INLINE_NORMAL indexedR #-}
indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a)
indexedR m (Stream step state) = Stream step' (state, m)
  where
    {-# INLINE_LATE step' #-}
    -- The counter is forced on every step so that it does not accumulate as
    -- a chain of unevaluated decrements.
    step' gst (st, i) = i `seq` do
         r <- step (adaptState gst) st
         case r of
             Yield x s -> let i' = i - 1
                          in return $ Yield (i, x) (s, i')
             Skip    s -> return $ Skip (s, i)
             Stop      -> return Stop

------------------------------------------------------------------------------
-- Searching
------------------------------------------------------------------------------

-- Produce the zero-based positions of the elements that satisfy the
-- predicate @p@. The position counter advances for every element yielded by
-- the source, whether or not it matches; a 'Skip' from the source does not
-- advance it.
{-# INLINE_NORMAL findIndices #-}
findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
findIndices p (Stream step state) = Stream step' (state, 0)
  where
    {-# INLINE_LATE step' #-}
    -- The counter is forced on every step to keep it evaluated.
    step' gst (st, i) = i `seq` do
      r <- step (adaptState gst) st
      return $ case r of
          Yield x s -> if p x then Yield i (s, i+1) else Skip (s, i+1)
          Skip s -> Skip (s, i)
          Stop   -> Stop

------------------------------------------------------------------------------
-- Rolling map
------------------------------------------------------------------------------

-- State used by 'rollingMapM'. 'RollingMapInit' carries only the source
-- stream state and is used until the first element has been seen;
-- 'RollingMapGo' carries the source stream state together with the most
-- recently seen element.
data RollingMapState s a = RollingMapInit s | RollingMapGo s a

-- Apply a monadic function to each pair of consecutive elements of the
-- stream. The function is called as @f current previous@. The first element
-- of the source produces no output of its own, it is only remembered as the
-- "previous" element for the next one. A source with fewer than two elements
-- therefore results in an empty stream.
{-# INLINE rollingMapM #-}
rollingMapM :: Monad m => (a -> a -> m b) -> Stream m a -> Stream m b
rollingMapM f (Stream step1 state1) = Stream step (RollingMapInit state1)
    where
    -- No element seen yet: wait for the first one and remember it.
    step gst (RollingMapInit st) = do
        r <- step1 (adaptState gst) st
        return $ case r of
            Yield x s -> Skip $ RollingMapGo s x
            Skip s -> Skip $ RollingMapInit s
            Stop   -> Stop

    -- A previous element x1 is available: combine it with the next one.
    step gst (RollingMapGo s1 x1) = do
        r <- step1 (adaptState gst) s1
        case r of
            Yield x s -> do
                -- The result is forced before it is yielded.
                !res <- f x x1
                return $ Yield res $ RollingMapGo s x
            Skip s -> return $ Skip $ RollingMapGo s x1
            Stop   -> return $ Stop

-- Pure variant of 'rollingMapM': the supplied function is lifted into the
-- monad with 'return' and passed to 'rollingMapM' with its argument order
-- unchanged.
{-# INLINE rollingMap #-}
rollingMap :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
rollingMap f = rollingMapM (\x y -> return $ f x y)

------------------------------------------------------------------------------
-- Maybe Streams
------------------------------------------------------------------------------

-- XXX Will this always fuse properly?
--
-- Map a 'Maybe' returning function over the stream and keep only the
-- contents of the 'Just' results. 'fromJust' is safe here because the
-- preceding 'filter' lets only 'Just' values through.
{-# INLINE_NORMAL mapMaybe #-}
mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
mapMaybe f = fmap fromJust . filter isJust . map f

-- Monadic variant of 'mapMaybe': run the action on every element and keep
-- only the contents of the 'Just' results. 'fromJust' is safe here because
-- the preceding 'filter' lets only 'Just' values through.
{-# INLINE_NORMAL mapMaybeM #-}
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
mapMaybeM f = fmap fromJust . filter isJust . mapM f