-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Lift
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.IsStream.Lift
    (
    -- * Generalize Inner Monad
      hoist
    , generally

    -- * Transform Inner Monad
    , liftInner
    , usingReaderT
    , runReaderT
    , evalStateT
    , usingStateT
    , runStateT
    )
where

#include "inline.hs"

import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Trans.State.Strict (StateT)
import Control.Monad.Trans.Class (MonadTrans(..))
import Data.Functor.Identity (Identity (..))
import Streamly.Internal.Data.Stream.Prelude (fromStreamS, toStreamS)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream)

import qualified Streamly.Internal.Data.Stream.StreamD as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif

------------------------------------------------------------------------------
-- Generalize the underlying monad
------------------------------------------------------------------------------

-- | Transform the inner monad of a stream using a natural transformation.
--
-- / Internal/
--
{-# INLINE hoist #-}
hoist :: (Monad m, Monad n)
    => (forall x. m x -> n x) -> SerialT m a -> SerialT n a
hoist :: (forall x. m x -> n x) -> SerialT m a -> SerialT n a
hoist forall x. m x -> n x
f SerialT m a
xs = Stream n a -> SerialT n a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream n a -> SerialT n a) -> Stream n a -> SerialT n a
forall a b. (a -> b) -> a -> b
$ (forall x. m x -> n x) -> Stream m a -> Stream n a
forall (n :: * -> *) (m :: * -> *) a.
Monad n =>
(forall x. m x -> n x) -> Stream m a -> Stream n a
S.hoist forall x. m x -> n x
f (SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS SerialT m a
xs)

-- | Generalize the inner monad of the stream from 'Identity' to any monad.
--
-- / Internal/
--
{-# INLINE generally #-}
generally :: (IsStream t, Monad m) => t Identity a -> t m a
generally :: t Identity a -> t m a
generally t Identity a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (forall x. Identity x -> m x) -> Stream Identity a -> Stream m a
forall (n :: * -> *) (m :: * -> *) a.
Monad n =>
(forall x. m x -> n x) -> Stream m a -> Stream n a
S.hoist (x -> m x
forall (m :: * -> *) a. Monad m => a -> m a
return (x -> m x) -> (Identity x -> x) -> Identity x -> m x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Identity x -> x
forall a. Identity a -> a
runIdentity) (t Identity a -> Stream Identity a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t Identity a
xs)

------------------------------------------------------------------------------
-- Add and remove a monad transformer
------------------------------------------------------------------------------

-- | Lift the inner monad @m@ of a stream @t m a@ to @tr m@ using the monad
-- transformer @tr@.
--
-- @since 0.8.0
--
{-# INLINE liftInner #-}
liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m))
    => t m a -> t (tr m) a
liftInner :: t m a -> t (tr m) a
liftInner t m a
xs = Stream (tr m) a -> t (tr m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream (tr m) a -> t (tr m) a) -> Stream (tr m) a -> t (tr m) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream (tr m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, MonadTrans t, Monad (t m)) =>
Stream m a -> Stream (t m) a
D.liftInner (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)

------------------------------------------------------------------------------
-- Sharing read only state in a stream
------------------------------------------------------------------------------

