{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Lift
-- Copyright   : (c) 2018 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Transform the underlying monad of a stream.

module Streamly.Internal.Data.Stream.StreamD.Lift
    (
    -- * Generalize Inner Monad
      morphInner
    , generalizeInner

    -- * Transform Inner Monad
    , liftInnerWith
    , runInnerWith
    , runInnerWithState
    )
where

#include "inline.hs"

import Data.Functor.Identity (Identity(..))
import Streamly.Internal.Data.SVar.Type (adaptState)

import Streamly.Internal.Data.Stream.StreamD.Type

#include "DocTestDataStream.hs"

-------------------------------------------------------------------------------
-- Generalize Inner Monad
-------------------------------------------------------------------------------

-- | Transform the inner monad of a stream using a natural transformation.
--
-- Example, generalize the inner monad from Identity to any other:
--
-- >>> generalizeInner = Stream.morphInner (return . runIdentity)
--
-- Also known as hoist.
--
{-# INLINE_NORMAL morphInner #-}
morphInner :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a
morphInner f (Stream step state) = Stream step' state

    where

    -- Run one step of the source stream in @m@, move the resulting action to
    -- @n@ with @f@, and rebuild the step result constructor by constructor.
    -- The incoming stream state is converted with 'adaptState' before it is
    -- handed to the source step function.
    {-# INLINE_LATE step' #-}
    step' gst st = do
        r <- f $ step (adaptState gst) st
        return $ case r of
            Yield x s -> Yield x s
            Skip s -> Skip s
            Stop -> Stop

-- | Generalize the inner monad of the stream from 'Identity' to any monad.
--
-- Definition:
--
-- >>> generalizeInner = Stream.morphInner (return . runIdentity)
--
{-# INLINE generalizeInner #-}
generalizeInner :: Monad m => Stream Identity a -> Stream m a
generalizeInner = morphInner (return . runIdentity)

-------------------------------------------------------------------------------
-- Transform Inner Monad
-------------------------------------------------------------------------------

-- | Lift the inner monad @m@ of a stream @Stream m a@ to @t m@ using the
-- supplied lift function.
--
{-# INLINE_NORMAL liftInnerWith #-}
liftInnerWith :: (Monad (t m)) =>
    (forall b. m b -> t m b) -> Stream m a -> Stream (t m) a
liftInnerWith lift (Stream step state) = Stream step1 state

    where

    -- Run one step of the source stream in @m@, lift the resulting action
    -- into @t m@ with the supplied function, and rebuild the step result.
    -- The incoming stream state is converted with 'adaptState' before it is
    -- handed to the source step function.
    {-# INLINE_LATE step1 #-}
    step1 gst st = do
        r <- lift $ step (adaptState gst) st
        return $ case r of
            Yield x s -> Yield x s
            Skip s -> Skip s
            Stop -> Stop

-- | Evaluate the inner monad of a stream using the supplied runner function.
--
-- The runner is applied once per step of the source stream, to the action
-- produced by that step.
--
{-# INLINE_NORMAL runInnerWith #-}
runInnerWith :: Monad m =>
    (forall b. t m b -> m b) -> Stream (t m) a -> Stream m a
runInnerWith run (Stream step state) = Stream step1 state

    where

    -- Run one step of the source stream in @t m@, bring the resulting action
    -- down to @m@ with the runner, and rebuild the step result. The incoming
    -- stream state is converted with 'adaptState' before it is handed to the
    -- source step function.
    {-# INLINE_LATE step1 #-}
    step1 gst st = do
        r <- run $ step (adaptState gst) st
        return $ case r of
            Yield x s -> Yield x s
            Skip s -> Skip s
            Stop -> Stop

-- | Evaluate the inner monad of a stream using the supplied stateful runner
-- function and the initial state. The state returned by an invocation of the
-- runner is supplied as input state to the next invocation.
--
-- Each element of the output stream is paired with the runner state returned
-- by the invocation that produced it.
--
{-# INLINE_NORMAL runInnerWithState #-}
runInnerWithState :: Monad m =>
    (forall b. s -> t m b -> m (b, s))
    -> m s
    -> Stream (t m) a
    -> Stream m (s, a)
runInnerWithState run initial (Stream step state) =
    -- The state of the resulting stream pairs the source stream state with an
    -- action that produces the runner state to use for the next step. It is
    -- seeded with the caller-supplied @initial@ action.
    Stream step1 (state, initial)

    where

    {-# INLINE_LATE step1 #-}
    step1 gst (st, action) = do
        -- Obtain the runner state for this step: the @initial@ action on the
        -- first step, @return sv1@ from the previous step afterwards.
        sv <- action
        -- The bang pattern evaluates the new runner state to WHNF as soon as
        -- the runner returns.
        (r, !sv1) <- run sv (step (adaptState gst) st)
        return $ case r of
            Yield x s -> Yield (sv1, x) (s, return sv1)
            Skip s -> Skip (s, return sv1)
            Stop -> Stop