aivika-4.1.1: A multi-paradigm simulation library

CopyrightCopyright (c) 2009-2015, David Sorokin <david.sorokin@gmail.com>
LicenseBSD3
MaintainerDavid Sorokin <david.sorokin@gmail.com>
Stabilityexperimental
Safe HaskellNone
LanguageHaskell2010

Simulation.Aivika.Stream

Contents

Description

Tested with: GHC 7.10.1

The infinite stream of data in time.

Synopsis

Stream Type

newtype Stream a Source

Represents an infinite stream of data in time, some kind of the cons cell.

Constructors

Cons 

Fields

runStream :: Process (a, Stream a)

Run the stream.

Merging and Splitting Stream

emptyStream :: Stream a Source

An empty stream that never returns data.

mergeStreams :: Stream a -> Stream a -> Stream a Source

Merge two streams applying the FCFS strategy for enqueuing the input data.

mergeQueuedStreams Source

Arguments

:: EnqueueStrategy s 
=> s

the strategy applied for enqueuing the input data

-> Stream a

the fist input stream

-> Stream a

the second input stream

-> Stream a

the output combined stream

Merge two streams.

If you don't know what the strategy to apply, then you probably need the FCFS strategy, or function mergeStreams that does namely this.

mergePriorityStreams Source

Arguments

:: PriorityQueueStrategy s p 
=> s

the strategy applied for enqueuing the input data

-> Stream (p, a)

the fist input stream

-> Stream (p, a)

the second input stream

-> Stream a

the output combined stream

Merge two priority streams.

concatStreams :: [Stream a] -> Stream a Source

Concatenate the input streams applying the FCFS strategy and producing one output stream.

concatQueuedStreams Source

Arguments

:: EnqueueStrategy s 
=> s

the strategy applied for enqueuing the input data

-> [Stream a]

the input stream

-> Stream a

the combined output stream

Concatenate the input streams producing one output stream.

If you don't know what the strategy to apply, then you probably need the FCFS strategy, or function concatStreams that does namely this.

concatPriorityStreams Source

Arguments

:: PriorityQueueStrategy s p 
=> s

the strategy applied for enqueuing the input data

-> [Stream (p, a)]

the input stream

-> Stream a

the combined output stream

Concatenate the input priority streams producing one output stream.

splitStream :: Int -> Stream a -> Simulation [Stream a] Source

Split the input stream into the specified number of output streams after applying the FCFS strategy for enqueuing the output requests.

splitStreamQueueing Source

Arguments

:: EnqueueStrategy s 
=> s

the strategy applied for enqueuing the output requests

-> Int

the number of output streams

-> Stream a

the input stream

-> Simulation [Stream a]

the splitted output streams

Split the input stream into the specified number of output streams.

If you don't know what the strategy to apply, then you probably need the FCFS strategy, or function splitStream that does namely this.

splitStreamPrioritising Source

Arguments

:: PriorityQueueStrategy s p 
=> s

the strategy applied for enqueuing the output requests

-> [Stream p]

the streams of priorities

-> Stream a

the input stream

-> Simulation [Stream a]

the splitted output streams

Split the input stream into a list of output streams using the specified priorities.

Specifying Identifier

streamUsingId :: ProcessId -> Stream a -> Stream a Source

Create a stream that will use the specified process identifier. It can be useful to refer to the underlying Process computation which can be passivated, interrupted, canceled and so on. See also the processUsingId function for more details.

Prefetching and Delaying Stream

prefetchStream :: Stream a -> Stream a Source

Prefetch the input stream requesting for one more data item in advance while the last received item is not yet fully processed in the chain of streams, usually by the processors.

You can think of this as the prefetched stream could place its latest data item in some temporary space for later use, which is very useful for modeling a sequence of separate and independent work places.

delayStream :: a -> Stream a -> Stream a Source

Delay the stream by one step using the specified initial value.

Stream Arriving

arrivalStream :: Stream a -> Stream (Arrival a) Source

Transform a stream so that the resulting stream returns a sequence of arrivals saving the information about the time points at which the original stream items were received by demand.

Memoizing, Zipping and Uzipping Stream

memoStream :: Stream a -> Simulation (Stream a) Source

Memoize the stream so that it would always return the same data within the simulation run.

zipStreamSeq :: Stream a -> Stream b -> Stream (a, b) Source

