aivika-transformers-4.3.2: Transformers for the Aivika simulation library

CopyrightCopyright (c) 2009-2015, David Sorokin <david.sorokin@gmail.com>
LicenseBSD3
MaintainerDavid Sorokin <david.sorokin@gmail.com>
Stabilityexperimental
Safe HaskellNone
LanguageHaskell2010

Simulation.Aivika.Trans.Processor

Contents

Description

Tested with: GHC 7.10.1

The processor of simulation data.

Synopsis

Processor Type

newtype Processor m a b Source

Represents a processor of simulation data.

Constructors

Processor 

Fields

runProcessor :: Stream m a -> Stream m b

Run the processor.

Processor Primitives

emptyProcessor :: MonadDES m => Processor m a b Source

A processor that never finishes its work producing an emptyStream.

arrProcessor :: MonadDES m => (a -> Process m b) -> Processor m a b Source

Create a simple processor by the specified handling function that runs the discontinuous process for each input value to get the output.

accumProcessor :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Processor m a b Source

Accumulator that outputs a value determined by the supplied function.

withinProcessor :: MonadDES m => Process m () -> Processor m a a Source

Involve the computation with side effect when processing a stream of data.

Specifying Identifier

processorUsingId :: MonadDES m => ProcessId m -> Processor m a b -> Processor m a b Source

Create a processor that will use the specified process identifier. It can be useful to refer to the underlying Process computation which can be passivated, interrupted, canceled and so on. See also the processUsingId function for more details.

Prefetch and Delay Processors

prefetchProcessor :: MonadDES m => Processor m a a Source

This is a prefetch processor that requests for one more data item from the input in advance while the latest item is not yet fully processed in the chain of streams, usually by other processors.

You can think of this as the prefetched processor could place its latest data item in some temporary space for later use, which is very useful for modeling a sequence of separate and independent work places.

delayProcessor :: MonadDES m => a -> Processor m a a Source

A processor that delays the input stream by one step using the specified initial value.

Buffer Processor

bufferProcessor Source

Arguments

:: MonadDES m 
=> (Stream m a -> Process m ())

a separate process to consume the input

-> Stream m b

the resulting stream of data

-> Processor m a b 

Create a buffer processor, where the process from the first argument consumes the input stream but the stream passed in as the second argument and produced usually by some other process is returned as an output. This kind of processor is very useful for modeling the queues.

bufferProcessorLoop Source

Arguments

:: MonadDES m 
=> (Stream m a -> Stream m c -> Process m ())

consume two streams: the input values of type a and the values of type c returned by the loop

-> Stream m d

the stream of data that may become results

