-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Concurrent.Channel
    (
    -- * Channel
      Channel (..)
    , newChannel
    , withChannel
    , withChannelK
    , fromChannel
    , toChannel
    , toChannelK
    , stopChannel
    -- quiesceChannel -- wait for running tasks but do not schedule any more.

    -- * Configuration
    , Config
    , defaultConfig

    -- ** Limits
    , maxThreads
    , maxBuffer

    -- ** Rate Control
    , Rate(..)
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate

    -- ** Stop behavior
    , StopWhen (..)
    , stopWhen
    , getStopWhen

    -- ** Scheduling behavior
    , eager
    , ordered
    , interleaved

    -- ** Diagnostics
    , inspect
    )
where

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamD (Stream)
import Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
    (fromChannel, fromChannelK, toChannel, toChannelK)

import qualified Streamly.Internal.Data.Stream.Concurrent.Channel.Append
    as Append
import qualified Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
    as Interleave
import qualified Streamly.Internal.Data.Stream.StreamK as K

import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types

-- | Create a new concurrent stream evaluation channel. The monad
-- state used to run the stream actions is captured from the call site of
-- 'newChannel'.
--
-- The supplied modifier is applied to 'defaultConfig' only to inspect the
-- resulting configuration: when 'getInterleaved' reports @True@ the channel
-- is built by @Interleave.newChannel@, otherwise by @Append.newChannel@. In
-- both cases the unapplied modifier itself is handed to the selected
-- constructor.
{-# INLINE newChannel #-}
newChannel :: MonadAsync m =>
       (Config -> Config) -- ^ Configuration modifier, applied to 'defaultConfig'
    -> m (Channel m a)
newChannel modifier =
    -- The resolved config is used solely to choose the implementation.
    let cfg = modifier defaultConfig
     in if getInterleaved cfg
        then Interleave.newChannel modifier
        else Append.newChannel modifier

-- | Allocate a channel and evaluate the stream using the channel and the
-- supplied evaluator function. The evaluator is run in a worker thread.
--
-- A fresh channel is created with 'newChannel' using the given modifier.
-- The stream produced by @evaluator chan input@ is sent to that channel with
-- 'toChannelK', and the result is the stream read back from the same channel
-- with 'fromChannelK'. The whole sequence is a monadic action that is turned
-- into a stream by 'K.concatEffect'.
{-# INLINE withChannelK #-}
withChannelK :: MonadAsync m =>
       (Config -> Config) -- ^ Configuration modifier for the new channel
    -> K.StreamK m a -- ^ Input stream passed to the evaluator
    -> (Channel m b -> K.StreamK m a -> K.StreamK m b)
       -- ^ Evaluator; receives the channel and the input stream
    -> K.StreamK m b
withChannelK modifier input evaluator = K.concatEffect action

    where

    -- No local type signature: it would mention the outer @m@ and @b@ and
    -- therefore need ScopedTypeVariables.
    action = do
        chan <- newChannel modifier
        -- The evaluator gets the channel itself in addition to the input.
        toChannelK chan (evaluator chan input)
        return $ fromChannelK chan

-- | Allocate a channel and evaluate the stream using the channel and the
-- supplied evaluator function. The evaluator is run in a worker thread.
--
-- This is the 'Stream' counterpart of 'withChannelK': the input and the
-- evaluator are converted to their 'K.StreamK' forms with 'K.fromStream' and
-- 'K.toStream', the work is delegated to 'withChannelK', and the result is
-- converted back with 'K.toStream'.
{-# INLINE withChannel #-}
withChannel :: MonadAsync m =>
       (Config -> Config) -- ^ Configuration modifier for the new channel
    -> Stream m a -- ^ Input stream passed to the evaluator
    -> (Channel m b -> Stream m a -> Stream m b)
       -- ^ Evaluator; receives the channel and the input stream
    -> Stream m b
withChannel modifier input evaluator =
    -- Adapt the Stream-based evaluator to the StreamK-based interface.
    let f chan stream = K.fromStream $ evaluator chan (K.toStream stream)
     in K.toStream $ withChannelK modifier (K.fromStream input) f