Copyright | Copyright (c) 2009-2013, David Sorokin <david.sorokin@gmail.com> |
---|---|
License | BSD3 |
Maintainer | David Sorokin <david.sorokin@gmail.com> |
Stability | experimental |
Safe Haskell | Safe-Inferred |
Language | Haskell98 |
Tested with: GHC 7.6.3
The infinite stream of data in time.
- newtype Stream a = Cons {}
- emptyStream :: Stream a
- mergeStreams :: Stream a -> Stream a -> Stream a
- mergeQueuedStreams :: EnqueueStrategy s q => s -> Stream a -> Stream a -> Stream a
- mergePriorityStreams :: PriorityQueueStrategy s q p => s -> Stream (p, a) -> Stream (p, a) -> Stream a
- concatStreams :: [Stream a] -> Stream a
- concatQueuedStreams :: EnqueueStrategy s q => s -> [Stream a] -> Stream a
- concatPriorityStreams :: PriorityQueueStrategy s q p => s -> [Stream (p, a)] -> Stream a
- splitStream :: Int -> Stream a -> Simulation [Stream a]
- splitStreamQueuing :: EnqueueStrategy s q => s -> Int -> Stream a -> Simulation [Stream a]
- splitStreamPrioritising :: PriorityQueueStrategy s q p => s -> [Stream p] -> Stream a -> Simulation [Stream a]
- streamUsingId :: ProcessId -> Stream a -> Stream a
- prefetchStream :: Stream a -> Stream a
- memoStream :: Stream a -> Simulation (Stream a)
- zipStreamSeq :: Stream a -> Stream b -> Stream (a, b)
- zipStreamParallel :: Stream a -> Stream b -> Stream (a, b)
- zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
- zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
- unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b)
- streamSeq :: [Stream a] -> Stream [a]
- streamParallel :: [Stream a] -> Stream [a]
- consumeStream :: (a -> Process ()) -> Stream a -> Process ()
- sinkStream :: Stream a -> Process ()
- repeatProcess :: Process a -> Stream a
- mapStream :: (a -> b) -> Stream a -> Stream b
- mapStreamM :: (a -> Process b) -> Stream a -> Stream b
- apStreamDataFirst :: Process (a -> b) -> Stream a -> Stream b
- apStreamDataLater :: Process (a -> b) -> Stream a -> Stream b
- apStreamParallel :: Process (a -> b) -> Stream a -> Stream b
- filterStream :: (a -> Bool) -> Stream a -> Stream a
- filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a
- leftStream :: Stream (Either a b) -> Stream a
- rightStream :: Stream (Either a b) -> Stream b
- replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b)
- replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c)
- partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b)
Stream Type
Represents an infinite stream of data in time, some kind of the cons cell.
Merging and Splitting Stream
emptyStream :: Stream a Source
An empty stream that never returns data.
mergeStreams :: Stream a -> Stream a -> Stream a Source
Merge two streams applying the FCFS
strategy for enqueuing the input data.
:: EnqueueStrategy s q | |
=> s | the strategy applied for enqueuing the input data |
-> Stream a | the fist input stream |
-> Stream a | the second input stream |
-> Stream a | the output combined stream |
Merge two streams.
If you don't know what the strategy to apply, then you probably
need the FCFS
strategy, or function mergeStreams
that
does namely this.
:: PriorityQueueStrategy s q p | |
=> s | the strategy applied for enqueuing the input data |
-> Stream (p, a) | the fist input stream |
-> Stream (p, a) | the second input stream |
-> Stream a | the output combined stream |
Merge two priority streams.
concatStreams :: [Stream a] -> Stream a Source
Concatenate the input streams applying the FCFS
strategy and
producing one output stream.
:: EnqueueStrategy s q | |
=> s | the strategy applied for enqueuing the input data |
-> [Stream a] | the input stream |
-> Stream a | the combined output stream |
Concatenate the input streams producing one output stream.
If you don't know what the strategy to apply, then you probably
need the FCFS
strategy, or function concatStreams
that
does namely this.
:: PriorityQueueStrategy s q p | |
=> s | the strategy applied for enqueuing the input data |
-> [Stream (p, a)] | the input stream |
-> Stream a | the combined output stream |
Concatenate the input priority streams producing one output stream.
splitStream :: Int -> Stream a -> Simulation [Stream a] Source
Split the input stream into the specified number of output streams
after applying the FCFS
strategy for enqueuing the output requests.
:: EnqueueStrategy s q | |
=> s | the strategy applied for enqueuing the output requests |
-> Int | the number of output streams |
-> Stream a | the input stream |
-> Simulation [Stream a] | the splitted output streams |
Split the input stream into the specified number of output streams.
If you don't know what the strategy to apply, then you probably
need the FCFS
strategy, or function splitStream
that
does namely this.
splitStreamPrioritising Source
:: PriorityQueueStrategy s q p | |
=> s | the strategy applied for enqueuing the output requests |
-> [Stream p] | the streams of priorities |
-> Stream a | the input stream |
-> Simulation [Stream a] | the splitted output streams |
Split the input stream into a list of output streams using the specified priorities.
Specifying Identifier
streamUsingId :: ProcessId -> Stream a -> Stream a Source
Create a stream that will use the specified process identifier.
It can be useful to refer to the underlying Process
computation which
can be passivated, interrupted, canceled and so on. See also the
processUsingId
function for more details.
Prefetching Stream
prefetchStream :: Stream a -> Stream a Source
Prefetch the input stream requesting for one more data item in advance while the last received item is not yet fully processed in the chain of streams, usually by the processors.
You can think of this as the prefetched stream could place its latest data item in some temporary space for later use, which is very useful for modeling a sequence of separate and independent work places.
Memoizing, Zipping and Uzipping Stream
memoStream :: Stream a -> Simulation (Stream a) Source
Memoize the stream so that it would always return the same data within the simulation run.
zipStreamSeq :: Stream a -> Stream b -> Stream (a, b) Source
Zip two streams trying to get data sequentially.
zipStreamParallel :: Stream a -> Stream b -> Stream (a, b) Source
Zip two streams trying to get data as soon as possible, launching the sub-processes in parallel.
zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c) Source
Zip three streams trying to get data sequentially.
zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c) Source
Zip three streams trying to get data as soon as possible, launching the sub-processes in parallel.
unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b) Source
Unzip the stream.
streamSeq :: [Stream a] -> Stream [a] Source
To form each new portion of data for the output stream, read data sequentially from the input streams.
This is a generalization of zipStreamSeq
.
streamParallel :: [Stream a] -> Stream [a] Source
To form each new portion of data for the output stream, read data from the input streams in parallel.
This is a generalization of zipStreamParallel
.
Consuming and Sinking Stream
consumeStream :: (a -> Process ()) -> Stream a -> Process () Source
Consume the stream. It returns a process that infinitely reads data from the stream and then redirects them to the provided function. It is useful for modeling the process of enqueueing data in the queue from the input stream.
sinkStream :: Stream a -> Process () Source
Sink the stream. It returns a process that infinitely reads data from the stream. The resulting computation can be a moving force to simulate the whole system of the interconnected streams and processors.
Useful Combinators
repeatProcess :: Process a -> Stream a Source
Return a stream of values generated by the specified process.
mapStream :: (a -> b) -> Stream a -> Stream b Source
Map the stream according the specified function.
mapStreamM :: (a -> Process b) -> Stream a -> Stream b Source
Compose the stream.
apStreamDataFirst :: Process (a -> b) -> Stream a -> Stream b Source
Transform the stream getting the transformation function after data have come.
apStreamDataLater :: Process (a -> b) -> Stream a -> Stream b Source
Transform the stream getting the transformation function before requesting for data.
apStreamParallel :: Process (a -> b) -> Stream a -> Stream b Source
Transform the stream trying to get the transformation function as soon as possible at the same time when requesting for the next portion of data.
filterStream :: (a -> Bool) -> Stream a -> Stream a Source
Filter only those data values that satisfy to the specified predicate.
filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a Source
Filter only those data values that satisfy to the specified predicate.
Utilities
replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b) Source
Replace the Left
values.
replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c) Source
Replace the Right
values.
partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b) Source
Partition the stream of Either
values into two streams.