-> Processor m d (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor m e c

process in the loop and then return a value of type c to the input again (this is a loop body)

-> Processor m a b 

Like bufferProcessor but allows creating a loop when some items can be processed repeatedly. It is very useful for modeling the processors with queues and loop-backs.

Processing Queues

queueProcessor Source

Arguments

:: MonadDES m 
=> (a -> Process m ())

enqueue the input item and wait while the queue is full if required so that there were no hanging items

-> Process m b

dequeue an output item

-> Processor m a b

the buffering processor

Return a processor with help of which we can model the queue.

Although the function doesn't refer to the queue directly, its main use case is namely a processing of the queue. The first argument should be the enqueueing operation, while the second argument should be the opposite dequeueing operation.

The reason is as follows. There are many possible combinations how the queues can be modeled. There is no sense to enumerate all them creating a separate function for each case. We can just use combinators to define exactly what we need.

So, the queue can lose the input items if the queue is full, or the input process can suspend while the queue is full, or we can use priorities for enqueueing, storing and dequeueing the items in different combinations. There are so many use cases!

There is a hope that this function along with other similar functions from this module is sufficient to cover the most important cases. Even if it is not sufficient then you can use a more generic function bufferProcessor which this function is based on. In case of need, you can even write your own function from scratch. It is quite easy actually.

queueProcessorLoopMerging Source

Arguments

:: MonadDES m 
=> (Stream m a -> Stream m d -> Stream m e)

merge two streams: the input values of type a and the values of type d returned by the loop

-> (e -> Process m ())

enqueue the input item and wait while the queue is full if required so that there were no hanging items

-> Process m c

dequeue an item for the further processing

-> Processor m c (Either f b)

process and then decide what values of type f should be processed in the loop (this is a condition)

-> Processor m f d

process in the loop and then return a value of type d to the queue again (this is a loop body)

-> Processor m a b

the buffering processor

Like queueProcessor creates a queue processor but with a loop when some items can be processed and then added to the queue again. Also it allows specifying how two input streams of data can be merged.

queueProcessorLoopSeq Source

Arguments

:: MonadDES m 
=> (a -> Process m ())

enqueue the input item and wait while the queue is full if required so that there were no hanging items

-> Process m c

dequeue an item for the further processing

-> Processor m c (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor m e a

process in the loop and then return a value of type a to the queue again (this is a loop body)

-> Processor m a b

the buffering processor

Like queueProcessorLoopMerging creates a queue processor with a loop when some items can be processed and then added to the queue again. Only it sequentially merges two input streams of data: one stream that come from the external source and another stream of data returned by the loop. The first stream has a priority over the second one.

queueProcessorLoopParallel Source

Arguments

:: MonadDES m 
=> (a -> Process m ())

enqueue the input item and wait while the queue is full if required so that there were no hanging items

-> Process m c

dequeue an item for the further processing

-> Processor m c (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor m e a

process in the loop and then return a value of type a to the queue again (this is a loop body)

-> Processor m a b

the buffering processor

Like queueProcessorLoopMerging creates a queue processor with a loop when some items can be processed and then added to the queue again. Only it runs two simultaneous processes to enqueue the input streams of data: one stream that come from the external source and another stream of data returned by the loop.

Sequencing Processors

processorSeq :: MonadDES m => [Processor m a a] -> Processor m a a Source

Launches the processors sequentially using the prefetchProcessor between them to model an autonomous work of each of the processors specified.

Parallelizing Processors

processorParallel :: MonadDES m => [Processor m a b] -> Processor m a b Source

Launches the processors in parallel consuming the same input stream and producing a combined output stream. This version applies the FCFS strategy both for input and output, which suits the most part of uses cases.

processorQueuedParallel Source

Arguments

:: (MonadDES m, EnqueueStrategy m si, EnqueueStrategy m so) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [Processor m a b]

the processors to parallelize

-> Processor m a b

the parallelized processor

Launches the specified processors in parallel consuming the same input stream and producing a combined output stream.

If you don't know what the enqueue strategies to apply, then you will probably need FCFS for the both parameters, or function processorParallel that does namely this.

processorPrioritisingOutputParallel Source

Arguments

:: (MonadDES m, EnqueueStrategy m si, PriorityQueueStrategy m so po) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [Processor m a (po, b)]

the processors to parallelize

-> Processor m a b

the parallelized processor

Launches the specified processors in parallel using priorities for combining the output.

processorPrioritisingInputParallel Source

Arguments

:: (MonadDES m, PriorityQueueStrategy m si pi, EnqueueStrategy m so) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [(Stream m pi, Processor m a b)]

the streams of input priorities and the processors to parallelize

-> Processor m a b

the parallelized processor

Launches the specified processors in parallel using priorities for consuming the intput.

processorPrioritisingInputOutputParallel Source

Arguments

:: (MonadDES m, PriorityQueueStrategy m si pi, PriorityQueueStrategy m so po) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [(Stream m pi, Processor m a (po, b))]

the streams of input priorities and the processors to parallelize

-> Processor m a b

the parallelized processor

Launches the specified processors in parallel using priorities for consuming the input and combining the output.

Arrival Processor

arrivalProcessor :: MonadDES m => Processor m a (Arrival a) Source

A processor that adds the information about the time points at which the original stream items were received by demand.

Utilities

joinProcessor :: MonadDES m => Process m (Processor m a b) -> Processor m a b Source

Removes one level of the computation, projecting its bound processor into the outer level.

Failover

failoverProcessor :: MonadDES m => [Processor m a b] -> Processor m a b Source

Takes the next processor from the list after the current processor fails because of cancelling the underlying process.

Integrating with Signals

signalProcessor :: MonadDES m => (Signal m a -> Signal m b) -> Processor m a b Source

Convert the specified signal transform to a processor.

The processor may return data with delay as the values are requested by demand. Consider using the arrivalSignal function to provide with the information about the time points at which the signal was actually triggered.

The point is that the Stream used in the Processor is requested outside, while the Signal is triggered inside. They are different by nature. The former is passive, while the latter is active.

Cancel the processor's process to unsubscribe from the signals provided.

processorSignaling :: MonadDES m => Processor m a b -> Signal m a -> Process m (Signal m b) Source

Convert the specified processor to a signal transform.

The processor may return data with delay as the values are requested by demand. Consider using the arrivalSignal function to provide with the information about the time points at which the signal was actually triggered.

The point is that the Stream used in the Processor is requested outside, while the Signal is triggered inside. They are different by nature. The former is passive, while the latter is active.

Cancel the returned process to unsubscribe from the signal specified.

Debugging

traceProcessor Source

Arguments

:: MonadDES m 
=> Maybe String

the request message

-> Maybe String

the response message

-> Processor m a b

a processor

-> Processor m a b 

Show the debug messages with the current simulation time.