Copyright | Ivan Lazar Miljenovic |
---|---|
License | MIT |
Maintainer | Ivan.Miljenovic@gmail.com |
Safe Haskell | None |
Language | Haskell2010 |
This module defines variants of those in Streaming.Concurrent for
use with the Withable
class, found in the streaming-with
package.
- data Buffer a
- unbounded :: Buffer a
- bounded :: Int -> Buffer a
- latest :: a -> Buffer a
- newest :: Int -> Buffer a
- withBuffer :: (Withable w, MonadBaseControl IO (WithMonad w)) => Buffer a -> (InBasket a -> WithMonad w i) -> w (OutBasket a)
- withBufferedTransform :: (Withable w, MonadBaseControl IO (WithMonad w)) => Int -> (OutBasket a -> InBasket b -> WithMonad w ab) -> (InBasket a -> WithMonad w i) -> w (OutBasket b)
- newtype InBasket a = InBasket {}
- newtype OutBasket a = OutBasket {
- receiveMsg :: STM (Maybe a)
- writeStreamBasket :: (Withable w, MonadBase IO (WithMonad w)) => Stream (Of a) (WithMonad w) r -> InBasket a -> w ()
- withStreamBasket :: (Withable w, MonadBase IO m) => OutBasket a -> w (Stream (Of a) m ())
- withMergedStreams :: (Withable w, MonadBaseControl IO (WithMonad w), MonadBase IO m, Foldable t) => Buffer a -> t (Stream (Of a) (WithMonad w) v) -> w (Stream (Of a) m ())
- withStreamMap :: (Withable w, MonadBaseControl IO (WithMonad w), MonadBase IO n) => Int -> (a -> b) -> Stream (Of a) (WithMonad w) i -> w (Stream (Of b) n ())
- withStreamMapM :: (Withable w, MonadBaseControl IO (WithMonad w), MonadBase IO n) => Int -> (a -> WithMonad w b) -> Stream (Of a) (WithMonad w) i -> w (Stream (Of b) n ())
- withStreamTransform :: (Withable w, m ~ WithMonad w, MonadBaseControl IO m, MonadBase IO n) => Int -> (Stream (Of a) m () -> Stream (Of b) m t) -> Stream (Of a) m i -> w (Stream (Of b) n ())
- joinBuffers :: MonadBase IO m => (a -> b) -> OutBasket a -> InBasket b -> m ()
- joinBuffersM :: MonadBase IO m => (a -> m b) -> OutBasket a -> InBasket b -> m ()
- joinBuffersStream :: MonadBase IO m => (Stream (Of a) m () -> Stream (Of b) m t) -> OutBasket a -> InBasket b -> m ()
Buffers
bounded :: Int -> Buffer a Source #
Store a bounded number of messages, specified by the Int
argument.
A buffer size <= 0
will result in a permanently empty buffer,
which could result in a system that hangs.
latest :: a -> Buffer a Source #
Only store the "latest" message, beginning with an initial value.
This buffer is never empty nor full; as such, it is up to the
caller to ensure they only take as many values as they need
(e.g. using
as the final
parameter to print
. readStreamBasket
withBuffer
will -- after all other values are
processed -- keep printing the last value over and over again).
Using a buffer
withBuffer :: (Withable w, MonadBaseControl IO (WithMonad w)) => Buffer a -> (InBasket a -> WithMonad w i) -> w (OutBasket a) Source #
Use a buffer to asynchronously communicate.
Two functions are taken as parameters:
- How to provide input to the buffer (the result of this is discarded)
- How to take values from the buffer
As soon as one function indicates that it is complete then the other is terminated. This is safe: trying to write data to a closed buffer will not achieve anything.
However, reading a buffer that has not indicated that it is closed (e.g. waiting on an action to complete to be able to provide the next value) but contains no values will block.
withBufferedTransform Source #
:: (Withable w, MonadBaseControl IO (WithMonad w)) | |
=> Int | How many concurrent computations to run. |
-> (OutBasket a -> InBasket b -> WithMonad w ab) | What to do with each individual concurrent computation; result is ignored. |
-> (InBasket a -> WithMonad w i) | Provide initial data; result is ignored. |
-> w (OutBasket b) |
Use buffers to concurrently transform the provided data.
In essence, this is a demultiplexer -> multiplexer
transformation: the incoming data is split into n
individual
segments, the results of which are then merged back together
again.
Note: ordering of elements in the output is undeterministic.
Since: 0.2.0.0
An exhaustible source of values.
receiveMsg
returns Nothing
if the source is exhausted.
OutBasket | |
|
Stream support
writeStreamBasket :: (Withable w, MonadBase IO (WithMonad w)) => Stream (Of a) (WithMonad w) r -> InBasket a -> w () Source #
Write a single stream to a buffer.
Type written to make it easier if this is the only stream being written to the buffer.
withStreamBasket :: (Withable w, MonadBase IO m) => OutBasket a -> w (Stream (Of a) m ()) Source #
Read the output of a buffer into a stream.
Note that there is no requirement that m ~ WithMonad w
.
Since: 0.2.0.0
withMergedStreams :: (Withable w, MonadBaseControl IO (WithMonad w), MonadBase IO m, Foldable t) => Buffer a -> t (Stream (Of a) (WithMonad w) v) -> w (Stream (Of a) m ()) Source #
Concurrently merge multiple streams together.
The resulting order is unspecified.
Since: 0.2.0.0
Mapping
These functions provide (concurrency-based rather than parallelism-based) pseudo-equivalents to parMap.
Note however that in practice, these seem to be no better than - and
indeed often worse - than using map
and mapM
. A benchmarking
suite is available with this library that tries to compare different
scenarios.
These implementations try to be relatively conservative in terms of
memory usage; it is possible to get better performance by using an
unbounded
Buffer
but if you feed elements into a Buffer
much
faster than you can consume them then memory usage will increase.
The "Primitives" available below can assist you with defining your
own custom mapping function in conjunction with
withBufferedTransform
.
:: (Withable w, MonadBaseControl IO (WithMonad w), MonadBase IO n) | |
=> Int | How many concurrent computations to run. |
-> (a -> b) | |
-> Stream (Of a) (WithMonad w) i | |
-> w (Stream (Of b) n ()) |
Concurrently map a function over all elements of a Stream
.
Note: ordering of elements in the output is undeterministic.
Since: 0.2.0.0
:: (Withable w, MonadBaseControl IO (WithMonad w), MonadBase IO n) | |
=> Int | How many concurrent computations to run. |
-> (a -> WithMonad w b) | |
-> Stream (Of a) (WithMonad w) i | |
-> w (Stream (Of b) n ()) |
Concurrently map a monadic function over all elements of a
Stream
.
Note: ordering of elements in the output is undeterministic.
Since: 0.2.0.0
:: (Withable w, m ~ WithMonad w, MonadBaseControl IO m, MonadBase IO n) | |
=> Int | How many concurrent computations to run. |
-> (Stream (Of a) m () -> Stream (Of b) m t) | |
-> Stream (Of a) m i | |
-> w (Stream (Of b) n ()) |
Concurrently split the provided stream into n
streams and
transform them all using the provided function.
Note: ordering of elements in the output is undeterministic.
Since: 0.2.0.0
Primitives
joinBuffers :: MonadBase IO m => (a -> b) -> OutBasket a -> InBasket b -> m () Source #
Take an item out of one Buffer
, apply a function to it and then
place it into another 'Buffer.
Since: 0.3.1.0
joinBuffersM :: MonadBase IO m => (a -> m b) -> OutBasket a -> InBasket b -> m () Source #
As with joinBuffers
but apply a monadic function.
Since: 0.3.1.0