Copyright | (c) 2017 Harendra Kumar |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Safe Haskell | None |
Language | Haskell2010 |
- Stream Types
- Concurrent Streams
- Combining Streams
- Imports and Supporting Code
- Generating Streams
- Generating Streams Concurrently
- Eliminating Streams
- Concurrent Pipeline Stages
- Transforming Streams
- Mapping Concurrently
- Merging Streams
- Nesting Streams
- Zipping Streams
- Monad transformers
- Concurrent Programming
- Reactive Programming
- Writing Concurrent Programs
- Performance
- Interoperation with Streaming Libraries
- Comparison with Existing Packages
- Where to go next?
Streamly is a general computing framework based on concurrent data flow
programming. The IO monad and pure lists are special cases of streamly. On
one hand, streamly extends lists of pure values to lists of monadic
actions; on the other hand, it extends the IO monad with concurrent
non-determinism. In simple imperative terms, streamly extends the IO monad
with for loops and nested for loops with concurrency support. Hopefully,
this analogy becomes clearer as you go through this tutorial.
Streaming enables writing modular and composable applications with ease, and concurrency allows them to scale and perform well. Streamly enables writing scalable concurrent applications without explicit awareness of threads or synchronization; no explicit thread control is needed. Where applicable, the concurrency rate is automatically controlled based on consumer demand, and combinators are available to fine tune the concurrency control.
Streaming and concurrency together enable expressing reactive applications conveniently. See the CirclingSquare example in the examples directory for a simple SDL based FRP example. To summarize, streamly provides a unified computing framework for streaming, non-determinism and functional reactive programming in an elegant and simple API that is a natural extension of pure lists to monadic streams.
In this tutorial we will go over the basic concepts and how to use the library. Before you go through this tutorial we recommend that you take a look at:
- The quick overview in the package README file.
- The overview of streams and folds in the Streamly module.
Once you finish this tutorial, see the last section for further reading resources.
Stream Types
The monadic stream API offered by Streamly is very close to the Haskell Prelude API for pure lists; it can be considered a natural extension of lists to monadic actions. Streamly streams provide concurrent composition and merging of streams, so they can be thought of as a concurrent list transformer.
The basic stream type is Serial. It represents a sequence of IO actions and
is a Monad. The Serial monad is almost a drop-in replacement for the IO
monad; the IO monad is a special case of the Serial monad. The IO monad
represents a single IO action whereas the Serial monad represents a series
of IO actions. The only changes you need to make to go from IO to Serial
are to use drain to run the monad and to prefix the IO actions with either
yieldM or liftIO. If you use liftIO you can switch from Serial to the IO
monad by simply removing the drain function; no other changes are needed
unless you have used some stream specific composition or combinators.
Similarly, the Serial type is almost a drop-in replacement for pure lists;
pure lists are a special case of monadic streams. If you use nil in place
of [] and |: in place of : you can replace a list with a Serial stream. The
only difference is that the elements must be monadic actions, and to
operate on the streams we must use the corresponding functions from
Streamly.Prelude instead of the base Prelude.
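As a quick illustration of this IO to Serial translation, here is a minimal sketch; it assumes the imports shown later in this tutorial plus liftIO from Control.Monad.IO.Class:

-- An ordinary IO program...
ioMain :: IO ()
ioMain = do
    putStrLn "What is your name?"
    name <- getLine
    putStrLn ("Hello " ++ name)

-- ...and the same program as a Serial stream: prefix the actions with
-- yieldM or liftIO and run the whole computation with drain.
streamMain :: IO ()
streamMain = S.drain $ do
    liftIO $ putStrLn "What is your name?"
    name <- S.yieldM getLine
    liftIO $ putStrLn ("Hello " ++ name)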
Concurrent Streams
Many stream operations can be done concurrently:
- Streams can be generated concurrently.
- Streams can be merged concurrently.
- Multiple stages in a streaming pipeline can run concurrently.
- Streams can be mapped and zipped concurrently.
- In monadic composition they combine like a list transformer, providing concurrent non-determinism.
There are three basic concurrent stream styles: Ahead, Async, and Parallel.
The Ahead style streams are similar to Serial except that they can
speculatively execute multiple stream actions concurrently in advance.
Ahead returns exactly the same stream as Serial, except that it may execute
the actions concurrently. The Async style streams, like Ahead,
speculatively execute multiple stream actions in advance, but return the
results in their finishing order rather than in the stream traversal order.
Parallel is like Async except that it provides unbounded parallelism
instead of controlled parallelism.
For easy reference, we can classify the stream types based on execution order, consumption order, and bounded or unbounded concurrency. Execution could be serial (i.e. synchronous) or asynchronous. In serial execution we execute the next action in the stream only after the previous one has finished executing. In asynchronous execution multiple actions in the stream can be executed asynchronously i.e. the next action can start executing even before the first one has finished. Consumption order determines the order in which the outputs generated by the composition are consumed. Consumption could be serial or asynchronous. In serial consumption, the outputs are consumed in the traversal order, in asynchronous consumption the outputs are consumed as they arrive i.e. first come first serve order.
Type | Execution | Consumption | Concurrency |
---|---|---|---|
Serial | Serial | Serial | None |
Ahead | Asynchronous | Serial | Bounded |
Async | Asynchronous | Asynchronous | Bounded |
Parallel | Asynchronous | Asynchronous | Unbounded |
All these types can be freely inter-converted using type conversion
combinators or type annotations, without any cost, to achieve the desired
composition style. To force a particular type of composition, we coerce the
stream type using the corresponding type adapting combinator from serially,
aheadly, asyncly, or parallely. The default stream type is inferred as
Serial unless you change it by using one of the combinators or by using a
type annotation.
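For example, the following two definitions are equivalent ways of fixing the composition style; a small sketch using the imports shown later in this tutorial. With pure elements both produce the same list, the point is only how the stream type is chosen:

-- Force Async style composition with a type adapting combinator:
ex1 :: IO [Int]
ex1 = S.toList $ asyncly $ S.fromFoldable [1,2,3]

-- Keep the default Serial style, fixed here with a type annotation:
ex2 :: IO [Int]
ex2 = S.toList (S.fromFoldable [1,2,3] :: Serial Int)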
Combining Streams
Streams can be combined using <> or mappend to form a composite. Composite
streams can be interpreted in a depth first or breadth first manner using
an appropriate type conversion before consumption. Deep (e.g. Serial)
stream type variants traverse a composite stream in a depth first manner,
such that each stream is traversed fully before traversing the next stream.
Wide (e.g. WSerial) stream types traverse it in a breadth first manner,
such that one element from each stream is traversed before coming back to
the first stream again.
Each stream type has a wide traversal variant prefixed by W. The wide
variant differs only in the Semigroup/Monoid and Applicative/Monad
compositions of the streams.
The following table summarizes the basic types and the corresponding wide
variants:
Deep | Wide |
---|---|
Serial | WSerial |
Ahead | WAhead |
Async | WAsync |
Other than these types, there are also ZipSerial and ZipAsync types that
zip streams serially or concurrently using the Applicative operation. These
types are not monads, only applicatives, and they do not differ in
Semigroup composition.
Imports and Supporting Code
In most of the example snippets we do not repeat the imports. Where imports are not explicitly specified, use the imports shown below.
import Streamly
import Streamly.Prelude ((|:), nil)
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad (forever)
To illustrate concurrent vs serial composition aspects, we will use the
following delay function to introduce a sleep or delay specified in
seconds. After the delay it prints the number of seconds it slept.
delay n = S.yieldM $ do
    threadDelay (n * 1000000)
    tid <- myThreadId
    putStrLn (show tid ++ ": Delay " ++ show n)
Generating Streams
We will assume the following imports in this tutorial. Go ahead, fire up a GHCi session and import these lines to start playing.
> import Streamly
> import Streamly.Prelude ((|:))
> import qualified Streamly.Prelude as S
> import Control.Concurrent
nil represents an empty stream and consM, or its operator form |:, adds a
monadic action at the head of the stream.
> S.toList S.nil
[]
> S.toList $ getLine |: getLine |: S.nil
hello
world
["hello","world"]
To create a singleton stream from a pure value use yield or pure, and to
create a singleton stream from a monadic action use yieldM. Note that in
the case of Zip applicative streams, pure repeats the value to generate an
infinite stream.
> S.toList $ pure 1
[1]
> S.toList $ S.yield 1
[1]
> S.toList $ S.yieldM getLine
hello
["hello"]
To create a stream from pure values in a Foldable container use
fromFoldable, which is equivalent to a fold using cons and nil:
> S.toList $ S.fromFoldable [1..3]
[1,2,3]
> S.toList $ foldr S.cons S.nil [1..3]
[1,2,3]
To create a stream from monadic actions in a Foldable container, just use a
right fold using consM and nil:
> S.drain $ foldr (|:) S.nil [putStr "Hello ", putStrLn "world!"]
Hello world!
For more ways to construct a stream see the module Streamly.Prelude.
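For instance, unfoldr builds a stream from a seed value; a small sketch in GHCi:

> S.toList $ S.unfoldr (\n -> if n == 0 then Nothing else Just (n, n - 1)) 3
[3,2,1]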
Generating Streams Concurrently
Monadic construction and generation functions like consM, unfoldrM,
replicateM, repeatM, iterateM and fromFoldableM work concurrently when used
with an appropriate stream type combinator. The pure versions of these APIs
are not concurrent; however, you can use the monadic versions even for pure
computations, by wrapping the pure value in a monad, to get the concurrent
generation capability where required.
The following code finishes in 3 seconds (6 seconds when serial):
> let p n = threadDelay (n * 1000000) >> return n
> S.toList $ parallely $ p 3 |: p 2 |: p 1 |: S.nil
[1,2,3]
> S.toList $ aheadly $ p 3 |: p 2 |: p 1 |: S.nil
[3,2,1]
The following finishes in 10 seconds (100 seconds when serial):
> S.drain $ asyncly $ S.replicateM 10 $ p 10
Eliminating Streams
We have already seen drain and toList used to eliminate a stream in the
examples above. drain runs a stream discarding the results, i.e. only for
effects. toList runs the stream and collects the results in a list. For
other ways to eliminate a stream see the Folding section in the
Streamly.Prelude module.
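For instance, a few of those folds sketched in GHCi, assuming the imports shown earlier:

> S.sum $ S.fromFoldable [1..10]
55
> S.length $ S.fromFoldable "hello"
5
> S.last $ S.fromFoldable [1..3]
Just 3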
Concurrent Pipeline Stages
The concurrent function application operators |$ and |& apply a stream
argument to a stream function concurrently to compose a concurrent pipeline
of stream processing functions:
Because both the stages run concurrently, we would see a delay of only 1 second instead of 2 seconds in the following:
> let p n = threadDelay (n * 1000000) >> return n
> S.drain $ S.repeatM (p 1) |& S.mapM (\x -> p 1 >> print x)
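The same pipeline can also be written with |$, which takes the stream function as its first argument; a sketch of the equivalent composition, reusing the p defined above:

> S.drain $ S.mapM (\x -> p 1 >> print x) |$ S.repeatM (p 1)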
Transforming Streams
Transformation over a stream is the equivalent of a for loop in the
imperative paradigm. We iterate over every element in the stream and
perform certain transformations on each element. Transformations may
involve mapping functions over the elements, filtering elements from the
stream, or folding all the elements in the stream into a single value.
Streamly streams behave just like lists and you can perform all these
transformations in the same way as you would on lists.
Here is a simple console echo program that just echoes every input line, forever:
> import Data.Function ((&))
> S.drain $ S.repeatM getLine & S.mapM putStrLn
The following code snippet reads lines from standard input, filters blank lines, drops the first non-blank line, takes the next two, upper cases them, numbers them and prints them:
import Streamly
import qualified Streamly.Prelude as S
import Data.Char (toUpper)
import Data.Function ((&))

main = S.drain $
      S.repeatM getLine
    & S.filter (not . null)
    & S.drop 1
    & S.take 2
    & fmap (map toUpper)
    & S.zipWith (\n s -> show n ++ " " ++ s) (S.fromFoldable [1..])
    & S.mapM putStrLn
Mapping Concurrently
The monadic transformation functions mapM and sequence work concurrently
when used with appropriate stream type combinators. The pure versions do
not work concurrently; however, you can use the monadic versions even for
pure computations to get the concurrent transformation capability where
required.
This would print a value every second (2 seconds when serial):
> let p n = threadDelay (n * 1000000) >> return n
> S.drain $ aheadly $ S.mapM (\x -> p 1 >> print x) (serially $ S.repeatM (p 1))
Merging Streams
Semigroup Style
We can combine two streams into a single stream using the semigroup
composition operation <>. Streams can be combined in many different ways as
described in the following sections; the <> operation behaves differently
depending on the stream type in effect. The stream type, and therefore the
composition style, can be changed at any point using one of the type
combinators as discussed earlier.
Deep Serial Composition (Serial)
The Semigroup operation <> of the Serial type combines the two streams in a
serial depth first manner. We use the serially type combinator to effect
the Serial style of composition. We can also use an explicit Serial type
annotation for the stream to achieve the same effect. However, since Serial
is the default type unless explicitly specified by using a combinator, we
can omit using an explicit combinator or type annotation for this style of
composition.
When two streams with multiple elements are combined in this manner, the monadic actions in the two streams are performed sequentially i.e. first all actions in the first stream are performed sequentially and then all actions in the second stream are performed sequentially. We call it serial depth first as the full depth of one stream is fully traversed before we move to the next. The following example prints the sequence 1, 2, 3, 4:
main = S.drain $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)

1
2
3
4
All actions in both the streams are performed serially in the same thread.
In the following example we can see that all actions are performed in the
same thread and take a combined total of 3 + 2 + 1 = 6 seconds:
main = S.drain $ delay 3 <> delay 2 <> delay 1

ThreadId 36: Delay 3
ThreadId 36: Delay 2
ThreadId 36: Delay 1
The polymorphic version of the binary operation <> of the Serial type is
serial. We can use serial to join streams in a sequential manner
irrespective of the type of stream:
main = S.drain $ (print 1 |: print 2 |: nil) `serial` (print 3 |: print 4 |: nil)
Wide Serial Composition (WSerial)
The Semigroup operation <> of the WSerial type combines the two streams in
a serial breadth first manner. We use the wSerially type combinator to
effect the WSerial style of composition. We can also use the WSerial type
annotation for the stream to achieve the same effect.
When two streams with multiple elements are combined in this manner, we traverse all the streams in a breadth first manner i.e. one action from each stream is performed and yielded to the resulting stream before we come back to the first stream again and so on. The following example prints the sequence 1, 3, 2, 4
main = S.drain . wSerially $ (print 1 |: print 2 |: nil) <> (print 3 |: print 4 |: nil)

1
3
2
4
Even though the monadic actions of the two streams are performed in an
interleaved manner, they are all performed serially in the same thread. In
the following example we can see that all actions are performed in the same
thread and take a combined total of 3 + 2 + 1 = 6 seconds:
main = S.drain . wSerially $ delay 3 <> delay 2 <> delay 1

ThreadId 36: Delay 3
ThreadId 36: Delay 2
ThreadId 36: Delay 1
The polymorphic version of the WSerial binary operation <> is called
wSerial. We can use wSerial to join streams in an interleaved manner
irrespective of the type; notice that we have not used the wSerially
combinator in the following example:
main = S.drain $ (print 1 |: print 2 |: nil) `wSerial` (print 3 |: print 4 |: nil)

1
3
2
4
Note that this composition cannot be used to fold an infinite number of streams since it requires preserving the state until a stream is finished.
Deep Speculative Composition (Ahead)
The Semigroup operation <> of the Ahead type combines two streams in a
serial depth first manner with concurrent lookahead. We use the aheadly
type combinator to effect the Ahead style of composition. We can also use
an explicit Ahead type annotation for the stream to achieve the same
effect.
When two streams are combined in this manner, the streams are traversed in
a depth first manner just like Serial; however, the next stream can be
executed concurrently, with its results kept ready for when its turn
arrives. Concurrent execution of the next stream(s) is performed if the
first stream blocks or if it cannot produce output fast enough to meet the
consumer demand. Multiple streams can be executed concurrently to meet the
demand. The following example prints the result in about a second even
though each action in each stream takes one second:
main = do
    xs <- S.toList . aheadly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
    print xs
  where p n = threadDelay 1000000 >> return n

[1,2,3,4]
Each stream is constructed aheadly and then both the streams are merged
aheadly; therefore, all the actions can run concurrently but the results
are presented in serial order.
You can also use the polymorphic combinator ahead in place of <> to compose
any type of streams in this manner.
Deep Asynchronous Composition (Async)
The Semigroup operation <> of the Async type combines the two streams in a
depth first manner with parallel look ahead. We use the asyncly type
combinator to effect the Async style of composition. We can also use the
Async type annotation for the stream type to achieve the same effect.
When two streams with multiple elements are combined in this manner, the
streams are traversed in a depth first manner just like Serial; however,
the next stream can be executed concurrently and its results returned as
they arrive, i.e. results from the next stream may be yielded even before
results from the first stream. Concurrent execution of the next stream(s)
is performed if the first stream blocks or if it cannot produce output fast
enough to meet the consumer demand. Multiple streams can be executed
concurrently to meet the demand.
In the example below each element in the stream introduces a constant delay
of 1 second; however, it takes just one second to produce all the results.
The results are not guaranteed to be in any particular order:
main = do
    xs <- S.toList . asyncly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
    print xs
  where p n = threadDelay 1000000 >> return n

[4,2,1,3]
Here the constituent streams, as well as their composition, are in Async
style. We could instead compose the constituent streams to run serially; in
that case it would take 2 seconds to produce all the results, and the
elements of each serial stream would appear in serial order in the results:
main = do
    xs <- S.toList . asyncly $ (serially $ p 1 |: p 2 |: nil) <> (serially $ p 3 |: p 4 |: nil)
    print xs
  where p n = threadDelay 1000000 >> return n

[3,1,2,4]
In the following example we can see that new threads are started when a
computation blocks. Notice that the output from the stream with the
shortest delay is printed first. The whole computation takes
max (3, 2, 1) = 3 seconds:
main = S.drain . asyncly $ delay 3 <> delay 2 <> delay 1

ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3
When we have a tree of computations composed in this style, the tree is
traversed in DFS style just like the Serial style; the only difference is
that here we can move on to executing the next stream if a stream blocks.
However, we will not start new threads if we have sufficient output to
saturate the consumer. This is why we call it a left-biased, demand driven
or adaptive concurrency style: the concurrency tends to stay on the left
side of the composition as long as possible, and more threads are started
based on the pull rate of the consumer. The following example prints an
output every second as all of the actions run concurrently:
main = S.drain . asyncly $ (delay 1 <> delay 2) <> (delay 3 <> delay 4)

1
2
3
4
All the computations may even run in a single thread when more threads are not needed. As you can see, in the following example the computations are run in a single thread one after another, because none of them blocks. However, if the thread consuming the stream were faster than the producer then it would have started parallel threads for each computation to keep up even if none of them blocks:
main = S.drain . asyncly $ traced (sqrt 9) <> traced (sqrt 16) <> traced (sqrt 25)
  where traced m = S.yieldM (myThreadId >>= print) >> return m

ThreadId 40
ThreadId 40
ThreadId 40
Note that the order of printing in the above examples may change due to variations in scheduling latencies for concurrent threads.
The polymorphic version of the Async binary operation <> is called async.
We can use async to join streams in a left biased, adaptively concurrent
manner irrespective of the type; notice that we have not used the asyncly
combinator in the following example:
main = S.drain $ delay 3 `async` delay 2 `async` delay 1

ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3
Since the concurrency provided by this operator is demand driven, it cannot
be used when the composed computations start timers that are relative to
each other: all computations may not be started at the same time, so their
timers may not start at the same time either. When relative timing among
the computations is important, or when we need to start all computations at
once for any reason, the Parallel style must be used instead.
The Async style utilizes resources optimally and should be preferred over
Parallel or WAsync unless you really need those. Async should be used when
we know that the computations can run in parallel but we do not care
whether they actually do; that decision can be left to the scheduler, based
on demand. Also, note that the async operator can be used to fold an
infinite number of streams, in contrast to the Parallel or WAsync styles,
because it does not require all of them to run at the same time in a fair
manner.
Wide Asynchronous Composition (WAsync)
The Semigroup operation <> of the WAsync type combines two streams in a
concurrent manner using breadth first traversal. We use the wAsyncly type
combinator to effect the WAsync style of composition. We can also use the
WAsync type annotation for the stream to achieve the same effect.
When streams with multiple elements are combined in this manner, we traverse all the streams concurrently in a breadth first manner i.e. one action from each stream is performed and yielded to the resulting stream before we come back to the first stream again and so on. Even though we execute the actions in a breadth first order the outputs are consumed on a first come first serve basis.
In the following example we can see that outputs are produced in the breadth first traversal order but this is not guaranteed.
main = S.drain . wAsyncly $ (serially $ print 1 |: print 2 |: nil) <> (serially $ print 3 |: print 4 |: nil)

1
3
2
4
The polymorphic version of the binary operation <> of the WAsync type is
wAsync. We can use wAsync to join streams using a breadth first concurrent
traversal irrespective of the type; notice that we have not used the
wAsyncly combinator in the following example:
main = S.drain $ delay 3 `wAsync` delay 2 `wAsync` delay 1

ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3
Since the concurrency provided by this style is demand driven, it may not
be used when the composed computations start timers that are relative to
each other: all computations may not be started at the same time, so their
timers may not start at the same time either. When relative timing among
the computations is important, or when we need to start all computations at
once for any reason, the Parallel style must be used instead.
Parallel Asynchronous Composition (Parallel)
The Semigroup operation <> of the Parallel type combines the two streams in
a fairly concurrent manner with round robin scheduling. We use the
parallely type combinator to effect the Parallel style of composition. We
can also use the Parallel type annotation for the stream type to achieve
the same effect.
When two streams with multiple elements are combined in this manner, the
monadic actions in both the streams are performed concurrently with fair
round robin scheduling. The outputs are yielded in the order in which the
actions complete. This is pretty similar to the WAsync type; the difference
is that WAsync is adaptive to the consumer demand and may or may not
execute all actions in parallel depending on the demand, whereas Parallel
runs all the streams in parallel irrespective of the demand.
The following example sends a query to all the three search engines in parallel and prints the name of the search engines in the order in which the responses arrive. You need the http-conduit package to run this example:
import Streamly
import qualified Streamly.Prelude as S
import Network.HTTP.Simple

main = S.drain . parallely $ google <> bing <> duckduckgo
  where
    google     = get "https://www.google.com/search?q=haskell"
    bing       = get "https://www.bing.com/search?q=haskell"
    duckduckgo = get "https://www.duckduckgo.com/?q=haskell"
    get s      = S.yieldM (httpNoBody (parseRequest_ s) >> putStrLn (show s))
The polymorphic version of the binary operation <> of the Parallel type is
parallel. We can use parallel to join streams in a fairly concurrent manner
irrespective of the type; notice that we have not used the parallely
combinator in the following example:
main = S.drain $ delay 3 `parallel` delay 2 `parallel` delay 1

ThreadId 42: Delay 1
ThreadId 41: Delay 2
ThreadId 40: Delay 3
Note that this style of composition cannot be used to combine an infinite number of streams, as it would lead to an infinitely sized scheduling queue.
Monoid Style
We can use the Monoid instances to fold a container of streams in the
desired style using fold or foldMap. We have also provided some fold
utilities to fold streams using the polymorphic combine operations:
- foldWith is like fold; it folds a Foldable container of streams using the given composition operator.
- foldMapWith is like foldMap; it folds like foldWith but also maps a function before folding.
- forEachWith is like foldMapWith but the container argument comes before the function argument.
All of the following are equivalent and start ten concurrent tasks each with a delay from 1 to 10 seconds, resulting in the printing of each number every second:
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

main = do
    S.drain $ asyncly $ foldMap delay [1..10]
    S.drain $ S.foldWith async (map delay [1..10])
    S.drain $ S.foldMapWith async delay [1..10]
    S.drain $ S.forEachWith async [1..10] delay
  where delay n = S.yieldM $ threadDelay (n * 1000000) >> print n
Nesting Streams
Till now we discussed ways to apply transformations on a stream or to merge
streams together to create another stream. We mentioned earlier that
transforming a stream is similar to a for loop in the imperative paradigm.
We will now discuss the concept of a nested composition of streams, which
is analogous to nested for loops in the imperative paradigm. Functional
programmers call this style of composition a list transformer or ListT.
Logic programmers call it a logic monad or non-deterministic composition,
but for ordinary imperative minded people like me it is easier to think in
terms of good old nested for loops.
Monad
In functional programmers' parlance the Monad instances of the different
IsStream types implement non-determinism, exploring all possible
combinations of choices from both the streams. From an imperative
programmer's point of view it behaves like nested loops, i.e. for each
element in the first stream and for each element in the second stream,
execute the body of the loop.
The Monad instances of the Serial, WSerial, Async and WAsync stream types
support different flavors of nested looping. In other words, they are all
variants of a list transformer. The nesting behavior of these types
corresponds exactly to the way they merge streams, as discussed in the
previous section.
Deep Serial Nesting (Serial)
The Monad composition of the Serial type behaves like a standard list
transformer. This is the default when we do not use an explicit type
combinator. However, the serially type combinator can be used to switch to
this style of composition. We will see how this style of composition works
in the following examples.
Let's start with an example of a simple for loop without any nesting. For
simplicity of illustration we are using streams of pure values in all the
examples; however, the streams could also be made of monadic actions
instead.
import Streamly
import qualified Streamly.Prelude as S

main = S.drain $ do
    x <- S.fromFoldable [3,2,1]
    delay x

ThreadId 30: Delay 3
ThreadId 30: Delay 2
ThreadId 30: Delay 1
As we can see, the code after the fromFoldable statement is run three
times, once for each value of x drawn from the stream. All three iterations
are serial and run in the same thread one after another. In imperative
terms this is equivalent to a for loop with three iterations.
A console echo loop copying standard input to standard output can simply be written like this:
import Streamly
import qualified Streamly.Prelude as S
import Control.Monad (forever)
main = S.drain $ forever $ S.yieldM getLine >>= S.yieldM . putStrLn
When multiple streams are composed using this style they nest in a DFS
manner, i.e. nested iterations of a loop are executed before we proceed to
the next iteration of the parent loop. This behaves just like nested for
loops in imperative programming.
import Streamly
import qualified Streamly.Prelude as S

main = S.drain $ do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.yieldM $ putStrLn $ show (x, y)

(1,3)
(1,4)
(2,3)
(2,4)
Notice that this is analogous to merging streams of type Serial or merging
streams using serial.
Wide Serial Nesting (WSerial)
The Monad composition of the WSerial type interleaves the iterations of the
outer and inner loops in a nested loop composition. This works exactly the
same way as the merging of two streams in wSerially fashion works. The
wSerially type combinator can be used to switch to this style of
composition. Alternatively, a type annotation can be used to specify the
type of the stream as WSerial.
import Streamly
import qualified Streamly.Prelude as S

main = S.drain . wSerially $ do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.yieldM $ putStrLn $ show (x, y)

(1,3)
(2,3)
(1,4)
(2,4)
Deep Speculative Nesting (Ahead)
The Monad composition of the Ahead type behaves just like Serial except
that it can speculatively perform a bounded number of future iterations of
a loop concurrently.
The aheadly type combinator can be used to switch to this style of
composition. Alternatively, a type annotation can be used to specify the
type of the stream as Ahead.
import Streamly
import qualified Streamly.Prelude as S

comp = S.toList . aheadly $ do
    x <- S.fromFoldable [3,2,1]
    delay x >> return x

main = comp >>= print

ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
[3,2,1]
This code finishes in 3 seconds whereas Serial would take 6 seconds. As we
can see, all three iterations are concurrent and run in different threads;
however, the results are returned in serial order.
Concurrency is demand driven. When multiple streams are composed using this
style, the iterations are executed in a depth first manner just like
Serial, i.e. nested iterations are executed before we proceed to the next
outer iteration. The only difference is that we may execute multiple future
iterations concurrently and keep the results ready.
Deep Asynchronous Nesting (Async)
The Monad composition of the Async type can perform the iterations of a
loop concurrently. Concurrency is demand driven, i.e. more concurrent
iterations are started only if the previous iterations are not able to
produce enough output for the consumer of the output stream. This works
exactly the same way as the asyncly merging of two streams works. This is
the concurrent analogue of the Serial style monadic composition.
The asyncly type combinator can be used to switch to this style of
composition. Alternatively, a type annotation can be used to specify the
type of the stream as Async.
import Streamly
import qualified Streamly.Prelude as S

main = S.drain . asyncly $ do
    x <- S.fromFoldable [3,2,1]
    delay x

ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
As we can see, the code after the fromFoldable statement is run three
times, once for each value of x. All three iterations are concurrent and
run in different threads. The iteration with the least delay finishes
first. Compared to imperative programming, this can be viewed as a for loop
with three concurrent iterations.
Concurrency is demand driven just as in the case of async merging. When
multiple streams are composed using this style, the iterations are
triggered in a depth first manner just like Serial, i.e. nested iterations
are executed before we proceed to the next iteration at the higher level.
However, unlike Serial, more than one iteration may be started concurrently
based on the demand from the consumer of the stream.
import Streamly
import qualified Streamly.Prelude as S

main = S.drain . asyncly $ do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.yieldM $ putStrLn $ show (x, y)

(1,3)
(1,4)
(2,3)
(2,4)
Wide Asynchronous Nesting (WAsync)
Just like Async, the Monad composition of WAsync runs the iterations of a
loop concurrently. The difference is in the nested loop behavior: the
nested loops in this type are traversed and executed in a breadth first
manner rather than the depth first manner of the Async style.
The loop nesting works exactly the same way as the wAsyncly merging of
streams works. The wAsyncly type combinator can be used to switch to this
style of composition. Alternatively, a type annotation can be used to
specify the type of the stream as WAsync.
import Streamly
import qualified Streamly.Prelude as S

main = S.drain . wAsyncly $ do
    x <- S.fromFoldable [1,2]
    y <- S.fromFoldable [3,4]
    S.yieldM $ putStrLn $ show (x, y)

(1,3)
(2,3)
(1,4)
(2,4)
Parallel Asynchronous Nesting (Parallel)
Just like Async or WAsync, the Monad composition of Parallel runs the
iterations of a loop concurrently. The difference is in the nested loop
behavior: the streams at each nest level are run fully concurrently,
irrespective of the demand. The loop nesting works exactly the same way as
the parallely merging of streams works. The parallely type combinator can
be used to switch to this style of composition. Alternatively, a type
annotation can be used to specify the type of the stream as Parallel.
import Streamly
import qualified Streamly.Prelude as S

main = S.drain . parallely $ do
    x <- S.fromFoldable [3,2,1]
    delay x

ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Exercise
Streamly code is usually written in a way that is agnostic of the specific
monadic composition type. We use a polymorphic type with an IsStream type
class constraint, and choose the specific mode of composition when running
the stream. For example, take a look at the following code.
import Streamly
import qualified Streamly.Prelude as S

composed :: (IsStream t, Monad (t IO)) => t IO ()
composed = do
    sz <- sizes
    cl <- colors
    sh <- shapes
    S.yieldM $ putStrLn $ show (sz, cl, sh)
  where
    sizes  = S.fromFoldable [1, 2, 3]
    colors = S.fromFoldable ["red", "green", "blue"]
    shapes = S.fromFoldable ["triangle", "square", "circle"]
Now we can interpret this in whatever way we want:
main = S.drain . serially  $ composed
main = S.drain . wSerially $ composed
main = S.drain . asyncly   $ composed
main = S.drain . wAsyncly  $ composed
main = S.drain . parallely $ composed
As an exercise try to figure out the output of this code for each mode of composition.
Applicative
The Applicative instance is precisely the same as the ap operation of the
Monad. For zipping applicatives, the separate types ZipSerial and ZipAsync
are provided.
The following example uses the Serial applicative; it runs all iterations
serially and takes a total of 17 seconds (1 + 3 + 4 + 2 + 3 + 4):
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

s1 = d 1 <> d 2
s2 = d 3 <> d 4
d n = delay n >> return n

main = (S.toList . serially $ (,) <$> s1 <*> s2) >>= print

ThreadId 36: Delay 1
ThreadId 36: Delay 3
ThreadId 36: Delay 4
ThreadId 36: Delay 2
ThreadId 36: Delay 3
ThreadId 36: Delay 4
[(1,3),(1,4),(2,3),(2,4)]
Similarly, the WSerial applicative runs the iterations in an interleaved
order, but since it is serial it also takes a total of 17 seconds:
main = (S.toList . wSerially $ (,) <$> s1 <*> s2) >>= print

ThreadId 36: Delay 1
ThreadId 36: Delay 3
ThreadId 36: Delay 2
ThreadId 36: Delay 3
ThreadId 36: Delay 4
ThreadId 36: Delay 4
[(1,3),(2,3),(1,4),(2,4)]
Async can run the iterations concurrently and therefore takes a total of 6
seconds, which is max (1, 2) + max (3, 4):
main = (S.toList . asyncly $ (,) <$> s1 <*> s2) >>= print

ThreadId 34: Delay 1
ThreadId 36: Delay 2
ThreadId 35: Delay 3
ThreadId 36: Delay 3
ThreadId 35: Delay 4
ThreadId 36: Delay 4
[(1,3),(2,3),(1,4),(2,4)]
Similarly, WAsync too can run the iterations concurrently and therefore
takes a total of 6 seconds (2 + 4):
main = (S.toList . wAsyncly $ (,) <$> s1 <*> s2) >>= print

ThreadId 34: Delay 1
ThreadId 36: Delay 2
ThreadId 35: Delay 3
ThreadId 36: Delay 3
ThreadId 35: Delay 4
ThreadId 36: Delay 4
[(1,3),(2,3),(1,4),(2,4)]
Functor
fmap transforms a stream by mapping a function on all elements of the
stream. fmap behaves the same way for all stream types: it is always
serial.
import Streamly
import qualified Streamly.Prelude as S

main = (S.toList $ fmap show $ S.fromFoldable [1..10]) >>= print
Also see the mapM and sequence functions in the Streamly.Prelude module for
mapping monadic actions over a stream.
Zipping Streams
Zipping is a special transformation where the corresponding elements of two streams are combined together using a zip function producing a new stream of outputs. Two different types are provided for serial and concurrent zipping. These types provide an applicative instance that can be used to lift functions to zip the argument streams. Also see the zipping functions in the Streamly.Prelude module.
Serial Zipping
The Applicative instance of the ZipSerial type zips streams serially. The
zipSerially type combinator can be used to switch to serial applicative zip
composition:
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent

d n = delay n >> return n
s1 = serially $ d 1 <> d 2
s2 = serially $ d 3 <> d 4

main = (S.toList . zipSerially $ (,) <$> s1 <*> s2) >>= print
This takes total 10 seconds to zip, which is (1 + 2 + 3 + 4) since everything runs serially:
ThreadId 29: Delay 1
ThreadId 29: Delay 3
ThreadId 29: Delay 2
ThreadId 29: Delay 4
[(1,3),(2,4)]
Parallel Zipping
The Applicative instance of the ZipAsync type zips streams concurrently.
The zipAsyncly type combinator can be used to switch to parallel
applicative zip composition:
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))

d n = delay n >> return n
s1 = serially $ d 1 <> d 2
s2 = serially $ d 3 <> d 4

main = do
    hSetBuffering stdout LineBuffering
    (S.toList . zipAsyncly $ (,) <$> s1 <*> s2) >>= print
This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3 are produced concurrently, and 2 and 4 are produced concurrently:
ThreadId 32: Delay 1
ThreadId 32: Delay 2
ThreadId 33: Delay 3
ThreadId 33: Delay 4
[(1,3),(2,4)]
Monad transformers
To represent streams in an arbitrary monad, use the more general monad
transformer types; for example, the monad transformer type corresponding to
the Serial type is SerialT. SerialT m a represents a stream of values of
type a in some underlying monad m. For example, SerialT IO Int is a stream
of Int in the IO monad. In fact, the type Serial is a synonym for
SerialT IO.
Similarly, we have monad transformer types for the other stream types as
well, viz. WSerialT, AsyncT, WAsyncT and ParallelT.
To lift a value from an underlying monad in a monad transformer stack into
a singleton stream use lift, and to lift from an IO action use liftIO.
> S.drain $ liftIO $ putStrLn "Hello world!"
Hello world!
> S.drain $ lift $ putStrLn "Hello world!"
Hello world!
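As a slightly larger sketch (a hypothetical example, not taken from the library documentation), here is a stream over a StateT stack using the SerialT transformer; each element of the stream reads and updates the underlying state via lift:

import Streamly
import qualified Streamly.Prelude as S
import Control.Monad.State (StateT, get, put, runStateT)
import Control.Monad.Trans.Class (lift)

-- A stream of Ints over StateT Int IO: every iteration yields the
-- current counter value and then increments it.
counter :: SerialT (StateT Int IO) Int
counter = do
    _ <- S.fromFoldable [1..3 :: Int]
    n <- lift get
    lift (put (n + 1))
    return n

main :: IO ()
main = runStateT (S.toList counter) 0 >>= print
-- expected output: ([0,1,2],3)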
Concurrent Programming
When writing concurrent programs there are two distinct places where the programmer can control the concurrency. First, when composing a stream by merging multiple streams, we can choose an appropriate sum style operator to combine them concurrently or serially. Second, when processing a stream in a monadic composition, we can choose one of the monad composition types to get the desired type of concurrency.
In the following example the squares of x and y are computed concurrently
using the async operation, and the square roots of their sums are computed
serially because of the serially combinator. We can choose different
combinators for the monadic processing and for the stream generation to
control the concurrency. We can also use the asyncly combinator instead of
explicitly folding with async.
import Streamly
import qualified Streamly.Prelude as S
import Data.List (sum)

main = do
    z <- S.toList
         $ serially                             -- Serial monadic processing (sqrt below)
         $ do
            x2 <- forEachWith async [1..100] $  -- Concurrent "for" loop
                      \x -> return $ x * x      -- body of the loop
            y2 <- forEachWith async [1..100] $
                      \y -> return $ y * y
            return $ sqrt (x2 + y2)
    print $ sum z
We can see how this directly maps to the imperative style OpenMP model; we use combinators and operators instead of pragmas.
For more concurrent programming examples see, ListDir.hs, MergeSort.hs and SearchQuery.hs in the examples directory.
Reactive Programming
Reactive programming is nothing but concurrent streaming which is what streamly is all about. With streamly we can generate streams of events, merge streams that are generated concurrently and process events concurrently. We can do all this without any knowledge about the specifics of the implementation of concurrency. In the following example you will see that the code is just regular Haskell code without much streamly APIs used (active hyperlinks are the streamly APIs) and yet it is a reactive application.
This application has two independent and concurrent sources of event
streams, acidRain and userAction. acidRain continuously generates events
that deteriorate the health of the character in the game. userAction can be
"potion" or "quit". When the user types "potion" the health improves and
the game continues.
{-# LANGUAGE FlexibleContexts #-}

import Streamly
import Streamly.Prelude as S
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.State (MonadState, get, modify, runStateT, put)

data Event = Quit | Harm Int | Heal Int deriving (Show)

userAction :: MonadAsync m => SerialT m Event
userAction = S.repeatM $ liftIO askUser
  where
    askUser = do
        command <- getLine
        case command of
            "potion" -> return (Heal 10)
            "harm"   -> return (Harm 10)
            "quit"   -> return Quit
            _        -> putStrLn "Type potion or harm or quit" >> askUser

acidRain :: MonadAsync m => SerialT m Event
acidRain = asyncly $ constRate 1 $ S.repeatM $ liftIO $ return $ Harm 1

data Result = Check | Done

runEvents :: (MonadAsync m, MonadState Int m) => SerialT m Result
runEvents = do
    event <- userAction `parallel` acidRain
    case event of
        Harm n -> modify (\h -> h - n) >> return Check
        Heal n -> modify (\h -> h + n) >> return Check
        Quit   -> return Done

data Status = Alive | GameOver deriving Eq

getStatus :: (MonadAsync m, MonadState Int m) => Result -> m Status
getStatus result =
    case result of
        Done  -> liftIO $ putStrLn "You quit!" >> return GameOver
        Check -> do
            h <- get
            liftIO $ if (h <= 0)
                     then putStrLn "You die!" >> return GameOver
                     else putStrLn ("Health = " <> show h) >> return Alive

main :: IO ()
main = do
    putStrLn "Your health is deteriorating due to acid rain,\
             \ type \"potion\" or \"quit\""
    let runGame = S.drainWhile (== Alive) $ S.mapM getStatus runEvents
    void $ runStateT runGame 60
You can also find the source of this example in the examples directory as AcidRain.hs. It has been adapted from Gabriel's pipes-concurrency package. This is much simpler compared to the pipes version because of the built-in concurrency in streamly. You can also find an SDL based reactive programming example, adapted from Yampa, in CirclingSquare.hs.
Writing Concurrent Programs
When writing concurrent programs it is advisable not to use the concurrent
style stream combinators blindly at the top level. That might create too
much concurrency where it is not even required, and can even degrade
performance in some cases. It can also lead to surprising behavior because
code that is supposed to be serial becomes concurrent. Please be aware that
all concurrency capable APIs used within the scope of a concurrent stream
combinator will become concurrent. For example, if you have a repeatM
somewhere in your program and you use parallely on top, the repeatM becomes
fully parallel, resulting in an infinite parallel execution. Instead, use
the Keep It Serial and Stupid principle: start with the default serial
composition and enable concurrent combinators only when and where
necessary. When you use a concurrent combinator, you can use an explicit
serially combinator to suppress any unnecessary concurrency within the
scope of that combinator.
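For instance, here is a small sketch of that principle (reusing a p helper like the ones above). The two streams are merged concurrently by async, while each repeatM stays serial because of the explicit serially wrapper around it:

main = S.drain $
        (serially $ S.take 2 $ S.repeatM (p 1))
    `async`
        (serially $ S.take 2 $ S.repeatM (p 2))
  where p n = threadDelay (n * 1000000) >> print n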
Performance
Streamly is highly optimized for performance; it is designed for serious, high performance, concurrent and scalable applications. We have created the streaming-benchmarks package, which is specifically and carefully designed to measure the performance of Haskell streaming libraries fairly and rigorously. Streamly performs at par with, or even better than, most streaming libraries for serial operations even though it also has to support concurrency.
Interoperation with Streaming Libraries
We can use unfoldrM and uncons to convert one streaming type to another.
Interop with vector:
import Streamly
import qualified Streamly.Prelude as S
import qualified Data.Vector.Fusion.Stream.Monadic as V

-- | vector to streamly
fromVector :: (IsStream t, Monad m) => V.Stream m a -> t m a
fromVector = S.unfoldrM unconsV
  where
    unconsV v = do
        r <- V.null v
        if r
        then return Nothing
        else do
            h <- V.head v
            return $ Just (h, V.tail v)

-- | streamly to vector
toVector :: Monad m => SerialT m a -> V.Stream m a
toVector = V.unfoldrM (S.uncons . adapt)

main = do
    S.toList (fromVector (V.fromList [1..3])) >>= print
    V.toList (toVector (S.fromFoldable [1..3])) >>= print
Interop with pipes:
import Streamly
import qualified Streamly.Prelude as S
import qualified Pipes as P
import qualified Pipes.Prelude as P

-- | pipes to streamly
fromPipes :: (IsStream t, Monad m) => P.Producer a m r -> t m a
fromPipes = S.unfoldrM unconsP
  where
    -- Adapt P.next to return a Maybe instead of Either
    unconsP p = P.next p >>= either (\_ -> return Nothing) (return . Just)

-- | streamly to pipes
toPipes :: Monad m => SerialT m a -> P.Producer a m ()
toPipes = P.unfoldr unconsS
  where
    -- Adapt S.uncons to return an Either instead of Maybe
    unconsS s = S.uncons s >>= maybe (return $ Left ()) (return . Right)

main = do
    S.toList (fromPipes (P.each [1..3])) >>= print
    P.toListM (toPipes (S.fromFoldable [1..3])) >>= print
Interop with streaming:
import Streamly
import qualified Streamly.Prelude as S
import qualified Streaming as SG
import qualified Streaming.Prelude as SG
-- | streaming to streamly
fromStreaming :: (IsStream t, MonadAsync m) => SG.Stream (SG.Of a) m r -> t m a
fromStreaming = S.unfoldrM SG.uncons
-- | streamly to streaming
toStreaming :: Monad m => SerialT m a -> SG.Stream (SG.Of a) m ()
toStreaming = SG.unfoldr unconsS
where
-- Adapt S.uncons to return an Either instead of Maybe
unconsS s = S.uncons s >>= maybe (return $ Left ()) (return . Right)
main = do
S.toList (fromStreaming (SG.each [1..3])) >>= print
SG.toList (toStreaming (S.fromFoldable [1..3])) >>= print
Interop with conduit:
import Streamly
import qualified Streamly.Prelude as S
import qualified Data.Conduit as C
import qualified Data.Conduit.List as C
import qualified Data.Conduit.Combinators as C

-- It seems there is no way out of a conduit as it does not provide an
-- uncons or a tail function. We can convert streamly to conduit though.

-- | streamly to conduit
toConduit :: Monad m => SerialT m a -> C.ConduitT i a m ()
toConduit s = C.unfoldM S.uncons s

main = do
    C.runConduit (toConduit (S.fromFoldable [1..3]) C..| C.sinkList) >>= print
Comparison with Existing Packages
List transformers and logic programming monads also provide a product style composition similar to streamly; however, streamly generalizes it with the time dimension, allowing streams to be composed in an asynchronous and concurrent fashion in many different ways. It also provides multiple alternative ways of composing streams, e.g. serial, interleaved or concurrent.
This seemingly simple addition of asynchronicity and concurrency to product style streaming composition unifies a number of disparate abstractions into one powerful, concise and elegant abstraction. A wide variety of programming problems can be solved elegantly with this abstraction. In particular, it unifies three major programming domains namely non-deterministic (logic) programming, concurrent programming and functional reactive programming. In other words, you can do everything with this one abstraction that you could do with the popular libraries listed under these categories in the list below.
Category | Packages |
---|---|
Non-determinism | pipes, list-t, logict |
Streaming | vector, streaming, pipes, conduit |
Concurrency | async, transient |
FRP | Yampa, dunai, reflex |
Streamly is a list-transformer. It provides all the functionality provided by any of the list transformer and logic programming packages listed above. In addition, Streamly naturally integrates the concurrency dimension to the basic list transformer functionality.
When it comes to streaming, in terms of the streaming API streamly is almost identical to the vector package. Streamly, vector and streaming packages all represent a stream as data and are therefore similar in the fundamental approach to streaming. The fundamental difference is that streamly adds concurrency support and the monad instance provides concurrent looping. Other streaming libraries like pipes, conduit and machines represent and compose stream processors rather than the stream data and therefore fall in another class of streaming libraries and have comparatively more complicated types.
When it comes to concurrency, streamly can do everything that the async
package can do, and more. async provides applicative concurrency whereas
streamly provides both applicative and monadic concurrency. The ZipAsync
type behaves like the applicative instance of async. In comparison to
transient, streamly has a first class streaming interface and is a monad
transformer that can be used universally in any Haskell monad transformer
stack. Streamly was in fact originally inspired by the concurrency
implementation in transient, though it bears no resemblance to it and takes
a lazy pull approach versus transient's strict push approach.
The combination of non-determinism, concurrency and streaming also makes
streamly a strong reactive programming library. Reactive programming is
fundamentally about streams of events that can be processed concurrently.
The example in this tutorial as well as the CirclingSquare example from
Yampa demonstrate the basic reactive capability of streamly. In core
concepts streamly is strikingly similar to dunai; dunai was designed from
an FRP perspective whereas streamly was originally designed from a
concurrency perspective. However, both have similarity at the core.
Where to go next?
- Read the documentation of Streamly module
- Read the documentation of Streamly.Prelude module
- See the examples in the "examples" directory of the package
- See the tests in the "test" directory of the package