-- | Entry point for the concurrent stream channel API.
--
-- This module re-exports the channel submodules listed below and, in the part
-- of the file visible here, defines three functions of its own: 'newChannel',
-- 'withChannelK' and 'withChannel'.
module Streamly.Internal.Data.Stream.Concurrent.Channel
(
-- * Re-exported modules
module Streamly.Internal.Data.Stream.Concurrent.Channel.Type
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Append
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
-- * Channel
, Channel (..)
, newChannel
, withChannel
, withChannelK
-- The names below are not defined in the visible part of this module;
-- presumably they are re-exported from the imported modules (e.g.
-- "Streamly.Internal.Data.Channel.Types") -- TODO confirm.
, Config
, defaultConfig
, maxThreads
, maxBuffer
, Rate(..)
, rate
, avgRate
, minRate
, maxRate
, constRate
, StopWhen (..)
, stopWhen
, getStopWhen
, eager
, ordered
, interleaved
, inspect
)
where
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream (Stream)
import qualified Streamly.Internal.Data.StreamK as K
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
import Streamly.Internal.Data.Stream.Concurrent.Channel.Append
import Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
-- | Create a new concurrent channel.
--
-- The supplied @modifier@ is applied to 'defaultConfig' only to decide which
-- kind of channel to build:
--
-- * if 'getInterleaved' is 'True' for the resulting configuration, the channel
--   is created with 'newInterleaveChannel';
-- * otherwise it is created with 'newAppendChannel'.
--
-- In both cases the @modifier@ itself (not the already-evaluated
-- configuration) is passed on to the selected constructor.
{-# INLINE newChannel #-}
newChannel :: MonadAsync m =>
    (Config -> Config) -> m (Channel m a)
newChannel modifier
    | getInterleaved cfg = newInterleaveChannel modifier
    | otherwise = newAppendChannel modifier

    where

    -- Configuration used solely to select the channel flavour.
    cfg = modifier defaultConfig
-- | Run a 'K.StreamK' transformation through a freshly created channel.
--
-- Arguments:
--
-- * @modifier@: configuration modifier handed to 'newChannel'.
-- * @input@: the input stream.
-- * @evaluator@: given the new channel and the input stream, produces the
--   stream whose results are to flow through that channel.
--
-- The result is built with 'K.concatEffect' from a monadic action that
-- creates the channel, passes @evaluator chan input@ to the channel with
-- 'toChannelK', and then returns the stream obtained from the same channel
-- with 'fromChannelK'.
{-# INLINE withChannelK #-}
withChannelK :: MonadAsync m =>
       (Config -> Config)
    -> K.StreamK m a
    -> (Channel m b -> K.StreamK m a -> K.StreamK m b)
    -> K.StreamK m b
withChannelK modifier input evaluator = K.concatEffect action

    where

    action = do
        chan <- newChannel modifier
        -- The evaluator receives the very channel its output is sent to.
        toChannelK chan (evaluator chan input)
        return $ fromChannelK chan
-- | 'Stream' counterpart of 'withChannelK'.
--
-- Arguments have the same roles as in 'withChannelK', but the input, the
-- evaluator and the result all use 'Stream' instead of 'K.StreamK'.
--
-- It is implemented by converting at the boundaries: the input is converted
-- with 'K.fromStream', the evaluator is wrapped so that it receives its stream
-- argument via 'K.toStream' and has its result converted back with
-- 'K.fromStream', and the final 'K.StreamK' is converted with 'K.toStream'.
{-# INLINE withChannel #-}
withChannel :: MonadAsync m =>
       (Config -> Config)
    -> Stream m a
    -> (Channel m b -> Stream m a -> Stream m b)
    -> Stream m b
withChannel modifier input evaluator =
    K.toStream $ withChannelK modifier (K.fromStream input) evaluatorK

    where

    -- The user's evaluator lifted to operate on 'K.StreamK'.
    evaluatorK chan stream =
        K.fromStream $ evaluator chan (K.toStream stream)