streamly-0.6.0: Beautiful Streaming, Concurrent and Reactive Composition

Copyright: (c) 2017 Harendra Kumar
License: BSD3
Maintainer: harendra.kumar@gmail.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly

Description

Just as a list represents a sequence of pure values, a stream represents a sequence of monadic actions. The monadic stream API offered by Streamly is very close to the Haskell Prelude's pure list API and can be considered a natural extension of lists to monadic actions. Streamly streams also provide concurrent composition and merging of streams, so a stream can be considered a concurrent list transformer. In contrast to Prelude lists, merging or appending streams of arbitrary length is scalable and inexpensive.

The basic stream type is Serial. It represents a sequence of IO actions and is a Monad. The type SerialT is a monad transformer that can represent a sequence of actions in an arbitrary monad; Serial is in fact a synonym for SerialT IO. There are a few more types similar to SerialT. All of them represent a stream and differ only in the Semigroup, Applicative and Monad compositions of the stream. The Serial and WSerial types compose serially, whereas the Async and WAsync types compose concurrently. All these types can be freely converted to one another using type combinators at no cost, so you can switch to any style of composition at any point in the program. When no type annotation or explicit stream type combinator is used, the stream type is inferred as Serial by default.

Here is a simple console echo program example:

> runStream $ S.repeatM getLine & S.mapM putStrLn

For more details please see the Streamly.Tutorial module and the examples directory in this package.

This module exports stream types, instances and some basic operations. Functionality exported by this module includes:

  • Semigroup append (<>) instances as well as explicit operations for merging streams
  • Monad and Applicative instances for looping over streams
  • Zip Applicatives for zipping streams
  • Stream type combinators to convert between different composition styles
  • Some basic utilities to run and fold streams

See the Streamly.Prelude module for comprehensive APIs for construction, generation, elimination and transformation of streams.

This module is designed to be imported unqualified:

import Streamly

Documentation

type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: 0.1.0

Stream transformers

Serial Streams

Serial streams compose serially or non-concurrently. In a composed stream, each action is executed only after the previous action has finished. The two serial stream types SerialT and WSerialT differ in how they traverse the streams in a Semigroup or Monad composition.

data SerialT m a

Deep serial composition or serial composition with depth first traversal. The Semigroup instance of SerialT appends two streams serially in a depth first manner, yielding all elements from the first stream, and then all elements from the second stream.

import Streamly
import qualified Streamly.Prelude as S

main = (S.toList . serially $ S.fromFoldable [1,2] <> S.fromFoldable [3,4]) >>= print
[1,2,3,4]

The Monad instance runs the monadic continuation for each element of the stream, serially.

main = runStream . serially $ do
    x <- return 1 <> return 2
    S.yieldM $ print x
1
2

SerialT nests streams serially in a depth first manner.

main = runStream . serially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
(1,3)
(1,4)
(2,3)
(2,4)

This behavior of SerialT is exactly like that of a list transformer. We call the monadic code that runs for each element of the stream a monadic continuation. In the imperative paradigm, we can think of this composition as nested for loops in which the monadic continuation is the body of the loop. The loop iterates over all elements of the stream.
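To make the list analogy concrete, here is a minimal sketch that reproduces the nested example above with the ordinary list monad from base. It is only a pure model of the traversal order, not Streamly code, and the name pairsDepthFirst is introduced here purely for illustration.

```haskell
-- Pure-list model of the nested SerialT loops (uses only base).
pairsDepthFirst :: [(Int, Int)]
pairsDepthFirst = do
    x <- [1, 2]    -- outer loop
    y <- [3, 4]    -- inner loop runs to completion for each x
    return (x, y)  -- loop body, i.e. the monadic continuation

main :: IO ()
main = mapM_ print pairsDepthFirst
```

Running it prints the pairs in the same depth first order as the SerialT example: (1,3), (1,4), (2,3), (2,4).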

The serially combinator can be omitted as the default stream type is SerialT. Note that serial composition with depth first traversal can be used to combine an infinite number of streams as it explores only one stream at a time.

Since: 0.2.0

Instances
MonadTrans SerialT Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

lift :: Monad m => m a -> SerialT m a #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

toStream :: SerialT m a -> Stream m a

fromStream :: Stream m a -> SerialT m a

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(MonadBase b m, Monad m) => MonadBase b (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftBase :: b α -> SerialT m α #

MonadState s m => MonadState s (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

get :: SerialT m s #

put :: s -> SerialT m () #

state :: (s -> (a, s)) -> SerialT m a #

MonadReader r m => MonadReader r (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

ask :: SerialT m r #

local :: (r -> r) -> SerialT m a -> SerialT m a #

reader :: (r -> a) -> SerialT m a #

Monad m => Monad (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

(>>=) :: SerialT m a -> (a -> SerialT m b) -> SerialT m b #

(>>) :: SerialT m a -> SerialT m b -> SerialT m b #

return :: a -> SerialT m a #

fail :: String -> SerialT m a #

Monad m => Functor (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

fmap :: (a -> b) -> SerialT m a -> SerialT m b #

(<$) :: a -> SerialT m b -> SerialT m a #

Monad m => Applicative (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

pure :: a -> SerialT m a #

(<*>) :: SerialT m (a -> b) -> SerialT m a -> SerialT m b #

liftA2 :: (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c #

(*>) :: SerialT m a -> SerialT m b -> SerialT m b #

(<*) :: SerialT m a -> SerialT m b -> SerialT m a #

(Foldable m, Monad m) => Foldable (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

fold :: Monoid m0 => SerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> SerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> SerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> SerialT m a -> b #

foldl :: (b -> a -> b) -> b -> SerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> SerialT m a -> b #

foldr1 :: (a -> a -> a) -> SerialT m a -> a #

foldl1 :: (a -> a -> a) -> SerialT m a -> a #

toList :: SerialT m a -> [a] #

null :: SerialT m a -> Bool #

length :: SerialT m a -> Int #

elem :: Eq a => a -> SerialT m a -> Bool #

maximum :: Ord a => SerialT m a -> a #

minimum :: Ord a => SerialT m a -> a #

sum :: Num a => SerialT m a -> a #

product :: Num a => SerialT m a -> a #

Traversable (SerialT Identity) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

traverse :: Applicative f => (a -> f b) -> SerialT Identity a -> f (SerialT Identity b) #

sequenceA :: Applicative f => SerialT Identity (f a) -> f (SerialT Identity a) #

mapM :: Monad m => (a -> m b) -> SerialT Identity a -> m (SerialT Identity b) #

sequence :: Monad m => SerialT Identity (m a) -> m (SerialT Identity a) #

MonadIO m => MonadIO (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftIO :: IO a -> SerialT m a #

NFData1 (SerialT Identity) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftRnf :: (a -> ()) -> SerialT Identity a -> () #

MonadThrow m => MonadThrow (SerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

throwM :: Exception e => e -> SerialT m a #

IsList (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Associated Types

type Item (SerialT Identity a) :: Type #

Eq a => Eq (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Ord a => Ord (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Read a => Read (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Show a => Show (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

a ~ Char => IsString (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Semigroup (SerialT m a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

Monoid (SerialT m a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

mempty :: SerialT m a #

mappend :: SerialT m a -> SerialT m a -> SerialT m a #

mconcat :: [SerialT m a] -> SerialT m a #

NFData a => NFData (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

rnf :: SerialT Identity a -> () #

type Item (SerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

type Item (SerialT Identity a) = a

data WSerialT m a

Wide serial composition or serial composition with a breadth first traversal. The Semigroup instance of WSerialT traverses the two streams in a breadth first manner. In other words, it interleaves two streams, yielding one element from each stream alternately.

import Streamly
import qualified Streamly.Prelude as S

main = (S.toList . wSerially $ S.fromFoldable [1,2] <> S.fromFoldable [3,4]) >>= print
[1,3,2,4]

Similarly, the Monad instance interleaves the iterations of the inner and the outer loop, nesting loops in a breadth first manner.

main = runStream . wSerially $ do
    x <- return 1 <> return 2
    y <- return 3 <> return 4
    S.yieldM $ print (x, y)
(1,3)
(2,3)
(1,4)
(2,4)
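The breadth first order can be modelled with plain lists. The sketch below is a pure approximation of the two compositions described above, written with base only. The names interleave and bindWide are hypothetical helpers introduced for this illustration, and the code makes no claim about how WSerialT is implemented internally.

```haskell
-- Pure-list model of WSerialT's breadth first composition (uses only base).

-- Alternate between two lists, taking one element from each in turn.
interleave :: [a] -> [a] -> [a]
interleave []       ys = ys
interleave (x : xs) ys = x : interleave ys xs

-- Nest loops breadth first: interleave the inner loop of the first outer
-- element with the (already interleaved) inner loops of the remaining ones.
bindWide :: [a] -> (a -> [b]) -> [b]
bindWide xs f = foldr (interleave . f) [] xs

main :: IO ()
main = do
    print (interleave [1, 2] [3, 4 :: Int])
    mapM_ print
        (bindWide [1, 2 :: Int] (\x -> bindWide [3, 4 :: Int] (\y -> [(x, y)])))
```

The first line printed is [1,3,2,4], and the pairs follow in the order (1,3), (2,3), (1,4), (2,4), matching the two WSerialT examples above.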

Note that a serial composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.

Since: 0.2.0

Instances
MonadTrans WSerialT Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

lift :: Monad m => m a -> WSerialT m a #

IsStream WSerialT Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

toStream :: WSerialT m a -> Stream m a

fromStream :: Stream m a -> WSerialT m a

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(MonadBase b m, Monad m) => MonadBase b (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftBase :: b α -> WSerialT m α #

MonadState s m => MonadState s (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

get :: WSerialT m s #

put :: s -> WSerialT m () #

state :: (s -> (a, s)) -> WSerialT m a #

MonadReader r m => MonadReader r (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

ask :: WSerialT m r #

local :: (r -> r) -> WSerialT m a -> WSerialT m a #

reader :: (r -> a) -> WSerialT m a #

Monad m => Monad (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

(>>=) :: WSerialT m a -> (a -> WSerialT m b) -> WSerialT m b #

(>>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

return :: a -> WSerialT m a #

fail :: String -> WSerialT m a #

Monad m => Functor (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

fmap :: (a -> b) -> WSerialT m a -> WSerialT m b #

(<$) :: a -> WSerialT m b -> WSerialT m a #

Monad m => Applicative (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

pure :: a -> WSerialT m a #

(<*>) :: WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b #

liftA2 :: (a -> b -> c) -> WSerialT m a -> WSerialT m b -> WSerialT m c #

(*>) :: WSerialT m a -> WSerialT m b -> WSerialT m b #

(<*) :: WSerialT m a -> WSerialT m b -> WSerialT m a #

(Foldable m, Monad m) => Foldable (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

fold :: Monoid m0 => WSerialT m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> WSerialT m a -> m0 #

foldr :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldr' :: (a -> b -> b) -> b -> WSerialT m a -> b #

foldl :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldl' :: (b -> a -> b) -> b -> WSerialT m a -> b #

foldr1 :: (a -> a -> a) -> WSerialT m a -> a #

foldl1 :: (a -> a -> a) -> WSerialT m a -> a #

toList :: WSerialT m a -> [a] #

null :: WSerialT m a -> Bool #

length :: WSerialT m a -> Int #

elem :: Eq a => a -> WSerialT m a -> Bool #

maximum :: Ord a => WSerialT m a -> a #

minimum :: Ord a => WSerialT m a -> a #

sum :: Num a => WSerialT m a -> a #

product :: Num a => WSerialT m a -> a #

Traversable (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

traverse :: Applicative f => (a -> f b) -> WSerialT Identity a -> f (WSerialT Identity b) #

sequenceA :: Applicative f => WSerialT Identity (f a) -> f (WSerialT Identity a) #

mapM :: Monad m => (a -> m b) -> WSerialT Identity a -> m (WSerialT Identity b) #

sequence :: Monad m => WSerialT Identity (m a) -> m (WSerialT Identity a) #

MonadIO m => MonadIO (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftIO :: IO a -> WSerialT m a #

NFData1 (WSerialT Identity) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

liftRnf :: (a -> ()) -> WSerialT Identity a -> () #

MonadThrow m => MonadThrow (WSerialT m) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

throwM :: Exception e => e -> WSerialT m a #

IsList (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Associated Types

type Item (WSerialT Identity a) :: Type #

Eq a => Eq (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Ord a => Ord (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Read a => Read (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Show a => Show (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

a ~ Char => IsString (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Semigroup (WSerialT m a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Monoid (WSerialT m a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

mempty :: WSerialT m a #

mappend :: WSerialT m a -> WSerialT m a -> WSerialT m a #

mconcat :: [WSerialT m a] -> WSerialT m a #

NFData a => NFData (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

rnf :: WSerialT Identity a -> () #

type Item (WSerialT Identity a) Source # 
Instance details

Defined in Streamly.Streams.Serial

type Item (WSerialT Identity a) = a

Concurrent Lookahead Streams

data AheadT m a

Deep ahead composition or ahead composition with depth first traversal. The semigroup composition of AheadT appends streams in a depth first manner just like SerialT except that it can produce elements concurrently ahead of time. It is like AsyncT except that AsyncT produces the output as it arrives whereas AheadT orders the output in the traversal order.

main = (S.toList . aheadly $ S.fromFoldable [1,2] <> S.fromFoldable [3,4]) >>= print
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream.

Similarly, the monad instance of AheadT may run each iteration concurrently ahead of time but presents the results in the same order as SerialT.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = runStream . aheadly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

All iterations may run in the same thread if they do not block.
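The idea of running actions ahead of time while still presenting results in traversal order can be sketched with base concurrency primitives. This is a simplified model under stated assumptions, not AheadT's implementation: it starts every action at once (AheadT bounds its lookahead), it does not propagate exceptions, and the names aheadModel and delayed are hypothetical.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Start every action in its own thread, then collect the results
-- in the original order, regardless of which action finishes first.
aheadModel :: [IO a] -> IO [a]
aheadModel actions = do
    vars <- mapM start actions
    mapM takeMVar vars
  where
    start act = do
        var <- newEmptyMVar
        _ <- forkIO (act >>= putMVar var)
        return var

-- An action that sleeps for n tenths of a second and then returns n.
delayed :: Int -> IO Int
delayed n = threadDelay (n * 100000) >> return n

main :: IO ()
main = aheadModel (map delayed [3, 2, 1]) >>= print
```

Although the action with the shortest delay completes first, the results are collected in the order the actions were listed, so the program prints [3,2,1].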

Note that ahead composition with depth first traversal can be used to combine an infinite number of streams, as it explores only a bounded number of streams at a time.

Since: 0.3.0

Instances
MonadTrans AheadT Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

lift :: Monad m => m a -> AheadT m a #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

toStream :: AheadT m a -> Stream m a

fromStream :: Stream m a -> AheadT m a

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

liftBase :: b α -> AheadT m α #

(MonadState s m, MonadAsync m) => MonadState s (AheadT m) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

get :: AheadT m s #

put :: s -> AheadT m () #

state :: (s -> (a, s)) -> AheadT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

ask :: AheadT m r #

local :: (r -> r) -> AheadT m a -> AheadT m a #

reader :: (r -> a) -> AheadT m a #

MonadAsync m => Monad (AheadT m) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

(>>=) :: AheadT m a -> (a -> AheadT m b) -> AheadT m b #

(>>) :: AheadT m a -> AheadT m b -> AheadT m b #

return :: a -> AheadT m a #

fail :: String -> AheadT m a #

Monad m => Functor (AheadT m) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

fmap :: (a -> b) -> AheadT m a -> AheadT m b #

(<$) :: a -> AheadT m b -> AheadT m a #

(Monad m, MonadAsync m) => Applicative (AheadT m) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

pure :: a -> AheadT m a #

(<*>) :: AheadT m (a -> b) -> AheadT m a -> AheadT m b #

liftA2 :: (a -> b -> c) -> AheadT m a -> AheadT m b -> AheadT m c #

(*>) :: AheadT m a -> AheadT m b -> AheadT m b #

(<*) :: AheadT m a -> AheadT m b -> AheadT m a #

(MonadIO m, MonadAsync m) => MonadIO (AheadT m) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

liftIO :: IO a -> AheadT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

throwM :: Exception e => e -> AheadT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

MonadAsync m => Monoid (AheadT m a) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

mempty :: AheadT m a #

mappend :: AheadT m a -> AheadT m a -> AheadT m a #

mconcat :: [AheadT m a] -> AheadT m a #

Concurrent Asynchronous Streams

The async style streams execute actions asynchronously and consume their outputs asynchronously as well. In a composed stream, more than one stream can run concurrently and yield elements at any point in time. The composed stream yields elements as the constituent streams generate them, on a first come first served basis. Therefore, each run may yield elements in a different sequence depending on the delays introduced by scheduling. The two async types, AsyncT and WAsyncT, differ in how they traverse streams in Semigroup or Monad compositions.

data AsyncT m a

Deep async composition or async composition with depth first traversal. In a left to right Semigroup composition it tries to yield elements from the left stream as long as it can, but it can run the right stream in parallel if it needs to, based on demand. The right stream can be run if the left stream blocks on IO or cannot produce elements fast enough for the consumer.

main = (S.toList . asyncly $ S.fromFoldable [1,2] <> S.fromFoldable [3,4]) >>= print
[1,2,3,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
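The ordering guarantee can be illustrated with a small model built on base's Chan. This is a sketch under stated assumptions rather than AsyncT's implementation: it always runs both producers in their own threads, ignores demand and exceptions, and the name mergeModel is hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (replicateM)

-- Two producers write to one shared channel. Each producer's elements keep
-- their own order, but how the two sequences interleave depends on scheduling.
mergeModel :: [a] -> [a] -> IO [a]
mergeModel xs ys = do
    out <- newChan
    _ <- forkIO (mapM_ (writeChan out) xs)
    _ <- forkIO (mapM_ (writeChan out) ys)
    replicateM (length xs + length ys) (readChan out)

main :: IO ()
main = do
    merged <- mergeModel [1, 2, 3] [10, 20, 30 :: Int]
    print merged
    -- Splitting the merged output by producer recovers each original order.
    print (filter (< 10) merged, filter (>= 10) merged)
```

The first line printed may differ between runs, while the second line always shows each producer's elements in their original order.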

Similarly, the monad instance of AsyncT may run each iteration concurrently based on demand. More concurrent iterations are started only if the previous iterations are not able to produce enough output for the consumer.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = runStream . asyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

All iterations may run in the same thread if they do not block.

Note that async composition with depth first traversal can be used to combine an infinite number of streams, as it explores only a bounded number of streams at a time.

Since: 0.1.0

Instances
MonadTrans AsyncT Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

lift :: Monad m => m a -> AsyncT m a #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

toStream :: AsyncT m a -> Stream m a

fromStream :: Stream m a -> AsyncT m a

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

liftBase :: b α -> AsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

get :: AsyncT m s #

put :: s -> AsyncT m () #

state :: (s -> (a, s)) -> AsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

ask :: AsyncT m r #

local :: (r -> r) -> AsyncT m a -> AsyncT m a #

reader :: (r -> a) -> AsyncT m a #

MonadAsync m => Monad (AsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

(>>=) :: AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b #

(>>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

return :: a -> AsyncT m a #

fail :: String -> AsyncT m a #

Monad m => Functor (AsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

fmap :: (a -> b) -> AsyncT m a -> AsyncT m b #

(<$) :: a -> AsyncT m b -> AsyncT m a #

(Monad m, MonadAsync m) => Applicative (AsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

pure :: a -> AsyncT m a #

(<*>) :: AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b #

liftA2 :: (a -> b -> c) -> AsyncT m a -> AsyncT m b -> AsyncT m c #

(*>) :: AsyncT m a -> AsyncT m b -> AsyncT m b #

(<*) :: AsyncT m a -> AsyncT m b -> AsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

liftIO :: IO a -> AsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

throwM :: Exception e => e -> AsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Monoid (AsyncT m a) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

mempty :: AsyncT m a #

mappend :: AsyncT m a -> AsyncT m a -> AsyncT m a #

mconcat :: [AsyncT m a] -> AsyncT m a #

data WAsyncT m a

Wide async composition or async composition with breadth first traversal. The Semigroup instance of WAsyncT concurrently traverses the composed streams using a breadth first traversal, that is, in a round robin fashion, yielding elements from both streams alternately.

main = (S.toList . wAsyncly $ S.fromFoldable [1,2] <> S.fromFoldable [3,4]) >>= print
[1,3,2,4]

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of WAsyncT runs all iterations fairly concurrently using a round robin scheduling.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = runStream . wAsyncly $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Unlike AsyncT, all iterations are guaranteed to run fairly concurrently, unconditionally.

Note that async composition with breadth first traversal can only combine a finite number of streams as it needs to retain state for each unfinished stream.

Since: 0.2.0

Instances
MonadTrans WAsyncT Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

lift :: Monad m => m a -> WAsyncT m a #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

toStream :: WAsyncT m a -> Stream m a

fromStream :: Stream m a -> WAsyncT m a

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

liftBase :: b α -> WAsyncT m α #

(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

get :: WAsyncT m s #

put :: s -> WAsyncT m () #

state :: (s -> (a, s)) -> WAsyncT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

ask :: WAsyncT m r #

local :: (r -> r) -> WAsyncT m a -> WAsyncT m a #

reader :: (r -> a) -> WAsyncT m a #

MonadAsync m => Monad (WAsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

(>>=) :: WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b #

(>>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

return :: a -> WAsyncT m a #

fail :: String -> WAsyncT m a #

Monad m => Functor (WAsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

fmap :: (a -> b) -> WAsyncT m a -> WAsyncT m b #

(<$) :: a -> WAsyncT m b -> WAsyncT m a #

(Monad m, MonadAsync m) => Applicative (WAsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

pure :: a -> WAsyncT m a #

(<*>) :: WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b #

liftA2 :: (a -> b -> c) -> WAsyncT m a -> WAsyncT m b -> WAsyncT m c #

(*>) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m b #

(<*) :: WAsyncT m a -> WAsyncT m b -> WAsyncT m a #

(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

liftIO :: IO a -> WAsyncT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

throwM :: Exception e => e -> WAsyncT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Monoid (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

mempty :: WAsyncT m a #

mappend :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

mconcat :: [WAsyncT m a] -> WAsyncT m a #

data ParallelT m a

Async composition with simultaneous traversal of all streams.

The Semigroup instance of ParallelT concurrently merges two streams, running both strictly concurrently and yielding elements from both streams as they arrive. When multiple streams are combined using ParallelT each one is evaluated in its own thread and the results produced are presented in the combined stream on a first come first serve basis.

AsyncT and WAsyncT are concurrent lookahead streams, each with a specific consumption pattern (depth first or breadth first). Because they are lookahead streams, they may introduce a certain default latency before starting more concurrent tasks, for efficiency reasons, or may put a default limit on resource consumption (e.g. the number of concurrent threads used for lookahead). At the implementation level, both can share a pool of worker threads to evaluate the streams in the desired pattern and at the desired rate. ParallelT, however, uses a separate runtime thread to evaluate each stream.

WAsyncT is similar to ParallelT in that both evaluate the constituent streams fairly in a round robin fashion. The key difference is that WAsyncT is lazy, or pull driven, whereas ParallelT is strict, or push driven. ParallelT immediately starts concurrent evaluation of both streams (in separate threads) and picks up the results later, whereas WAsyncT may wait for a certain latency threshold before initiating concurrent evaluation of the next stream. In WAsyncT, the concurrent scheduling of the next stream, and hence the degree of concurrency, is driven by feedback from the consumer. In ParallelT, each stream is evaluated in a separate thread and results are pushed to a shared output buffer; the evaluation rate is controlled by blocking when the buffer is full.

Concurrent lookahead streams are generally more efficient than ParallelT and can work pretty efficiently even for smaller tasks because they do not necessarily use a separate thread for each task. So they should be preferred over ParallelT especially when efficiency is a concern and simultaneous strict evaluation is not a requirement. ParallelT is useful for cases when the streams are required to be evaluated simultaneously irrespective of how the consumer consumes them e.g. when we want to race two tasks and want to start both strictly at the same time or if we have timers in the parallel tasks and our results depend on the timers being started at the same time. We can say that ParallelT is almost the same (modulo some implementation differences) as WAsyncT when the latter is used with unlimited lookahead and zero latency in initiating lookahead.
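The racing use case mentioned above can be sketched with base primitives. This is a minimal model of "start both tasks strictly at the same time and take whichever finishes first", not ParallelT itself. The names raceModel and delayed are hypothetical, and exception handling is deliberately left out.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (void)

-- Run two tasks strictly concurrently and return the first result.
raceModel :: IO a -> IO a -> IO a
raceModel left right = do
    winner <- newEmptyMVar
    -- Both tasks start immediately, each in its own thread.
    t1 <- forkIO (left  >>= void . tryPutMVar winner)
    t2 <- forkIO (right >>= void . tryPutMVar winner)
    result <- takeMVar winner      -- blocks until one task has finished
    mapM_ killThread [t1, t2]      -- the slower task is no longer needed
    return result

-- An action that sleeps for n tenths of a second and then returns n.
delayed :: Int -> IO Int
delayed n = threadDelay (n * 100000) >> return n

main :: IO ()
main = raceModel (delayed 3) (delayed 1) >>= print
```

Because both threads are forked before either result is awaited, the task with the shorter delay wins the race regardless of the order in which the tasks are passed.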

main = (S.toList . parallely $ S.fromFoldable [1,2] <> S.fromFoldable [3,4]) >>= print
[1,3,2,4]

When streams with more than one element are merged, the merged stream yields elements from whichever stream produces them first, without any bias, unlike the Async style streams.

Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.

Similarly, the Monad instance of ParallelT runs all iterations of the loop concurrently.

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = runStream . parallely $ do
    n <- return 3 <> return 2 <> return 1
    S.yieldM $ do
         threadDelay (n * 1000000)
         myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3

Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.

Since: 0.1.0

Instances
MonadTrans ParallelT Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

lift :: Monad m => m a -> ParallelT m a #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

toStream :: ParallelT m a -> Stream m a

fromStream :: Stream m a -> ParallelT m a

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

liftBase :: b α -> ParallelT m α #

(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

get :: ParallelT m s #

put :: s -> ParallelT m () #

state :: (s -> (a, s)) -> ParallelT m a #

(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

ask :: ParallelT m r #

local :: (r -> r) -> ParallelT m a -> ParallelT m a #

reader :: (r -> a) -> ParallelT m a #

MonadAsync m => Monad (ParallelT m) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

(>>=) :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b #

(>>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

return :: a -> ParallelT m a #

fail :: String -> ParallelT m a #

Monad m => Functor (ParallelT m) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

fmap :: (a -> b) -> ParallelT m a -> ParallelT m b #

(<$) :: a -> ParallelT m b -> ParallelT m a #

(Monad m, MonadAsync m) => Applicative (ParallelT m) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

pure :: a -> ParallelT m a #

(<*>) :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b #

liftA2 :: (a -> b -> c) -> ParallelT m a -> ParallelT m b -> ParallelT m c #

(*>) :: ParallelT m a -> ParallelT m b -> ParallelT m b #

(<*) :: ParallelT m a -> ParallelT m b -> ParallelT m a #

(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

liftIO :: IO a -> ParallelT m a #

(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

throwM :: Exception e => e -> ParallelT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Monoid (ParallelT m a) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

mempty :: ParallelT m a #

mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a #

mconcat :: [ParallelT m a] -> ParallelT m a #

Zipping Streams

ZipSerialM and ZipAsyncM provide Applicative instances for zipping the corresponding elements of two streams together. Note that these types are not monads.

data ZipSerialM m a Source #

The applicative instance of ZipSerialM zips a number of streams serially i.e. it produces one element from each stream serially and then zips all those elements.

main = (toList . zipSerially $ (,,) <$> s1 <*> s2 <*> s3) >>= print
    where s1 = fromFoldable [1, 2]
          s2 = fromFoldable [3, 4]
          s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]

The Semigroup instance of this type works the same way as that of SerialT.

Since: 0.2.0

Instances
IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

toStream :: ZipSerialM m a -> Stream m a

fromStream :: Stream m a -> ZipSerialM m a

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

Monad m => Functor (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

fmap :: (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

(<$) :: a -> ZipSerialM m b -> ZipSerialM m a #

Monad m => Applicative (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

pure :: a -> ZipSerialM m a #

(<*>) :: ZipSerialM m (a -> b) -> ZipSerialM m a -> ZipSerialM m b #

liftA2 :: (a -> b -> c) -> ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m c #

(*>) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m b #

(<*) :: ZipSerialM m a -> ZipSerialM m b -> ZipSerialM m a #

(Foldable m, Monad m) => Foldable (ZipSerialM m) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

fold :: Monoid m0 => ZipSerialM m m0 -> m0 #

foldMap :: Monoid m0 => (a -> m0) -> ZipSerialM m a -> m0 #

foldr :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldr' :: (a -> b -> b) -> b -> ZipSerialM m a -> b #

foldl :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldl' :: (b -> a -> b) -> b -> ZipSerialM m a -> b #

foldr1 :: (a -> a -> a) -> ZipSerialM m a -> a #

foldl1 :: (a -> a -> a) -> ZipSerialM m a -> a #

toList :: ZipSerialM m a -> [a] #

null :: ZipSerialM m a -> Bool #

length :: ZipSerialM m a -> Int #

elem :: Eq a => a -> ZipSerialM m a -> Bool #

maximum :: Ord a => ZipSerialM m a -> a #

minimum :: Ord a => ZipSerialM m a -> a #

sum :: Num a => ZipSerialM m a -> a #

product :: Num a => ZipSerialM m a -> a #

Traversable (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

traverse :: Applicative f => (a -> f b) -> ZipSerialM Identity a -> f (ZipSerialM Identity b) #

sequenceA :: Applicative f => ZipSerialM Identity (f a) -> f (ZipSerialM Identity a) #

mapM :: Monad m => (a -> m b) -> ZipSerialM Identity a -> m (ZipSerialM Identity b) #

sequence :: Monad m => ZipSerialM Identity (m a) -> m (ZipSerialM Identity a) #

NFData1 (ZipSerialM Identity) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

liftRnf :: (a -> ()) -> ZipSerialM Identity a -> () #

IsList (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Associated Types

type Item (ZipSerialM Identity a) :: Type #

Eq a => Eq (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Ord a => Ord (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Read a => Read (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Show a => Show (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Streams.Zip

a ~ Char => IsString (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Semigroup (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

Monoid (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

mempty :: ZipSerialM m a #

mappend :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

mconcat :: [ZipSerialM m a] -> ZipSerialM m a #

NFData a => NFData (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

rnf :: ZipSerialM Identity a -> () #

type Item (ZipSerialM Identity a) Source # 
Instance details

Defined in Streamly.Streams.Zip

type Item (ZipSerialM Identity a) = a

data ZipAsyncM m a Source #

Like ZipSerialM, but zips in parallel: it generates all the elements to be zipped concurrently.

main = (toList . zipAsyncly $ (,,) <$> s1 <*> s2 <*> s3) >>= print
    where s1 = fromFoldable [1, 2]
          s2 = fromFoldable [3, 4]
          s3 = fromFoldable [5, 6]
[(1,3,5),(2,4,6)]

The Semigroup instance of this type works the same way as that of SerialT.

Since: 0.2.0

Instances
IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

toStream :: ZipAsyncM m a -> Stream m a

fromStream :: Stream m a -> ZipAsyncM m a

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

Monad m => Functor (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

fmap :: (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

(<$) :: a -> ZipAsyncM m b -> ZipAsyncM m a #

MonadAsync m => Applicative (ZipAsyncM m) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

pure :: a -> ZipAsyncM m a #

(<*>) :: ZipAsyncM m (a -> b) -> ZipAsyncM m a -> ZipAsyncM m b #

liftA2 :: (a -> b -> c) -> ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m c #

(*>) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m b #

(<*) :: ZipAsyncM m a -> ZipAsyncM m b -> ZipAsyncM m a #

Semigroup (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

Monoid (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

mempty :: ZipAsyncM m a #

mappend :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

mconcat :: [ZipAsyncM m a] -> ZipAsyncM m a #

Running Streams

runStream :: Monad m => SerialT m a -> m () Source #

Run a stream, discarding the results. By default it interprets the stream as SerialT. To run other types of streams, use the type adapting combinators, for example runStream . asyncly.
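As a minimal sketch (assuming streamly-0.6.0), the same pipeline can be run with the default serial interpretation and with an AsyncT interpretation. The name runBoth and the IORef counter are illustrative assumptions, used only to make the discarded effects visible.

```haskell
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical helper: run one pipeline under two interpretations and
-- count how many effects were performed in total.
runBoth :: IO Int
runBoth = do
    counter <- newIORef (0 :: Int)
    let tick :: Int -> IO ()
        tick _ = atomicModifyIORef' counter (\n -> (n + 1, ()))
    -- Default interpretation: the stream is treated as SerialT.
    runStream $ S.mapM tick (S.fromFoldable [1, 2, 3])
    -- Same pipeline, interpreted as AsyncT before being run.
    runStream . asyncly $ S.mapM tick (S.fromFoldable [1, 2, 3])
    readIORef counter

main :: IO ()
main = runBoth >>= print
```

Both runs perform every effect once and discard the stream elements; only the scheduling of the effects differs.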

Since: 0.2.0

Parallel Function Application

Stream processing functions can be composed in a chain using function application with or without the $ operator, or with reverse function application operator &. Streamly provides concurrent versions of these operators applying stream processing functions such that each stage of the stream can run in parallel. The operators start with a |; we can read |$ as "parallel dollar" to remember that | comes before $.

Imports for the code snippets below:

 import Streamly
 import qualified Streamly.Prelude as S
 import Control.Concurrent

(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #

Parallel function application operator for streams; just like the regular function application operator $ except that it is concurrent. The following code prints a value every second even though each stage adds a 1 second delay.

runStream $
   S.mapM (\x -> threadDelay 1000000 >> print x)
     |$ S.repeatM (threadDelay 1000000 >> return 1)

Concurrent

Since: 0.3.0

(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #

Parallel reverse function application operator for streams; just like the regular reverse function application operator & except that it is concurrent.

runStream $
      S.repeatM (threadDelay 1000000 >> return 1)
   |& S.mapM (\x -> threadDelay 1000000 >> print x)

Concurrent

Since: 0.3.0

(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #

Parallel function application operator; applies a run or fold function to a stream such that the fold consumer and the stream producer run in parallel. A run or fold function reduces the stream to a value in the underlying monad. The . at the end of the operator is a mnemonic for termination of the stream.

   S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
      |$. S.repeatM (threadDelay 1000000 >> return 1)

Concurrent

Since: 0.3.0

(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #

Parallel reverse function application operator for applying a run or fold function to a stream. Just like |$. except that the operands are reversed.

       S.repeatM (threadDelay 1000000 >> return 1)
   |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()

Concurrent

Since: 0.3.0

mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a) Source #

Make a stream asynchronous: triggers the computation and returns, in the underlying monad, a stream representing the output generated by the original computation. The returned stream is exhaustible and must be drained exactly once. If it is not drained fully, a thread may remain blocked forever; once exhausted, it always returns empty.
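A hedged usage sketch, assuming streamly-0.6.0: the stream is started with mkAsync and the returned stream is then drained exactly once. The name drainOnce is hypothetical, and the result is sorted because the sketch does not rely on any particular arrival order.

```haskell
import Data.List (sort)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical helper: start a stream asynchronously, then drain its output.
drainOnce :: IO [Int]
drainOnce = do
    -- mkAsync triggers the computation and returns a stream of its output.
    out <- mkAsync (S.fromFoldable [1, 2, 3] :: SerialT IO Int)
    -- Drain the returned stream fully, and only once.
    S.toList out

main :: IO ()
main = drainOnce >>= print . sort
```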

Since: 0.2.0

Merging Streams

The Semigroup operation <> of each stream type combines two streams in a type specific manner. This section provides polymorphic versions of <> which can be used to combine two streams in a predetermined way irrespective of the type.

serial :: IsStream t => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of SerialT. Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

Since: 0.2.0

wSerial :: IsStream t => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of WSerialT. Interleaves two streams, yielding one element from each stream alternately.
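The difference between serial and wSerial can be sketched on two small finite streams. This assumes streamly-0.6.0; appended and interleaved are hypothetical names.

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- serial: all elements of the first stream, then all of the second.
appended :: IO [Int]
appended = S.toList $ S.fromFoldable [1, 2] `serial` S.fromFoldable [3, 4]

-- wSerial: one element from each stream, alternately.
interleaved :: IO [Int]
interleaved = S.toList $ S.fromFoldable [1, 2] `wSerial` S.fromFoldable [3, 4]

main :: IO ()
main = do
    appended >>= print      -- [1,2,3,4]
    interleaved >>= print   -- [1,3,2,4]
```

Both operations are used here at the SerialT type, which is what makes them useful: the merge style is chosen by the function rather than by the stream type.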

Since: 0.2.0

ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of AheadT. Merges two streams sequentially but with concurrent lookahead.
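A small sketch of the lookahead behaviour, assuming streamly-0.6.0. The helper delayed and the delay values are illustrative assumptions: the second action is much quicker than the first, yet the output keeps the serial order.

```haskell
import Control.Concurrent (threadDelay)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical helper: a singleton stream that yields n after n * 0.1 seconds.
delayed :: Int -> SerialT IO Int
delayed n = S.yieldM (threadDelay (n * 100000) >> return n)

-- The second stream may be evaluated ahead of time, but its result is
-- yielded only after the first stream's result.
lookahead :: IO [Int]
lookahead = S.toList $ delayed 3 `ahead` delayed 1

main :: IO ()
main = lookahead >>= print   -- [3,1]
```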

Since: 0.3.0

async :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of AsyncT. Merges two streams possibly concurrently, preferring the elements from the left one when available.
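For comparison, a sketch of async on delayed singleton streams, assuming streamly-0.6.0. The helper names and delays are hypothetical. Because the order of results depends on scheduling, the sketch sorts them before printing instead of assuming an arrival order.

```haskell
import Control.Concurrent (threadDelay)
import Data.List (sort)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical helper: a singleton stream that yields n after n * 0.1 seconds.
delayed :: Int -> SerialT IO Int
delayed n = S.yieldM (threadDelay (n * 100000) >> return n)

-- Merge three streams, possibly concurrently.
merged :: IO [Int]
merged = S.toList $ delayed 3 `async` delayed 1 `async` delayed 2

main :: IO ()
main = merged >>= print . sort
```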

Since: 0.2.0

wAsync :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of WAsyncT. Merges two streams concurrently choosing elements from both fairly.

Since: 0.2.0

parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Polymorphic version of the Semigroup operation <> of ParallelT. Merges two streams concurrently.

Since: 0.2.0

Concurrency Control

These combinators can be used at any point in a stream composition to set parameters to control the concurrency of the enclosed stream. A parameter set at any point remains effective for any concurrent combinators used downstream until it is reset. These control parameters have no effect on non-concurrent combinators in the stream, or on non-concurrent streams. They also do not affect Parallel streams, as concurrency for Parallel streams is always unbounded.

maxThreads :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to the default; a negative value means there is no limit. The default value is 1500.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.
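As an illustrative sketch (assuming streamly-0.6.0), maxThreads can bound the number of simulated in-flight requests in a concurrent mapM. The names fetchAll and fakeRequest are hypothetical, and threadDelay stands in for a blocking IO call.

```haskell
import Control.Concurrent (threadDelay)
import Data.List (sort)
import Streamly
import qualified Streamly.Prelude as S

-- Hypothetical stand-in for a blocking IO request.
fakeRequest :: Int -> IO Int
fakeRequest n = threadDelay 100000 >> return (n * 10)

-- Run the requests in AsyncT style, with at most two threads at a time.
fetchAll :: IO [Int]
fetchAll =
    S.toList . asyncly . maxThreads 2 $ S.mapM fakeRequest (S.fromFoldable [1 .. 6])

main :: IO ()
main = fetchAll >>= print . sort
```

The results are sorted before printing because the completion order of concurrent requests is not fixed.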

Since: 0.4.0

maxBuffer :: IsStream t => Int -> t m a -> t m a Source #

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full, no more concurrent tasks are spawned until there is space in the buffer. A value of 0 resets the buffer size to the default; a negative value means there is no limit. The default value is 1500.

CAUTION! Using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite or very large streams. In particular, it must not be used when pure is used in ZipAsyncM streams, as pure in applicative zip streams generates an infinite stream, causing unbounded concurrent generation with no limit on the buffer or threads.

Since: 0.4.0

Rate Limiting

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Since: 0.5.0

Constructors

Rate 

Fields

  • rateLow :: Double
  • rateGoal :: Double
  • rateHigh :: Double
  • rateBuffer :: Int

rate :: IsStream t => Maybe Rate -> t m a -> t m a Source #

Specify the pull rate of a stream. A Nothing value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a stream is governed by:

  • The maxThreads limit
  • The maxBuffer limit
  • The maximum rate that the stream producer can achieve
  • The maximum rate that the stream consumer can achieve
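A minimal sketch of an explicit rate specification, assuming streamly-0.6.0 and the positional argument order low, goal, high, buffer that the avgRate equivalence in this section implies. The name paced and the numbers are arbitrary choices for illustration.

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- Aim for 10 yields per second, allowing the instantaneous rate to vary
-- between 5 and 20 per second, with a rate buffer of 10.
paced :: IO [Int]
paced =
    S.toList . asyncly . rate (Just (Rate 5 10 20 10)) $
        S.replicateM 3 (return 1)

main :: IO ()
main = paced >>= print
```

The rate limits when elements are yielded, not which elements are produced, so the sketch only inspects the values.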

Since: 0.5.0

avgRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

Since: 0.5.0

minRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go below the specified rate, although it may go above it at times. The upper limit is double the specified rate.

Since: 0.5.0

maxRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible, the yield rate is never allowed to go above the specified rate, although it may go below it at times. The lower limit is half the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Since: 0.5.0

constRate :: IsStream t => Double -> t m a -> t m a Source #

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Since: 0.5.0

Folding Containers of Streams

These are variants of standard Foldable fold functions that use a polymorphic stream sum operation (e.g. async or wSerial) to fold a container of streams.

foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a Source #

A variant of fold that allows you to fold a Foldable container of streams using the specified stream sum operation.

foldWith async $ map return [1..3]

Since: 0.1.0

foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #

A variant of foldMap that allows you to map a monadic streaming action on a Foldable container and then fold it using the specified stream sum operation.

foldMapWith async return [1..3]

Since: 0.1.0

forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #

Like foldMapWith but with the last two arguments reversed i.e. the monadic streaming function is the last argument.
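A short sketch of forEachWith, assuming streamly-0.6.0. It uses serial as the sum operation so that the output order is fixed; squares is a hypothetical name.

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- Map each container element to a singleton stream and combine the
-- resulting streams with the serial sum operation.
squares :: IO [Int]
squares =
    S.toList $ forEachWith serial [1, 2, 3] $ \x ->
        S.yieldM (return (x * x))

main :: IO ()
main = squares >>= print   -- [1,4,9]
```

Swapping serial for async or parallel would evaluate the per-element streams concurrently, at the cost of a scheduling-dependent output order.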

Since: 0.1.0

Stream Type Adapters

You may want to use different stream composition styles at different points in your program. Stream types can be freely converted or adapted from one type to another. The IsStream type class facilitates conversion of one stream type to another. It is not used directly; instead, the type combinators provided below are used for conversions.

To adapt from one monomorphic type (e.g. AsyncT) to another monomorphic type (e.g. SerialT) use the adapt combinator. To give polymorphic code a specific interpretation, or to adapt a specific type to a polymorphic type, use the type specific combinators, e.g. asyncly or wSerially. You cannot adapt polymorphic code to polymorphic code, as the compiler would not know which specific type you are converting from or to. If you see an ambiguous type variable error then most likely you are using adapt unnecessarily on polymorphic code.
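Giving polymorphic code a specific interpretation might look like the sketch below, assuming streamly-0.6.0. The name combined is hypothetical, and the Semigroup constraint in its signature is one possible way to keep the stream type open.

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- Polymorphic stream code: the composition style of <> is left to the caller.
combined :: (IsStream t, Semigroup (t IO Int)) => t IO Int
combined = S.fromFoldable [1, 2] <> S.fromFoldable [3, 4]

main :: IO ()
main = do
    -- Interpret <> as SerialT: append the two streams.
    S.toList (serially combined) >>= print    -- [1,2,3,4]
    -- Interpret <> as WSerialT: interleave the two streams.
    S.toList (wSerially combined) >>= print   -- [1,3,2,4]
```

No adapt is needed here: serially and wSerially fix the type of the polymorphic code directly.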

class IsStream t Source #

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0

Minimal complete definition

toStream, fromStream, consM, (|:)

Instances
IsStream WSerialT Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

toStream :: WSerialT m a -> Stream m a

fromStream :: Stream m a -> WSerialT m a

consM :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

(|:) :: MonadAsync m => m a -> WSerialT m a -> WSerialT m a Source #

IsStream SerialT Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

toStream :: SerialT m a -> Stream m a

fromStream :: Stream m a -> SerialT m a

consM :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

(|:) :: MonadAsync m => m a -> SerialT m a -> SerialT m a Source #

IsStream ParallelT Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

toStream :: ParallelT m a -> Stream m a

fromStream :: Stream m a -> ParallelT m a

consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

(|:) :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a Source #

IsStream WAsyncT Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

toStream :: WAsyncT m a -> Stream m a

fromStream :: Stream m a -> WAsyncT m a

consM :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

(|:) :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a Source #

IsStream AsyncT Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

toStream :: AsyncT m a -> Stream m a

fromStream :: Stream m a -> AsyncT m a

consM :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

(|:) :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a Source #

IsStream AheadT Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

toStream :: AheadT m a -> Stream m a

fromStream :: Stream m a -> AheadT m a

consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

(|:) :: MonadAsync m => m a -> AheadT m a -> AheadT m a Source #

IsStream ZipAsyncM Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

toStream :: ZipAsyncM m a -> Stream m a

fromStream :: Stream m a -> ZipAsyncM m a

consM :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

(|:) :: MonadAsync m => m a -> ZipAsyncM m a -> ZipAsyncM m a Source #

IsStream ZipSerialM Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

toStream :: ZipSerialM m a -> Stream m a

fromStream :: Stream m a -> ZipSerialM m a

consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

(|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a Source #

serially :: IsStream t => SerialT m a -> t m a Source #

Fix the type of a polymorphic stream as SerialT.

Since: 0.1.0

wSerially :: IsStream t => WSerialT m a -> t m a Source #

Fix the type of a polymorphic stream as WSerialT.

Since: 0.2.0

asyncly :: IsStream t => AsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as AsyncT.

Since: 0.1.0

aheadly :: IsStream t => AheadT m a -> t m a Source #

Fix the type of a polymorphic stream as AheadT.

Since: 0.3.0

wAsyncly :: IsStream t => WAsyncT m a -> t m a Source #

Fix the type of a polymorphic stream as WAsyncT.

Since: 0.2.0

parallely :: IsStream t => ParallelT m a -> t m a Source #

Fix the type of a polymorphic stream as ParallelT.

Since: 0.1.0

zipSerially :: IsStream t => ZipSerialM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipSerialM.

Since: 0.2.0

zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a Source #

Fix the type of a polymorphic stream as ZipAsyncM.

Since: 0.2.0

adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a Source #

Adapt any specific stream type to any other specific stream type.
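A minimal sketch of adapt between two monomorphic types, assuming streamly-0.6.0. The names asyncStream and asSerial are hypothetical.

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- A stream with a fixed (monomorphic) AsyncT type.
asyncStream :: AsyncT IO Int
asyncStream = S.fromFoldable [1, 2, 3]

-- adapt converts it to SerialT, the type that S.toList expects.
asSerial :: IO [Int]
asSerial = S.toList (adapt asyncStream)

main :: IO ()
main = asSerial >>= print
```

Both the source and the target type are fully known here, which is the situation adapt is meant for.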

Since: 0.1.0

IO Streams

type Serial = SerialT IO Source #

A serial IO stream of elements of type a. See SerialT documentation for more details.

Since: 0.2.0

type WSerial = WSerialT IO Source #

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Since: 0.2.0

type Ahead = AheadT IO Source #

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0

type Async = AsyncT IO Source #

A demand driven left biased parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0

type WAsync = WAsyncT IO Source #

A round robin parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0

type Parallel = ParallelT IO Source #

A parallely composing IO stream of elements of type a. See ParallelT documentation for more details.

Since: 0.2.0

type ZipSerial = ZipSerialM IO Source #

An IO stream whose applicative instance zips streams serially.

Since: 0.2.0

type ZipAsync = ZipAsyncM IO Source #

An IO stream whose applicative instance zips streams wAsyncly.

Since: 0.2.0

Re-exports

class Semigroup a where #

The class of semigroups (types with an associative binary operation).

Instances should satisfy the associativity law:

  • x <> (y <> z) = (x <> y) <> z

Since: base-4.9.0.0

Minimal complete definition

(<>)

Methods

(<>) :: a -> a -> a infixr 6 #

An associative operation.

sconcat :: NonEmpty a -> a #

Reduce a non-empty list with <>

The default definition should be sufficient, but this can be overridden for efficiency.

stimes :: Integral b => b -> a -> a #

Repeat a value n times.

Given that this works on a Semigroup it is allowed to fail if you request 0 or fewer repetitions, and the default definition will do so.

By making this a member of the class, idempotent semigroups and monoids can upgrade this to execute in O(1) by picking stimes = stimesIdempotent or stimes = stimesIdempotentMonoid respectively.
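The class methods can be tried on ordinary lists, which need only the base library. This small sketch is independent of any stream type.

```haskell
import Data.List.NonEmpty (NonEmpty (..))
import Data.Semigroup (sconcat, stimes)

main :: IO ()
main = do
    -- (<>) on lists is concatenation.
    print ([1, 2] <> [3 :: Int])                       -- [1,2,3]
    -- sconcat reduces a non-empty list with (<>).
    print (sconcat ([1, 2] :| [[3], [4, 5 :: Int]]))   -- [1,2,3,4,5]
    -- stimes repeats a value n times.
    print (stimes (3 :: Int) "ab")                     -- "ababab"
```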

Instances
Semigroup Ordering

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Semigroup ()

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: () -> () -> () #

sconcat :: NonEmpty () -> () #

stimes :: Integral b => b -> () -> () #

Semigroup Void

Since: base-4.9.0.0

Instance details

Defined in Data.Void

Methods

(<>) :: Void -> Void -> Void #

sconcat :: NonEmpty Void -> Void #

stimes :: Integral b => b -> Void -> Void #

Semigroup All

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: All -> All -> All #

sconcat :: NonEmpty All -> All #

stimes :: Integral b => b -> All -> All #

Semigroup Any

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Any -> Any -> Any #

sconcat :: NonEmpty Any -> Any #

stimes :: Integral b => b -> Any -> Any #

Semigroup ByteArray 
Instance details

Defined in Data.Primitive.ByteArray

Semigroup [a]

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: [a] -> [a] -> [a] #

sconcat :: NonEmpty [a] -> [a] #

stimes :: Integral b => b -> [a] -> [a] #

Semigroup a => Semigroup (Maybe a)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: Maybe a -> Maybe a -> Maybe a #

sconcat :: NonEmpty (Maybe a) -> Maybe a #

stimes :: Integral b => b -> Maybe a -> Maybe a #

Semigroup a => Semigroup (IO a)

Since: base-4.10.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: IO a -> IO a -> IO a #

sconcat :: NonEmpty (IO a) -> IO a #

stimes :: Integral b => b -> IO a -> IO a #

Semigroup p => Semigroup (Par1 p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: Par1 p -> Par1 p -> Par1 p #

sconcat :: NonEmpty (Par1 p) -> Par1 p #

stimes :: Integral b => b -> Par1 p -> Par1 p #

Ord a => Semigroup (Min a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: Min a -> Min a -> Min a #

sconcat :: NonEmpty (Min a) -> Min a #

stimes :: Integral b => b -> Min a -> Min a #

Ord a => Semigroup (Max a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: Max a -> Max a -> Max a #

sconcat :: NonEmpty (Max a) -> Max a #

stimes :: Integral b => b -> Max a -> Max a #

Semigroup (First a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: First a -> First a -> First a #

sconcat :: NonEmpty (First a) -> First a #

stimes :: Integral b => b -> First a -> First a #

Semigroup (Last a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: Last a -> Last a -> Last a #

sconcat :: NonEmpty (Last a) -> Last a #

stimes :: Integral b => b -> Last a -> Last a #

Monoid m => Semigroup (WrappedMonoid m)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Semigroup a => Semigroup (Option a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup

Methods

(<>) :: Option a -> Option a -> Option a #

sconcat :: NonEmpty (Option a) -> Option a #

stimes :: Integral b => b -> Option a -> Option a #

Semigroup a => Semigroup (Identity a)

Since: base-4.9.0.0

Instance details

Defined in Data.Functor.Identity

Methods

(<>) :: Identity a -> Identity a -> Identity a #

sconcat :: NonEmpty (Identity a) -> Identity a #

stimes :: Integral b => b -> Identity a -> Identity a #

Semigroup (First a)

Since: base-4.9.0.0

Instance details

Defined in Data.Monoid

Methods

(<>) :: First a -> First a -> First a #

sconcat :: NonEmpty (First a) -> First a #

stimes :: Integral b => b -> First a -> First a #

Semigroup (Last a)

Since: base-4.9.0.0

Instance details

Defined in Data.Monoid

Methods

(<>) :: Last a -> Last a -> Last a #

sconcat :: NonEmpty (Last a) -> Last a #

stimes :: Integral b => b -> Last a -> Last a #

Semigroup a => Semigroup (Dual a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Dual a -> Dual a -> Dual a #

sconcat :: NonEmpty (Dual a) -> Dual a #

stimes :: Integral b => b -> Dual a -> Dual a #

Semigroup (Endo a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Endo a -> Endo a -> Endo a #

sconcat :: NonEmpty (Endo a) -> Endo a #

stimes :: Integral b => b -> Endo a -> Endo a #

Num a => Semigroup (Sum a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Sum a -> Sum a -> Sum a #

sconcat :: NonEmpty (Sum a) -> Sum a #

stimes :: Integral b => b -> Sum a -> Sum a #

Num a => Semigroup (Product a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Product a -> Product a -> Product a #

sconcat :: NonEmpty (Product a) -> Product a #

stimes :: Integral b => b -> Product a -> Product a #

Semigroup a => Semigroup (Down a)

Since: base-4.11.0.0

Instance details

Defined in Data.Ord

Methods

(<>) :: Down a -> Down a -> Down a #

sconcat :: NonEmpty (Down a) -> Down a #

stimes :: Integral b => b -> Down a -> Down a #

Semigroup (NonEmpty a)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: NonEmpty a -> NonEmpty a -> NonEmpty a #

sconcat :: NonEmpty (NonEmpty a) -> NonEmpty a #

stimes :: Integral b => b -> NonEmpty a -> NonEmpty a #

Ord a => Semigroup (Set a)

Since: containers-0.5.7

Instance details

Defined in Data.Set.Internal

Methods

(<>) :: Set a -> Set a -> Set a #

sconcat :: NonEmpty (Set a) -> Set a #

stimes :: Integral b => b -> Set a -> Set a #

Semigroup (Heap a) 
Instance details

Defined in Data.Heap

Methods

(<>) :: Heap a -> Heap a -> Heap a #

sconcat :: NonEmpty (Heap a) -> Heap a #

stimes :: Integral b => b -> Heap a -> Heap a #

Semigroup (Array a)

Since: primitive-0.6.3.0

Instance details

Defined in Data.Primitive.Array

Methods

(<>) :: Array a -> Array a -> Array a #

sconcat :: NonEmpty (Array a) -> Array a #

stimes :: Integral b => b -> Array a -> Array a #

Semigroup (MergeSet a) 
Instance details

Defined in Data.Set.Internal

Methods

(<>) :: MergeSet a -> MergeSet a -> MergeSet a #

sconcat :: NonEmpty (MergeSet a) -> MergeSet a #

stimes :: Integral b => b -> MergeSet a -> MergeSet a #

Semigroup b => Semigroup (a -> b)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a -> b) -> (a -> b) -> a -> b #

sconcat :: NonEmpty (a -> b) -> a -> b #

stimes :: Integral b0 => b0 -> (a -> b) -> a -> b #

Semigroup (Either a b)

Since: base-4.9.0.0

Instance details

Defined in Data.Either

Methods

(<>) :: Either a b -> Either a b -> Either a b #

sconcat :: NonEmpty (Either a b) -> Either a b #

stimes :: Integral b0 => b0 -> Either a b -> Either a b #

Semigroup (V1 p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: V1 p -> V1 p -> V1 p #

sconcat :: NonEmpty (V1 p) -> V1 p #

stimes :: Integral b => b -> V1 p -> V1 p #

Semigroup (U1 p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: U1 p -> U1 p -> U1 p #

sconcat :: NonEmpty (U1 p) -> U1 p #

stimes :: Integral b => b -> U1 p -> U1 p #

(Semigroup a, Semigroup b) => Semigroup (a, b)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a, b) -> (a, b) -> (a, b) #

sconcat :: NonEmpty (a, b) -> (a, b) #

stimes :: Integral b0 => b0 -> (a, b) -> (a, b) #

Semigroup a => Semigroup (ST s a)

Since: base-4.11.0.0

Instance details

Defined in GHC.ST

Methods

(<>) :: ST s a -> ST s a -> ST s a #

sconcat :: NonEmpty (ST s a) -> ST s a #

stimes :: Integral b => b -> ST s a -> ST s a #

Semigroup (Proxy s)

Since: base-4.9.0.0

Instance details

Defined in Data.Proxy

Methods

(<>) :: Proxy s -> Proxy s -> Proxy s #

sconcat :: NonEmpty (Proxy s) -> Proxy s #

stimes :: Integral b => b -> Proxy s -> Proxy s #

Semigroup (WSerialT m a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

(<>) :: WSerialT m a -> WSerialT m a -> WSerialT m a #

sconcat :: NonEmpty (WSerialT m a) -> WSerialT m a #

stimes :: Integral b => b -> WSerialT m a -> WSerialT m a #

Semigroup (SerialT m a) Source # 
Instance details

Defined in Streamly.Streams.Serial

Methods

(<>) :: SerialT m a -> SerialT m a -> SerialT m a #

sconcat :: NonEmpty (SerialT m a) -> SerialT m a #

stimes :: Integral b => b -> SerialT m a -> SerialT m a #

MonadAsync m => Semigroup (ParallelT m a) Source # 
Instance details

Defined in Streamly.Streams.Parallel

Methods

(<>) :: ParallelT m a -> ParallelT m a -> ParallelT m a #

sconcat :: NonEmpty (ParallelT m a) -> ParallelT m a #

stimes :: Integral b => b -> ParallelT m a -> ParallelT m a #

MonadAsync m => Semigroup (WAsyncT m a) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

(<>) :: WAsyncT m a -> WAsyncT m a -> WAsyncT m a #

sconcat :: NonEmpty (WAsyncT m a) -> WAsyncT m a #

stimes :: Integral b => b -> WAsyncT m a -> WAsyncT m a #

MonadAsync m => Semigroup (AsyncT m a) Source # 
Instance details

Defined in Streamly.Streams.Async

Methods

(<>) :: AsyncT m a -> AsyncT m a -> AsyncT m a #

sconcat :: NonEmpty (AsyncT m a) -> AsyncT m a #

stimes :: Integral b => b -> AsyncT m a -> AsyncT m a #

MonadAsync m => Semigroup (AheadT m a) Source # 
Instance details

Defined in Streamly.Streams.Ahead

Methods

(<>) :: AheadT m a -> AheadT m a -> AheadT m a #

sconcat :: NonEmpty (AheadT m a) -> AheadT m a #

stimes :: Integral b => b -> AheadT m a -> AheadT m a #

Semigroup (ZipAsyncM m a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

(<>) :: ZipAsyncM m a -> ZipAsyncM m a -> ZipAsyncM m a #

sconcat :: NonEmpty (ZipAsyncM m a) -> ZipAsyncM m a #

stimes :: Integral b => b -> ZipAsyncM m a -> ZipAsyncM m a #

Semigroup (ZipSerialM m a) Source # 
Instance details

Defined in Streamly.Streams.Zip

Methods

(<>) :: ZipSerialM m a -> ZipSerialM m a -> ZipSerialM m a #

sconcat :: NonEmpty (ZipSerialM m a) -> ZipSerialM m a #

stimes :: Integral b => b -> ZipSerialM m a -> ZipSerialM m a #

Semigroup (f p) => Semigroup (Rec1 f p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: Rec1 f p -> Rec1 f p -> Rec1 f p #

sconcat :: NonEmpty (Rec1 f p) -> Rec1 f p #

stimes :: Integral b => b -> Rec1 f p -> Rec1 f p #

(Semigroup a, Semigroup b, Semigroup c) => Semigroup (a, b, c)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a, b, c) -> (a, b, c) -> (a, b, c) #

sconcat :: NonEmpty (a, b, c) -> (a, b, c) #

stimes :: Integral b0 => b0 -> (a, b, c) -> (a, b, c) #

Semigroup a => Semigroup (Const a b)

Since: base-4.9.0.0

Instance details

Defined in Data.Functor.Const

Methods

(<>) :: Const a b -> Const a b -> Const a b #

sconcat :: NonEmpty (Const a b) -> Const a b #

stimes :: Integral b0 => b0 -> Const a b -> Const a b #

(Applicative f, Semigroup a) => Semigroup (Ap f a)

Since: base-4.12.0.0

Instance details

Defined in Data.Monoid

Methods

(<>) :: Ap f a -> Ap f a -> Ap f a #

sconcat :: NonEmpty (Ap f a) -> Ap f a #

stimes :: Integral b => b -> Ap f a -> Ap f a #

Alternative f => Semigroup (Alt f a)

Since: base-4.9.0.0

Instance details

Defined in Data.Semigroup.Internal

Methods

(<>) :: Alt f a -> Alt f a -> Alt f a #

sconcat :: NonEmpty (Alt f a) -> Alt f a #

stimes :: Integral b => b -> Alt f a -> Alt f a #

Semigroup c => Semigroup (K1 i c p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: K1 i c p -> K1 i c p -> K1 i c p #

sconcat :: NonEmpty (K1 i c p) -> K1 i c p #

stimes :: Integral b => b -> K1 i c p -> K1 i c p #

(Semigroup (f p), Semigroup (g p)) => Semigroup ((f :*: g) p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: (f :*: g) p -> (f :*: g) p -> (f :*: g) p #

sconcat :: NonEmpty ((f :*: g) p) -> (f :*: g) p #

stimes :: Integral b => b -> (f :*: g) p -> (f :*: g) p #

(Semigroup a, Semigroup b, Semigroup c, Semigroup d) => Semigroup (a, b, c, d)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a, b, c, d) -> (a, b, c, d) -> (a, b, c, d) #

sconcat :: NonEmpty (a, b, c, d) -> (a, b, c, d) #

stimes :: Integral b0 => b0 -> (a, b, c, d) -> (a, b, c, d) #

Semigroup (f p) => Semigroup (M1 i c f p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: M1 i c f p -> M1 i c f p -> M1 i c f p #

sconcat :: NonEmpty (M1 i c f p) -> M1 i c f p #

stimes :: Integral b => b -> M1 i c f p -> M1 i c f p #

Semigroup (f (g p)) => Semigroup ((f :.: g) p)

Since: base-4.12.0.0

Instance details

Defined in GHC.Generics

Methods

(<>) :: (f :.: g) p -> (f :.: g) p -> (f :.: g) p #

sconcat :: NonEmpty ((f :.: g) p) -> (f :.: g) p #

stimes :: Integral b => b -> (f :.: g) p -> (f :.: g) p #

(Semigroup a, Semigroup b, Semigroup c, Semigroup d, Semigroup e) => Semigroup (a, b, c, d, e)

Since: base-4.9.0.0

Instance details

Defined in GHC.Base

Methods

(<>) :: (a, b, c, d, e) -> (a, b, c, d, e) -> (a, b, c, d, e) #

sconcat :: NonEmpty (a, b, c, d, e) -> (a, b, c, d, e) #

stimes :: Integral b0 => b0 -> (a, b, c, d, e) -> (a, b, c, d, e) #
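
The stream instances listed above share the same three methods but differ in how `(<>)` combines two streams. The following is a minimal sketch, assuming streamly-0.6.0 and using `fromFoldable` and `toList` from Streamly.Prelude; the names `s1`, `s2`, `appended`, `interleaved` and `repeated` are illustrative only and not part of the library.

```haskell
import Data.Semigroup (stimes)
import Streamly
import qualified Streamly.Prelude as S

-- Two small streams, polymorphic in the stream type so that the same
-- definitions can be combined under different Semigroup instances.
s1, s2 :: IsStream t => t IO Int
s1 = S.fromFoldable [1, 2]
s2 = S.fromFoldable [3, 4]

-- SerialT: (<>) appends the second stream after the first.
appended :: IO [Int]
appended = S.toList . serially $ s1 <> s2

-- WSerialT: (<>) interleaves the two streams element by element.
interleaved :: IO [Int]
interleaved = S.toList . wSerially $ s1 <> s2

-- stimes n s combines n copies of s using the instance's (<>).
repeated :: IO [Int]
repeated = S.toList . serially $ stimes (2 :: Int) s1

main :: IO ()
main = do
  appended >>= print     -- [1,2,3,4]
  interleaved >>= print  -- [1,3,2,4]
  repeated >>= print     -- [1,2,1,2]
```

The concurrent instances (`AsyncT`, `WAsyncT`, `AheadT`, `ParallelT`) are used the same way but require `MonadAsync m`, and for most of them the order of results is not fixed.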

Deprecated

type Streaming = IsStream Source #

Deprecated: Please use IsStream instead.

Same as IsStream.

Since: 0.1.0

runStreaming :: (Monad m, IsStream t) => t m a -> m () Source #

Deprecated: Please use runStream instead.

Same as runStream.

Since: 0.1.0

runStreamT :: Monad m => SerialT m a -> m () Source #

Deprecated: Please use runStream instead.

Same as runStream.

Since: 0.1.0

runInterleavedT :: Monad m => WSerialT m a -> m () Source #

Deprecated: Please use 'runStream . wSerially' instead.

Same as runStream . wSerially.

Since: 0.1.0

runAsyncT :: Monad m => AsyncT m a -> m () Source #

Deprecated: Please use 'runStream . asyncly' instead.

Same as runStream . asyncly.

Since: 0.1.0

runParallelT :: Monad m => ParallelT m a -> m () Source #

Deprecated: Please use 'runStream . parallely' instead.

Same as runStream . parallely.

Since: 0.1.0

runZipStream :: Monad m => ZipSerialM m a -> m () Source #

Deprecated: Please use 'runStream . zipSerially' instead.

Same as runStream . zipSerially.

Since: 0.1.0

runZipAsync :: Monad m => ZipAsyncM m a -> m () Source #

Deprecated: Please use 'runStream . zipAsyncly' instead.

Same as runStream . zipAsyncly.

Since: 0.1.0
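
The deprecated run functions above each correspond to `runStream` composed with a type combinator. The sketch below shows the replacements side by side, assuming streamly-0.6.0; the stream `printed` is a hypothetical example and not part of the library.

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- A small stream that prints each element as it is consumed.
-- It is polymorphic in the stream type, so each combinator below
-- can fix the type it needs.
printed :: IsStream t => t IO ()
printed = S.mapM print (S.fromFoldable [1, 2, 3 :: Int])

main :: IO ()
main = do
  runStream printed                  -- was: runStreamT printed
  runStream . wSerially $ printed    -- was: runInterleavedT printed
  runStream . asyncly $ printed      -- was: runAsyncT printed
  runStream . parallely $ printed    -- was: runParallelT printed
  runStream . zipSerially $ printed  -- was: runZipStream printed
  runStream . zipAsyncly $ printed   -- was: runZipAsync printed
```

With the concurrent combinators the print actions may run concurrently, so the order of the printed lines should not be relied on.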

type StreamT = SerialT Source #

Deprecated: Please use SerialT instead.

Since: 0.1.0

type InterleavedT = WSerialT Source #

Deprecated: Please use WSerialT instead.

Since: 0.1.0

type ZipStream = ZipSerialM Source #

Deprecated: Please use ZipSerialM instead.

Since: 0.1.0

interleaving :: IsStream t => WSerialT m a -> t m a Source #

Deprecated: Please use wSerially instead.

Same as wSerially.

Since: 0.1.0

zipping :: IsStream t => ZipSerialM m a -> t m a Source #

Deprecated: Please use zipSerially instead.

Same as zipSerially.

Since: 0.1.0

zippingAsync :: IsStream t => ZipAsyncM m a -> t m a Source #

Deprecated: Please use zipAsyncly instead.

Same as zipAsyncly.

Since: 0.1.0

(<=>) :: IsStream t => t m a -> t m a -> t m a infixr 5 Source #

Deprecated: Please use wSerial instead.

Same as wSerial.

Since: 0.1.0

(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #

Deprecated: Please use async instead.

Same as async.

Since: 0.1.0
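
The deprecated operators map directly onto named functions: `(<=>)` becomes `wSerial` and `(<|)` becomes `async`. A minimal sketch of the replacements, assuming streamly-0.6.0; `streamA`, `streamB`, `interleaved` and `merged` are illustrative names only.

```haskell
import Data.List (sort)
import Streamly
import qualified Streamly.Prelude as S

streamA, streamB :: IsStream t => t IO Int
streamA = S.fromFoldable [1, 2]
streamB = S.fromFoldable [3, 4]

-- Formerly written as: streamA <=> streamB
interleaved :: IO [Int]
interleaved = S.toList $ streamA `wSerial` streamB

-- Formerly written as: streamA <| streamB
-- The two streams may be evaluated concurrently, so the relative
-- order of their elements is not fixed; sort to get a stable result.
merged :: IO [Int]
merged = sort <$> S.toList (streamA `async` streamB)

main :: IO ()
main = do
  interleaved >>= print  -- [1,3,2,4]
  merged >>= print       -- [1,2,3,4]
```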