{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Transformer
-- Copyright   : (c) 2018 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Transform the underlying monad of a stream using a monad transfomer.

module Streamly.Internal.Data.Stream.StreamD.Transformer
    (
      foldlT
    , foldrT

    -- * Transform Inner Monad
    , liftInner
    , runReaderT
    , usingReaderT
    , evalStateT
    , runStateT
    , usingStateT
    )
where

#include "inline.hs"

import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Trans.State.Strict (StateT)
import GHC.Types (SPEC(..))
import Streamly.Internal.Data.SVar.Type (defState, adaptState)

import qualified Control.Monad.Trans.Reader as Reader
import qualified Control.Monad.Trans.State.Strict as State

import Streamly.Internal.Data.Stream.StreamD.Type

#include "DocTestDataStream.hs"

-- | Lazy left fold to a transformer monad.
--
{-# INLINE_NORMAL foldlT #-}
foldlT :: (Monad m, Monad (s m), MonadTrans s)
    => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
foldlT :: (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
foldlT s m b -> a -> s m b
fstep s m b
begin (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = SPEC -> s m b -> s -> s m b
go SPEC
SPEC s m b
begin s
state
  where
    go :: SPEC -> s m b -> s -> s m b
go !SPEC
_ s m b
acc s
st = do
        Step s a
r <- m (Step s a) -> s m (Step s a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Step s a) -> s m (Step s a)) -> m (Step s a) -> s m (Step s a)
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
        case Step s a
r of
            Yield a
x s
s -> SPEC -> s m b -> s -> s m b
go SPEC
SPEC (s m b -> a -> s m b
fstep s m b
acc a
x) s
s
            Skip s
s -> SPEC -> s m b -> s -> s m b
go SPEC
SPEC s m b
acc s
s
            Step s a
Stop   -> s m b
acc

-- | Right fold to a transformer monad.  This is the most general right fold
-- function. 'foldrS' is a special case of 'foldrT', however 'foldrS'
-- implementation can be more efficient:
--
-- >>> foldrS = Stream.foldrT
--
-- >>> step f x xs = lift $ f x (runIdentityT xs)
-- >>> foldrM f z s = runIdentityT $ Stream.foldrT (step f) (lift z) s
--
-- 'foldrT' can be used to translate streamly streams to other transformer
-- monads e.g.  to a different streaming type.
--
-- /Pre-release/
{-# INLINE_NORMAL foldrT #-}
foldrT :: (Monad m, Monad (t m), MonadTrans t)
    => (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
foldrT :: (a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
foldrT a -> t m b -> t m b
f t m b
final (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = SPEC -> s -> t m b
go SPEC
SPEC s
state
  where
    {-# INLINE_LATE go #-}
    go :: SPEC -> s -> t m b
go !SPEC
_ s
st = do
          Step s a
r <- m (Step s a) -> t m (Step s a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Step s a) -> t m (Step s a)) -> m (Step s a) -> t m (Step s a)
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
st
          case Step s a
r of
            Yield a
x s
s -> a -> t m b -> t m b
f a
x (SPEC -> s -> t m b
go SPEC
SPEC s
s)
            Skip s
s    -> SPEC -> s -> t m b
go SPEC
SPEC s
s
            Step s a
Stop      -> t m b
final

-------------------------------------------------------------------------------
-- Transform Inner Monad
-------------------------------------------------------------------------------

-- | Lift the inner monad @m@ of @Stream m a@ to @t m@ where @t@ is a monad
-- transformer.
--
{-# INLINE_NORMAL liftInner #-}
liftInner :: (Monad m, MonadTrans t, Monad (t m))
    => Stream m a -> Stream (t m) a
liftInner :: Stream m a -> Stream (t m) a
liftInner (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = (State StreamK (t m) a -> s -> t m (Step s a))
-> s -> Stream (t m) a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK (t m) a -> s -> t m (Step s a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad (t m)) =>
State StreamK m a -> s -> t m (Step s a)
step' s
state
    where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> s -> t m (Step s a)
step' State StreamK m a
gst s
st = do
        Step s a
r <- m (Step s a) -> t m (Step s a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Step s a) -> t m (Step s a)) -> m (Step s a) -> t m (Step s a)
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step (State StreamK m a -> State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        Step s a -> t m (Step s a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a -> t m (Step s a)) -> Step s a -> t m (Step s a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
x s
s -> a -> s -> Step s a
forall s a. a -> s -> Step s a
Yield a
x s
s
            Skip s
s    -> s -> Step s a
forall s a. s -> Step s a
Skip s
s
            Step s a
Stop      -> Step s a
forall s a. Step s a
Stop

------------------------------------------------------------------------------
-- Sharing read only state in a stream
------------------------------------------------------------------------------

-- | Evaluate the inner monad of a stream as 'ReaderT'.
--
{-# INLINE_NORMAL runReaderT #-}
runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a
runReaderT :: m s -> Stream (ReaderT s m) a -> Stream m a
runReaderT m s
env (Stream State StreamK (ReaderT s m) a -> s -> ReaderT s m (Step s a)
step s
state) = (State StreamK m a -> (s, m s) -> m (Step (s, m s) a))
-> (s, m s) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> (s, m s) -> m (Step (s, m s) a)
forall (m :: * -> *) (m :: * -> *) a.
Monad m =>
State StreamK m a -> (s, m s) -> m (Step (s, m s) a)
step' (s
state, m s
env)
    where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, m s) -> m (Step (s, m s) a)
step' State StreamK m a
gst (s
st, m s
action) = do
        s
sv <- m s
action
        Step s a
r <- ReaderT s m (Step s a) -> s -> m (Step s a)
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
Reader.runReaderT (State StreamK (ReaderT s m) a -> s -> ReaderT s m (Step s a)
step (State StreamK m a -> State StreamK (ReaderT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st) s
sv
        Step (s, m s) a -> m (Step (s, m s) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, m s) a -> m (Step (s, m s) a))
-> Step (s, m s) a -> m (Step (s, m s) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
x s
s -> a -> (s, m s) -> Step (s, m s) a
forall s a. a -> s -> Step s a
Yield a
x (s
s, s -> m s
forall (m :: * -> *) a. Monad m => a -> m a
return s
sv)
            Skip  s
s   -> (s, m s) -> Step (s, m s) a
forall s a. s -> Step s a
Skip (s
s, s -> m s
forall (m :: * -> *) a. Monad m => a -> m a
return s
sv)
            Step s a
Stop      -> Step (s, m s) a
forall s a. Step s a
Stop

-- | Run a stream transformation using a given environment.
--
{-# INLINE usingReaderT #-}
usingReaderT
    :: Monad m
    => m r
    -> (Stream (ReaderT r m) a -> Stream (ReaderT r m) a)
    -> Stream m a
    -> Stream m a
usingReaderT :: m r
-> (Stream (ReaderT r m) a -> Stream (ReaderT r m) a)
-> Stream m a
-> Stream m a
usingReaderT m r
r Stream (ReaderT r m) a -> Stream (ReaderT r m) a
f Stream m a
xs = m r -> Stream (ReaderT r m) a -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
m s -> Stream (ReaderT s m) a -> Stream m a
runReaderT m r
r (Stream (ReaderT r m) a -> Stream m a)
-> Stream (ReaderT r m) a -> Stream m a
forall a b. (a -> b) -> a -> b
$ Stream (ReaderT r m) a -> Stream (ReaderT r m) a
f (Stream (ReaderT r m) a -> Stream (ReaderT r m) a)
-> Stream (ReaderT r m) a -> Stream (ReaderT r m) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream (ReaderT r m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, MonadTrans t, Monad (t m)) =>
Stream m a -> Stream (t m) a
liftInner Stream m a
xs

------------------------------------------------------------------------------
-- Sharing read write state in a stream
------------------------------------------------------------------------------

-- | Evaluate the inner monad of a stream as 'StateT'.
--
-- >>> evalStateT s = fmap snd . Stream.runStateT s
--
{-# INLINE_NORMAL evalStateT #-}
evalStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m a
evalStateT :: m s -> Stream (StateT s m) a -> Stream m a
evalStateT m s
initial (Stream State StreamK (StateT s m) a -> s -> StateT s m (Step s a)
step s
state) = (State StreamK m a -> (s, m s) -> m (Step (s, m s) a))
-> (s, m s) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> (s, m s) -> m (Step (s, m s) a)
forall (m :: * -> *) (m :: * -> *) a.
Monad m =>
State StreamK m a -> (s, m s) -> m (Step (s, m s) a)
step' (s
state, m s
initial)
    where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, m s) -> m (Step (s, m s) a)
step' State StreamK m a
gst (s
st, m s
action) = do
        s
sv <- m s
action
        (Step s a
r, !s
sv') <- StateT s m (Step s a) -> s -> m (Step s a, s)
forall s (m :: * -> *) a. StateT s m a -> s -> m (a, s)
State.runStateT (State StreamK (StateT s m) a -> s -> StateT s m (Step s a)
step (State StreamK m a -> State StreamK (StateT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st) s
sv
        Step (s, m s) a -> m (Step (s, m s) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, m s) a -> m (Step (s, m s) a))
-> Step (s, m s) a -> m (Step (s, m s) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
x s
s -> a -> (s, m s) -> Step (s, m s) a
forall s a. a -> s -> Step s a
Yield a
x (s
s, s -> m s
forall (m :: * -> *) a. Monad m => a -> m a
return s
sv')
            Skip  s
s   -> (s, m s) -> Step (s, m s) a
forall s a. s -> Step s a
Skip (s
s, s -> m s
forall (m :: * -> *) a. Monad m => a -> m a
return s
sv')
            Step s a
Stop      -> Step (s, m s) a
forall s a. Step s a
Stop

-- | Evaluate the inner monad of a stream as 'StateT' and emit the resulting
-- state and value pair after each step.
--
{-# INLINE_NORMAL runStateT #-}
runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a)
runStateT :: m s -> Stream (StateT s m) a -> Stream m (s, a)
runStateT m s
initial (Stream State StreamK (StateT s m) a -> s -> StateT s m (Step s a)
step s
state) = (State StreamK m (s, a) -> (s, m s) -> m (Step (s, m s) (s, a)))
-> (s, m s) -> Stream m (s, a)
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m (s, a) -> (s, m s) -> m (Step (s, m s) (s, a))
forall (m :: * -> *) (m :: * -> *) a.
Monad m =>
State StreamK m a -> (s, m s) -> m (Step (s, m s) (s, a))
step' (s
state, m s
initial)
    where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, m s) -> m (Step (s, m s) (s, a))
step' State StreamK m a
gst (s
st, m s
action) = do
        s
sv <- m s
action
        (Step s a
r, !s
sv') <- StateT s m (Step s a) -> s -> m (Step s a, s)
forall s (m :: * -> *) a. StateT s m a -> s -> m (a, s)
State.runStateT (State StreamK (StateT s m) a -> s -> StateT s m (Step s a)
step (State StreamK m a -> State StreamK (StateT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st) s
sv
        Step (s, m s) (s, a) -> m (Step (s, m s) (s, a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, m s) (s, a) -> m (Step (s, m s) (s, a)))
-> Step (s, m s) (s, a) -> m (Step (s, m s) (s, a))
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
x s
s -> (s, a) -> (s, m s) -> Step (s, m s) (s, a)
forall s a. a -> s -> Step s a
Yield (s
sv', a
x) (s
s, s -> m s
forall (m :: * -> *) a. Monad m => a -> m a
return s
sv')
            Skip  s
s   -> (s, m s) -> Step (s, m s) (s, a)
forall s a. s -> Step s a
Skip (s
s, s -> m s
forall (m :: * -> *) a. Monad m => a -> m a
return s
sv')
            Step s a
Stop      -> Step (s, m s) (s, a)
forall s a. Step s a
Stop

-- | Run a stateful (StateT) stream transformation using a given state.
--
-- >>> usingStateT s f = Stream.evalStateT s . f . Stream.liftInner
--
-- See also: 'scan'
--
{-# INLINE usingStateT #-}
usingStateT
    :: Monad m
    => m s
    -> (Stream (StateT s m) a -> Stream (StateT s m) a)
    -> Stream m a
    -> Stream m a
usingStateT :: m s
-> (Stream (StateT s m) a -> Stream (StateT s m) a)
-> Stream m a
-> Stream m a
usingStateT m s
s Stream (StateT s m) a -> Stream (StateT s m) a
f = m s -> Stream (StateT s m) a -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
m s -> Stream (StateT s m) a -> Stream m a
evalStateT m s
s (Stream (StateT s m) a -> Stream m a)
-> (Stream m a -> Stream (StateT s m) a)
-> Stream m a
-> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream (StateT s m) a -> Stream (StateT s m) a
f (Stream (StateT s m) a -> Stream (StateT s m) a)
-> (Stream m a -> Stream (StateT s m) a)
-> Stream m a
-> Stream (StateT s m) a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream (StateT s m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, MonadTrans t, Monad (t m)) =>
Stream m a -> Stream (t m) a
liftInner