-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Concurrent.Channel
    (
      module Streamly.Internal.Data.Stream.Concurrent.Channel.Type
    , module Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
    , module Streamly.Internal.Data.Stream.Concurrent.Channel.Append
    , module Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
    , module Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
    , module Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer

    -- * Channel
    , Channel (..)
    , newChannel
    , withChannel
    , withChannelK
    -- quiesceChannel -- wait for running tasks but do not schedule any more.

    -- * Configuration
    , Config
    , defaultConfig

    -- ** Limits
    , maxThreads
    , maxBuffer

    -- ** Rate Control
    , Rate(..)
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate

    -- ** Stop behavior
    , StopWhen (..)
    , stopWhen
    , getStopWhen

    -- ** Scheduling behavior
    , eager
    , ordered
    , interleaved

    -- ** Diagnostics
    , inspect
    )
where

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream (Stream)

import qualified Streamly.Internal.Data.StreamK as K

import Streamly.Internal.Data.Channel.Types

import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
import Streamly.Internal.Data.Stream.Concurrent.Channel.Append
import Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer

-- | Create a new concurrent stream evaluation channel. The monad
-- state used to run the stream actions is captured from the call site of
-- newChannel.
{-# INLINE newChannel #-}
newChannel :: MonadAsync m =>
    (Config -> Config) -> m (Channel m a)
newChannel :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier =
    let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
     in if Config -> Bool
getInterleaved Config
cfg
        then (Config -> Config) -> m (Channel m a)
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newInterleaveChannel Config -> Config
modifier
        else (Config -> Config) -> m (Channel m a)
forall (m :: * -> *) a.
MonadRunInIO m =>
(Config -> Config) -> m (Channel m a)
newAppendChannel Config -> Config
modifier

-- | Allocate a channel and evaluate the stream using the channel and the
-- supplied evaluator function. The evaluator is run in a worker thread.
{-# INLINE withChannelK #-}
withChannelK :: MonadAsync m =>
       (Config -> Config)
    -> K.StreamK m a
    -> (Channel m b -> K.StreamK m a -> K.StreamK m b)
    -> K.StreamK m b
withChannelK :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier StreamK m a
input Channel m b -> StreamK m a -> StreamK m b
evaluator = m (StreamK m b) -> StreamK m b
forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect m (StreamK m b)
action

    where

    action :: m (StreamK m b)
action = do
        Channel m b
chan <- (Config -> Config) -> m (Channel m b)
forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier
        Channel m b -> StreamK m b -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m b
chan (Channel m b -> StreamK m a -> StreamK m b
evaluator Channel m b
chan StreamK m a
input)
        StreamK m b -> m (StreamK m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamK m b -> m (StreamK m b)) -> StreamK m b -> m (StreamK m b)
forall a b. (a -> b) -> a -> b
$ Channel m b -> StreamK m b
forall (m :: * -> *) a. MonadAsync m => Channel m a -> StreamK m a
fromChannelK Channel m b
chan

-- | Allocate a channel and evaluate the stream using the channel and the
-- supplied evaluator function. The evaluator is run in a worker thread.
{-# INLINE withChannel #-}
withChannel :: MonadAsync m =>
       (Config -> Config)
    -> Stream m a
    -> (Channel m b -> Stream m a -> Stream m b)
    -> Stream m b
withChannel :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> Stream m a
-> (Channel m b -> Stream m a -> Stream m b)
-> Stream m b
withChannel Config -> Config
modifier Stream m a
input Channel m b -> Stream m a -> Stream m b
evaluator =
    let f :: Channel m b -> StreamK m a -> StreamK m b
f Channel m b
chan StreamK m a
stream = Stream m b -> StreamK m b
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
K.fromStream (Stream m b -> StreamK m b) -> Stream m b -> StreamK m b
forall a b. (a -> b) -> a -> b
$ Channel m b -> Stream m a -> Stream m b
evaluator Channel m b
chan (StreamK m a -> Stream m a
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
K.toStream StreamK m a
stream)
     in StreamK m b -> Stream m b
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
K.toStream (StreamK m b -> Stream m b) -> StreamK m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ (Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier (Stream m a -> StreamK m a
forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
K.fromStream Stream m a
input) Channel m b -> StreamK m a -> StreamK m b
f