module Streamly.Internal.Data.Stream.Concurrent.Channel
(
Channel (..)
, newChannel
, withChannel
, withChannelK
, fromChannel
, toChannel
, toChannelK
, stopChannel
, Config
, defaultConfig
, maxThreads
, maxBuffer
, Rate(..)
, rate
, avgRate
, minRate
, maxRate
, constRate
, StopWhen (..)
, stopWhen
, getStopWhen
, eager
, ordered
, interleaved
, inspect
)
where
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamD (Stream)
import Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
(fromChannel, fromChannelK, toChannel, toChannelK)
import qualified Streamly.Internal.Data.Stream.Concurrent.Channel.Append
as Append
import qualified Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
as Interleave
import qualified Streamly.Internal.Data.Stream.StreamK as K
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types
{-# INLINE newChannel #-}
newChannel :: MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel :: forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier =
let cfg :: Config
cfg = Config -> Config
modifier Config
defaultConfig
in if Config -> Bool
getInterleaved Config
cfg
then forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
Interleave.newChannel Config -> Config
modifier
else forall (m :: * -> *) a.
MonadRunInIO m =>
(Config -> Config) -> m (Channel m a)
Append.newChannel Config -> Config
modifier
{-# INLINE withChannelK #-}
withChannelK :: MonadAsync m =>
(Config -> Config)
-> K.StreamK m a
-> (Channel m b -> K.StreamK m a -> K.StreamK m b)
-> K.StreamK m b
withChannelK :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier StreamK m a
input Channel m b -> StreamK m a -> StreamK m b
evaluator = forall (m :: * -> *) a. Monad m => m (StreamK m a) -> StreamK m a
K.concatEffect m (StreamK m b)
action
where
action :: m (StreamK m b)
action = do
Channel m b
chan <- forall (m :: * -> *) a.
MonadAsync m =>
(Config -> Config) -> m (Channel m a)
newChannel Config -> Config
modifier
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m b
chan (Channel m b -> StreamK m a -> StreamK m b
evaluator Channel m b
chan StreamK m a
input)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadAsync m => Channel m a -> StreamK m a
fromChannelK Channel m b
chan
{-# INLINE withChannel #-}
withChannel :: MonadAsync m =>
(Config -> Config)
-> Stream m a
-> (Channel m b -> Stream m a -> Stream m b)
-> Stream m b
withChannel :: forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> Stream m a
-> (Channel m b -> Stream m a -> Stream m b)
-> Stream m b
withChannel Config -> Config
modifier Stream m a
input Channel m b -> Stream m a -> Stream m b
evaluator =
let f :: Channel m b -> StreamK m a -> StreamK m b
f Channel m b
chan StreamK m a
stream = forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
K.fromStream forall a b. (a -> b) -> a -> b
$ Channel m b -> Stream m a -> Stream m b
evaluator Channel m b
chan (forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
K.toStream StreamK m a
stream)
in forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
K.toStream forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadAsync m =>
(Config -> Config)
-> StreamK m a
-> (Channel m b -> StreamK m a -> StreamK m b)
-> StreamK m b
withChannelK Config -> Config
modifier (forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
K.fromStream Stream m a
input) Channel m b -> StreamK m a -> StreamK m b
f