-- |
-- Module      : Streamly.Internal.Data.Producer
-- Copyright   : (c) 2021 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- A 'Producer' is an 'Unfold' with an 'extract' function added to extract
-- the state. It is more powerful but less general than an Unfold.
--
-- A 'Producer' represents steps of a loop generating a sequence of elements.
-- While unfolds are closed representation of imperative loops with some opaque
-- internal state, producers are open loops with the state being accessible to
-- the user.
--
-- Unlike an unfold, which runs a loop till completion, a producer can be
-- stopped in the middle, its state can be extracted, examined, changed, and
-- then it can be resumed later from the stopped state.
--
-- A producer can be used in places where a CPS stream would otherwise be
-- needed, because the state of the loop can be passed around. However, it can
-- be much more efficient than CPS because it allows stream fusion and
-- unecessary function calls can be avoided.

module Streamly.Internal.Data.Producer
    (
      module Streamly.Internal.Data.Producer.Source
    , module Streamly.Internal.Data.Producer.Type

    -- * Converting
    , simplify
    , fromStreamD
    )
where

#include "inline.hs"

import Streamly.Internal.Data.Stream.Step (Step(..))
import Streamly.Internal.Data.Stream.Type (Stream(..))
import Streamly.Internal.Data.SVar.Type (defState)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))

import Streamly.Internal.Data.Producer.Source
import Streamly.Internal.Data.Producer.Type
import Prelude hiding (concat)

-- XXX We should write unfolds as producers where possible and define
-- unfolds using "simplify".
--
-------------------------------------------------------------------------------
-- Converting to unfolds
-------------------------------------------------------------------------------

-- | Simplify a producer to an unfold.
--
-- /Pre-release/
{-# INLINE simplify #-}
simplify :: Producer m a b -> Unfold m a b
simplify :: forall (m :: * -> *) a b. Producer m a b -> Unfold m a b
simplify (Producer s -> m (Step s b)
step a -> m s
inject s -> m a
_) = (s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold s -> m (Step s b)
step a -> m s
inject

-------------------------------------------------------------------------------
-- Unfolds
-------------------------------------------------------------------------------

-- | Convert a StreamD stream into a producer.
--
-- /Pre-release/
{-# INLINE_NORMAL fromStreamD #-}
fromStreamD :: Monad m => Producer m (Stream m a) a
fromStreamD :: forall (m :: * -> *) a. Monad m => Producer m (Stream m a) a
fromStreamD = (Stream m a -> m (Step (Stream m a) a))
-> (Stream m a -> m (Stream m a))
-> (Stream m a -> m (Stream m a))
-> Producer m (Stream m a) a
forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> (s -> m a) -> Producer m a b
Producer Stream m a -> m (Step (Stream m a) a)
forall {m :: * -> *} {a}.
Monad m =>
Stream m a -> m (Step (Stream m a) a)
step Stream m a -> m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return Stream m a -> m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return

    where

    {-# INLINE_LATE step #-}
    step :: Stream m a -> m (Step (Stream m a) a)
step (UnStream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState s
state1
        Step (Stream m a) a -> m (Step (Stream m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Stream m a) a -> m (Step (Stream m a) a))
-> Step (Stream m a) a -> m (Step (Stream m a) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
x s
s -> a -> Stream m a -> Step (Stream m a) a
forall s a. a -> s -> Step s a
Yield a
x ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
            Skip s
s    -> Stream m a -> Step (Stream m a) a
forall s a. s -> Step s a
Skip ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step1 s
s)
            Step s a
Stop      -> Step (Stream m a) a
forall s a. Step s a
Stop