-- | Evaluate the inner monad of a stream as 'ReaderT'.
--
-- @since 0.8.0
--
{-# INLINE runReaderT #-}
runReaderT :: (IsStream t, Monad m) => m s -> t (ReaderT s m) a -> t m a
runReaderT :: m s -> t (ReaderT s m) a -> t m a
runReaderT m s
s t (ReaderT s m) a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ m s -> Stream (ReaderT s m) a -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
m s -> Stream (ReaderT s m) a -> Stream m a
D.runReaderT m s
s (t (ReaderT s m) a -> Stream (ReaderT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t (ReaderT s m) a
xs)

-- | Run a stream transformation using a given environment.
--
-- See also: 'Serial.map'
--
-- / Internal/
--
{-# INLINE usingReaderT #-}
usingReaderT
    :: (Monad m, IsStream t)
    => m r
    -> (t (ReaderT r m) a -> t (ReaderT r m) a)
    -> t m a
    -> t m a
usingReaderT :: m r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a
usingReaderT m r
r t (ReaderT r m) a -> t (ReaderT r m) a
f t m a
xs = m r -> t (ReaderT r m) a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) s a.
(IsStream t, Monad m) =>
m s -> t (ReaderT s m) a -> t m a
runReaderT m r
r (t (ReaderT r m) a -> t m a) -> t (ReaderT r m) a -> t m a
forall a b. (a -> b) -> a -> b
$ t (ReaderT r m) a -> t (ReaderT r m) a
f (t (ReaderT r m) a -> t (ReaderT r m) a)
-> t (ReaderT r m) a -> t (ReaderT r m) a
forall a b. (a -> b) -> a -> b
$ t m a -> t (ReaderT r m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *)
       (tr :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, MonadTrans tr, Monad (tr m)) =>
t m a -> t (tr m) a
liftInner t m a
xs

------------------------------------------------------------------------------
-- Sharing read write state in a stream
------------------------------------------------------------------------------

-- | Evaluate the inner monad of a stream as 'StateT'.
--
-- This is supported only for 'SerialT' as concurrent state updation may not be
-- safe.
--
-- > evalStateT s = Stream.map snd . Stream.runStateT s
--
-- / Internal/
--
{-# INLINE evalStateT #-}
evalStateT ::  Monad m => m s -> SerialT (StateT s m) a -> SerialT m a
-- evalStateT s = fmap snd . runStateT s
evalStateT :: m s -> SerialT (StateT s m) a -> SerialT m a
evalStateT m s
s SerialT (StateT s m) a
xs = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> SerialT m a) -> Stream m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ m s -> Stream (StateT s m) a -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
m s -> Stream (StateT s m) a -> Stream m a
D.evalStateT m s
s (SerialT (StateT s m) a -> Stream (StateT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD SerialT (StateT s m) a
xs)

-- | Run a stateful (StateT) stream transformation using a given state.
--
-- This is supported only for 'SerialT' as concurrent state updation may not be
-- safe.
--
-- > usingStateT s f = evalStateT s . f . liftInner
--
-- See also: 'scanl''
--
-- / Internal/
--
{-# INLINE usingStateT #-}
usingStateT
    :: Monad m
    => m s
    -> (SerialT (StateT s m) a -> SerialT (StateT s m) a)
    -> SerialT m a
    -> SerialT m a
usingStateT :: m s
-> (SerialT (StateT s m) a -> SerialT (StateT s m) a)
-> SerialT m a
-> SerialT m a
usingStateT m s
s SerialT (StateT s m) a -> SerialT (StateT s m) a
f = m s -> SerialT (StateT s m) a -> SerialT m a
forall (m :: * -> *) s a.
Monad m =>
m s -> SerialT (StateT s m) a -> SerialT m a
evalStateT m s
s (SerialT (StateT s m) a -> SerialT m a)
-> (SerialT m a -> SerialT (StateT s m) a)
-> SerialT m a
-> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT (StateT s m) a -> SerialT (StateT s m) a
f (SerialT (StateT s m) a -> SerialT (StateT s m) a)
-> (SerialT m a -> SerialT (StateT s m) a)
-> SerialT m a
-> SerialT (StateT s m) a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SerialT m a -> SerialT (StateT s m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *)
       (tr :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, MonadTrans tr, Monad (tr m)) =>
t m a -> t (tr m) a
liftInner

-- | Evaluate the inner monad of a stream as 'StateT' and emit the resulting
-- state and value pair after each step.
--
-- This is supported only for 'SerialT' as concurrent state updation may not be
-- safe.
--
-- @since 0.8.0
--
{-# INLINE runStateT #-}
runStateT :: Monad m => m s -> SerialT (StateT s m) a -> SerialT m (s, a)
runStateT :: m s -> SerialT (StateT s m) a -> SerialT m (s, a)
runStateT m s
s SerialT (StateT s m) a
xs = Stream m (s, a) -> SerialT m (s, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (s, a) -> SerialT m (s, a))
-> Stream m (s, a) -> SerialT m (s, a)
forall a b. (a -> b) -> a -> b
$ m s -> Stream (StateT s m) a -> Stream m (s, a)
forall (m :: * -> *) s a.
Monad m =>
m s -> Stream (StateT s m) a -> Stream m (s, a)
D.runStateT m s
s (SerialT (StateT s m) a -> Stream (StateT s m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD SerialT (StateT s m) a
xs)