{-# LANGUAGE CPP #-}
module Streamly.Internal.Data.Stream.Transformer
(
foldlT
, foldrT
, liftInner
, runReaderT
, usingReaderT
, evalStateT
, runStateT
, usingStateT
)
where
#include "inline.hs"
import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Trans.State.Strict (StateT)
import GHC.Types (SPEC(..))
import Streamly.Internal.Data.SVar.Type (defState, adaptState)
import qualified Control.Monad.Trans.Reader as Reader
import qualified Control.Monad.Trans.State.Strict as State
import Streamly.Internal.Data.Stream.Type
#include "DocTestDataStream.hs"
-- | Left fold of a stream into a transformer monad @s m@.
--
-- Every step of the source stream is executed in the base monad @m@ and
-- brought into @s m@ with 'lift'. The accumulator is itself an @s m b@
-- action: it is passed to @fstep@ as an action, not as an already computed
-- result, so @fstep@ decides whether and when to run it.
--
-- * @fstep@ combines the accumulated action with the next stream element.
-- * @begin@ is the initial accumulator action.
--
-- When the stream stops, the accumulated action becomes the tail of the
-- computation.
{-# INLINE_NORMAL foldlT #-}
foldlT :: (Monad m, Monad (s m), MonadTrans s)
    => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
foldlT fstep begin (Stream step state) = go SPEC begin state

    where

    -- The first argument is always the 'SPEC' marker from "GHC.Types"; it
    -- carries no data and is only forced by the bang pattern.
    go !_ acc st = do
        -- Run one step of the source in @m@, using the default stream state.
        r <- lift $ step defState st
        case r of
            -- Extend the accumulator with the new element and continue.
            Yield x s -> go SPEC (fstep acc x) s
            -- No element produced: continue with the accumulator unchanged.
            Skip s -> go SPEC acc s
            -- End of stream: finish with the accumulated action.
            Stop -> acc
-- | Right fold of a stream into a transformer monad @t m@.
--
-- Every step of the source stream is executed in the base monad @m@ and
-- brought into @t m@ with 'lift'.
--
-- * @f@ receives the current element and the fold of the remaining stream
--   as a @t m b@ action; the remainder is only traversed if @f@ runs that
--   action.
-- * @final@ is the action used when the stream stops.
{-# INLINE_NORMAL foldrT #-}
foldrT :: (Monad m, Monad (t m), MonadTrans t)
    => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
foldrT f final (Stream step state) = go SPEC state

    where

    -- The first argument is always the 'SPEC' marker from "GHC.Types"; it
    -- carries no data and is only forced by the bang pattern.
    {-# INLINE_LATE go #-}
    go !_ st = do
        -- Run one step of the source in @m@, using the default stream state.
        r <- lift $ step defState st
        case r of
            -- Hand the element and the fold of the rest of the stream to @f@.
            Yield x s -> f x (go SPEC s)
            -- No element produced: keep folding from the next state.
            Skip s -> go SPEC s
            -- End of stream.
            Stop -> final
-- | Lift the inner monad @m@ of a @Stream m a@ to @t m@, where @t@ is a
-- monad transformer.
--
-- The resulting stream has the same state type and initial state as the
-- source; each of its steps runs the corresponding source step in @m@ and
-- brings the outcome into @t m@ with 'lift'.
{-# INLINE_NORMAL liftInner #-}
liftInner :: (Monad m, MonadTrans t, Monad (t m))
    => Stream m a -> Stream (t m) a
liftInner (Stream step state) = Stream step' state

    where

    {-# INLINE_LATE step' #-}
    step' gst st = do
        -- The incoming state is typed for @t m@; 'adaptState' retypes it for
        -- the source stream's monad @m@ before the source step is run.
        r <- lift $ step (adaptState gst) st
        -- NOTE(review): every branch rebuilds the same constructor with the
        -- same fields; presumably kept in this shape for the optimiser --
        -- confirm before simplifying to @return r@.
        return $ case r of
            Yield x s -> Yield x s
            Skip s -> Skip s
            Stop -> Stop
-- | Evaluate the inner 'ReaderT' layer of a stream, producing a stream in
-- the base monad @m@.
--
-- * @env@ is an action in @m@ that produces the environment.
--
-- The state of the resulting stream pairs the source state with an action
-- yielding the environment. The initial state carries @env@ itself, so it is
-- executed when the first step is taken; every following state carries
-- @return sv@ for the value obtained, so later steps reuse that value
-- without repeating the effects of @env@.
{-# INLINE_NORMAL runReaderT #-}
runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a
runReaderT env (Stream step state) = Stream step' (state, env)

    where

    {-# INLINE_LATE step' #-}
    step' gst (st, action) = do
        -- Obtain the environment (effectful only on the first step).
        sv <- action
        -- Run one source step with that environment; 'adaptState' retypes
        -- the stream state for the @ReaderT s m@ monad of the source.
        r <- Reader.runReaderT (step (adaptState gst) st) sv
        return $ case r of
            Yield x s -> Yield x (s, return sv)
            Skip s -> Skip (s, return sv)
            Stop -> Stop
-- | Run a stream transformation that works in @ReaderT r m@ on a stream in
-- the base monad @m@.
--
-- The input stream is lifted with 'liftInner', transformed by @f@, and the
-- 'ReaderT' layer is then evaluated with 'runReaderT' using the environment
-- produced by the action @r@.
{-# INLINE usingReaderT #-}
usingReaderT
    :: Monad m
    => m r
    -> (Stream (ReaderT r m) a -> Stream (ReaderT r m) a)
    -> Stream m a
    -> Stream m a
usingReaderT r f xs = runReaderT r $ f $ liftInner xs
-- | Evaluate the inner strict 'StateT' layer of a stream, producing a stream
-- in the base monad @m@ and discarding the state values.
--
-- * @initial@ is an action in @m@ that produces the initial state.
--
-- The state of the resulting stream pairs the source state with an action
-- yielding the current 'StateT' state. The initial state carries @initial@
-- itself, so it is executed when the first step is taken; after each step
-- the updated state @sv'@ is carried forward as @return sv'@.
{-# INLINE_NORMAL evalStateT #-}
evalStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m a
evalStateT initial (Stream step state) = Stream step' (state, initial)

    where

    {-# INLINE_LATE step' #-}
    step' gst (st, action) = do
        -- Obtain the current state (effectful only on the first step).
        sv <- action
        -- Run one source step from that state. The bang pattern forces the
        -- updated state to weak head normal form at every step.
        (r, !sv') <- State.runStateT (step (adaptState gst) st) sv
        return $ case r of
            Yield x s -> Yield x (s, return sv')
            Skip s -> Skip (s, return sv')
            Stop -> Stop
-- | Evaluate the inner strict 'StateT' layer of a stream, producing a stream
-- in the base monad @m@ whose elements are paired with the state.
--
-- * @initial@ is an action in @m@ that produces the initial state.
--
-- Each output element is @(sv', x)@, where @x@ is the element yielded by the
-- source and @sv'@ is the state after the step that yielded it. The state is
-- threaded between steps in the same way as in 'evalStateT': the initial
-- stream state carries @initial@, and later ones carry @return sv'@.
{-# INLINE_NORMAL runStateT #-}
runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a)
runStateT initial (Stream step state) = Stream step' (state, initial)

    where

    {-# INLINE_LATE step' #-}
    step' gst (st, action) = do
        -- Obtain the current state (effectful only on the first step).
        sv <- action
        -- Run one source step from that state. The bang pattern forces the
        -- updated state to weak head normal form at every step.
        (r, !sv') <- State.runStateT (step (adaptState gst) st) sv
        return $ case r of
            -- Emit the element together with the state after this step.
            Yield x s -> Yield (sv', x) (s, return sv')
            Skip s -> Skip (s, return sv')
            Stop -> Stop
-- | Run a stream transformation that works in strict @StateT s m@ on a
-- stream in the base monad @m@.
--
-- The input stream is lifted with 'liftInner', transformed by @f@, and the
-- 'StateT' layer is then evaluated with 'evalStateT', starting from the
-- state produced by the action @s@. The state values themselves are
-- discarded.
{-# INLINE usingStateT #-}
usingStateT
    :: Monad m
    => m s
    -> (Stream (StateT s m) a -> Stream (StateT s m) a)
    -> Stream m a
    -> Stream m a
usingStateT s f = evalStateT s . f . liftInner