Zip two streams trying to get data sequentially.

zipStreamParallel :: Stream a -> Stream b -> Stream (a, b) Source

Zip two streams trying to get data as soon as possible, launching the sub-processes in parallel.

zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c) Source

Zip three streams trying to get data sequentially.

zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c) Source

Zip three streams trying to get data as soon as possible, launching the sub-processes in parallel.

unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b) Source

Unzip the stream.

streamSeq :: [Stream a] -> Stream [a] Source

To form each new portion of data for the output stream, read data sequentially from the input streams.

This is a generalization of zipStreamSeq.

streamParallel :: [Stream a] -> Stream [a] Source

To form each new portion of data for the output stream, read data from the input streams in parallel.

This is a generalization of zipStreamParallel.

Consuming and Sinking Stream

consumeStream :: (a -> Process ()) -> Stream a -> Process () Source

Consume the stream. It returns a process that infinitely reads data from the stream and then redirects them to the provided function. It is useful for modeling the process of enqueueing data in the queue from the input stream.

sinkStream :: Stream a -> Process () Source

Sink the stream. It returns a process that infinitely reads data from the stream. The resulting computation can be a moving force to simulate the whole system of the interconnected streams and processors.

Useful Combinators

repeatProcess :: Process a -> Stream a Source

Return a stream of values generated by the specified process.

mapStream :: (a -> b) -> Stream a -> Stream b Source

Map the stream according the specified function.

mapStreamM :: (a -> Process b) -> Stream a -> Stream b Source

Compose the stream.

apStream :: Stream (a -> b) -> Stream a -> Stream b Source

Sequential application.

apStreamM :: Stream (a -> Process b) -> Stream a -> Stream b Source

Sequential application.

filterStream :: (a -> Bool) -> Stream a -> Stream a Source

Filter only those data values that satisfy to the specified predicate.

filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a Source

Filter only those data values that satisfy to the specified predicate.

takeStream :: Int -> Stream a -> Stream a Source

Return the prefix of the stream of the specified length.

takeStreamWhile :: (a -> Bool) -> Stream a -> Stream a Source

Return the longest prefix of the stream of elements that satisfy the predicate.

takeStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a Source

Return the longest prefix of the stream of elements that satisfy the computation.

dropStream :: Int -> Stream a -> Stream a Source

Return the suffix of the stream after the specified first elements.

dropStreamWhile :: (a -> Bool) -> Stream a -> Stream a Source

Return the suffix of the stream of elements remaining after takeStreamWhile.

dropStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a Source

Return the suffix of the stream of elements remaining after takeStreamWhileM.

singletonStream :: a -> Stream a Source

Return a stream consisting of exactly one element and inifinite tail.

joinStream :: Process (Stream a) -> Stream a Source

Removes one level of the computation, projecting its bound stream into the outer level.

Failover

failoverStream :: [Stream a] -> Stream a Source

Takes the next stream from the list after the current stream fails because of cancelling the underlying process.

Integrating with Signals

signalStream :: Signal a -> Process (Stream a) Source

Return a stream of values triggered by the specified signal.

Since the time at which the values of the stream are requested for may differ from the time at which the signal is triggered, it can be useful to apply the arrivalSignal function to add the information about the time points at which the signal was actually received.

The point is that the Stream is requested outside, while the Signal is triggered inside. They are different by nature. The former is passive, while the latter is active.

The resulting stream may be a root of space leak as it uses an internal queue to store the values received from the signal. The oldest value is dequeued each time we request the stream and it is returned within the computation.

Cancel the stream's process to unsubscribe from the specified signal.

streamSignal :: Stream a -> Process (Signal a) Source

Return a computation of the signal that triggers values from the specified stream, each time the next value of the stream is received within the underlying Process computation.

Cancel the returned process to stop reading from the specified stream.

Utilities

leftStream :: Stream (Either a b) -> Stream a Source

The stream of Left values.

rightStream :: Stream (Either a b) -> Stream b Source

The stream of Right values.

replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b) Source

Replace the Left values.

replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c) Source

Replace the Right values.

partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b) Source

Partition the stream of Either values into two streams.

Debugging

traceStream Source

Arguments

:: Maybe String

the request message

-> Maybe String

the response message

-> Stream a

a stream

-> Stream a 

Show the debug messages with the current simulation time.