aivika-transformers-5.7: Transformers for the Aivika simulation library

Copyright: Copyright (c) 2009-2017 David Sorokin <david.sorokin@gmail.com>
License: BSD3
Maintainer: David Sorokin <david.sorokin@gmail.com>
Stability: experimental
Safe Haskell: None
Language: Haskell2010

Simulation.Aivika.Trans.Processor

Description

Tested with: GHC 8.0.1

The processor of simulation data.

Processor Type

newtype Processor m a b

Represents a processor of simulation data.

Constructors

Processor 

Fields

runProcessor :: Stream m a -> Stream m b

Run the processor.

Instances

MonadDES m => Arrow (Processor m)

Methods

arr :: (b -> c) -> Processor m b c

first :: Processor m b c -> Processor m (b, d) (c, d)

second :: Processor m b c -> Processor m (d, b) (d, c)

(***) :: Processor m b c -> Processor m b' c' -> Processor m (b, b') (c, c')

(&&&) :: Processor m b c -> Processor m b c' -> Processor m b (c, c')

MonadDES m => ArrowZero (Processor m)

Methods

zeroArrow :: Processor m b c

MonadDES m => ArrowPlus (Processor m)

Methods

(<+>) :: Processor m b c -> Processor m b c -> Processor m b c

MonadDES m => ArrowChoice (Processor m)

Methods

left :: Processor m b c -> Processor m (Either b d) (Either c d)

right :: Processor m b c -> Processor m (Either d b) (Either d c)

(+++) :: Processor m b c -> Processor m b' c' -> Processor m (Either b b') (Either c c')

(|||) :: Processor m b d -> Processor m c d -> Processor m (Either b c) d

Category * (Processor m)

Methods

id :: cat a a

(.) :: cat b c -> cat a b -> cat a c
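
Because Processor m is an instance of Category and Arrow, pure functions can be lifted with arr and chained with (>>>). The following is a minimal sketch under the assumption that MonadDES is importable from Simulation.Aivika.Trans.DES; the name describeSquares is hypothetical and only serves as an illustration.

```haskell
import Control.Arrow (arr, (>>>))
import Simulation.Aivika.Trans.DES (MonadDES)        -- assumed module for MonadDES
import Simulation.Aivika.Trans.Processor (Processor)

-- | Hypothetical example: square each integer, then render it as text.
describeSquares :: MonadDES m => Processor m Int String
describeSquares = arr square >>> arr render
  where
    -- Pair the input with its square.
    square x = (x, x * x)
    -- Turn the pair into a readable message.
    render (x, sq) = show x ++ "^2 = " ++ show sq
```

Only arr and (>>>) are used here, so each item simply flows through both pure steps in order.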

Processor Primitives

emptyProcessor :: MonadDES m => Processor m a b

A processor that never finishes its work, producing an emptyStream.

arrProcessor :: MonadDES m => (a -> Process m b) -> Processor m a b

Create a simple processor from the specified handling function, which runs a discontinuous process for each input value to produce the output.
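
As an illustration, a handling function can suspend the process for a service time before passing the item on. This sketch assumes that holdProcess :: MonadDES m => Double -> Process m () is exported from Simulation.Aivika.Trans.Process; the name serviceStep is hypothetical.

```haskell
import Simulation.Aivika.Trans.DES (MonadDES)          -- assumed module for MonadDES
import Simulation.Aivika.Trans.Process (holdProcess)   -- assumed export
import Simulation.Aivika.Trans.Processor (Processor, arrProcessor)

-- | Hypothetical example: every item spends a fixed amount of
-- simulation time in this step and is then passed on unchanged.
serviceStep :: MonadDES m => Double -> Processor m a a
serviceStep duration = arrProcessor $ \item -> do
  holdProcess duration   -- suspend the process for the service time
  return item            -- emit the item once the hold has elapsed
```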

accumProcessor :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Processor m a b

Accumulator that outputs a value determined by the supplied function.
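
For instance, the accumulator can carry a running counter that numbers the items as they pass through. This is only a sketch; the name numberItems is hypothetical and the MonadDES import location is an assumption.

```haskell
import Simulation.Aivika.Trans.DES (MonadDES)   -- assumed module for MonadDES
import Simulation.Aivika.Trans.Processor (Processor, accumProcessor)

-- | Hypothetical example: tag each item with its sequence number,
-- starting from 1.
numberItems :: MonadDES m => Processor m a (Int, a)
numberItems = accumProcessor step 1
  where
    -- The accumulator is the number to assign to the current item;
    -- the new accumulator is the number for the next one.
    step n item = return (n + 1, (n, item))
```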

withinProcessor :: MonadDES m => Process m () -> Processor m a a

Run the specified computation for its side effect when processing a stream of data.

Specifying Identifier

processorUsingId :: MonadDES m => ProcessId m -> Processor m a b -> Processor m a b

Create a processor that will use the specified process identifier. This is useful for referring to the underlying Process computation, which can be passivated, interrupted, cancelled and so on. See also the processUsingId function for more details.
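
A possible usage pattern is to create the identifier first and keep it next to the processor, so that the model can later refer to the underlying process. The sketch assumes that newProcessId :: MonadDES m => Simulation m (ProcessId m) is exported from Simulation.Aivika.Trans.Process and that Simulation comes from Simulation.Aivika.Trans.Simulation; the name identified is hypothetical.

```haskell
import Simulation.Aivika.Trans.DES (MonadDES)                  -- assumed module
import Simulation.Aivika.Trans.Process (ProcessId, newProcessId) -- assumed exports
import Simulation.Aivika.Trans.Processor (Processor, processorUsingId)
import Simulation.Aivika.Trans.Simulation (Simulation)         -- assumed module

-- | Hypothetical example: attach a fresh process identifier to a
-- processor and return both, so the caller can keep the identifier.
identified
  :: MonadDES m
  => Processor m a b
  -> Simulation m (ProcessId m, Processor m a b)
identified p = do
  pid <- newProcessId                  -- create a new identifier
  return (pid, processorUsingId pid p) -- the processor now runs under pid
```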

Prefetch and Delay Processors

prefetchProcessor :: MonadDES m => Processor m a a

A prefetch processor that requests one more data item from the input in advance, while the latest item is not yet fully processed in the chain of streams, usually by other processors.

You can think of the prefetch processor as placing its latest data item in some temporary space for later use, which is very useful for modeling a sequence of separate and independent workplaces.
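
A sketch of that idea: two workplaces are connected through a prefetch processor, so the first one can start on the next item while the second is still busy. The name twoStations is hypothetical, and the MonadDES import location is an assumption.

```haskell
import Control.Arrow ((>>>))
import Simulation.Aivika.Trans.DES (MonadDES)   -- assumed module for MonadDES
import Simulation.Aivika.Trans.Processor (Processor, prefetchProcessor)

-- | Hypothetical example: connect two stations with a one-item
-- temporary space between them.
twoStations
  :: MonadDES m
  => Processor m a b   -- ^ the first workplace
  -> Processor m b c   -- ^ the second workplace
  -> Processor m a c
twoStations station1 station2 =
  station1 >>> prefetchProcessor >>> station2
```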

delayProcessor :: MonadDES m => a -> Processor m a a

A processor that delays the input stream by one step using the specified initial value.

Buffer Processor

bufferProcessor

Arguments

:: MonadDES m 
=> (Stream m a -> Process m ())

a separate process to consume the input

-> Stream m b

the resulting stream of data

-> Processor m a b 

Create a buffer processor, where the process from the first argument consumes the input stream, while the stream passed in as the second argument, usually produced by some other process, is returned as the output. This kind of processor is very useful for modeling queues.
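
One plausible way to use it is to feed a store through a per-item write operation and to read the output back through a repeated read operation. The sketch assumes that consumeStream :: MonadDES m => (a -> Process m ()) -> Stream m a -> Process m () and repeatProcess :: MonadDES m => Process m a -> Stream m a are exported from Simulation.Aivika.Trans.Stream; the names viaBuffer, put and takeNext are hypothetical, and this is not necessarily how the library itself builds its queue processors.

```haskell
import Simulation.Aivika.Trans.DES (MonadDES)        -- assumed module
import Simulation.Aivika.Trans.Process (Process)     -- assumed module
import Simulation.Aivika.Trans.Processor (Processor, bufferProcessor)
import Simulation.Aivika.Trans.Stream (consumeStream, repeatProcess) -- assumed exports

-- | Hypothetical example: a buffer built from a write operation and
-- a read operation on some intermediate store.
viaBuffer
  :: MonadDES m
  => (a -> Process m ())   -- ^ put an input item into the store
  -> Process m b           -- ^ take the next output item from the store
  -> Processor m a b
viaBuffer put takeNext =
  bufferProcessor
    (consumeStream put)       -- a separate process drains the input
    (repeatProcess takeNext)  -- the output is requested on demand
```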

bufferProcessorLoop

Arguments

:: MonadDES m 
=> (Stream m a -> Stream m c -> Process m ())

consume two streams: the input values of type a and the values of type c returned by the loop

-> Stream m d

the stream of data that may become results

-> Processor m d (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor m e c

process in the loop and then return a value of type c to the input again (this is a loop body)

-> Processor m a b 

Like bufferProcessor, but allows creating a loop in which some items can be processed repeatedly. It is very useful for modeling processors with queues and loop-backs.

Processing Queues

queueProcessor

Arguments

:: MonadDES m 
=> (a -> Process m ())

enqueue the input item, waiting while the queue is full if required, so that no items are left hanging

-> Process m b

dequeue an output item

-> Processor m a b

the buffering processor

Return a processor that can be used to model a queue.

Although the function doesn't refer to a queue directly, its main use case is precisely the processing of a queue. The first argument should be the enqueueing operation, while the second argument should be the opposite dequeueing operation.

The reason is as follows. There are many possible ways in which queues can be modeled. There is no sense in enumerating all of them and creating a separate function for each case. We can just use combinators to define exactly what we need.

So, the queue can lose the input items if the queue is full, or the input process can suspend while the queue is full, or we can use priorities for enqueueing, storing and dequeueing the items in different combinations. There are so many use cases!

Hopefully this function, along with other similar functions from this module, is sufficient to cover the most important cases. If it is not, you can use the more generic function bufferProcessor, on which this function is based. If need be, you can even write your own function from scratch. It is actually quite easy.
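
As a sketch of the combinator style described above, the enqueueing and dequeueing operations can be passed in as plain arguments and the resulting queue processor placed in front of a worker. The name queueStage and its parameters are hypothetical; in a real model the two operations would typically come from one of the library's queue modules.

```haskell
import Control.Arrow ((>>>))
import Simulation.Aivika.Trans.DES (MonadDES)      -- assumed module
import Simulation.Aivika.Trans.Process (Process)   -- assumed module
import Simulation.Aivika.Trans.Processor (Processor, queueProcessor)

-- | Hypothetical example: a worker preceded by a queue. How the queue
-- behaves when it is full (suspend, lose the item, use priorities)
-- is decided entirely by the operations supplied by the caller.
queueStage
  :: MonadDES m
  => (a -> Process m ())   -- ^ enqueue an input item
  -> Process m b           -- ^ dequeue an item for the worker
  -> Processor m b c       -- ^ the worker that serves dequeued items
  -> Processor m a c
queueStage enq deq worker = queueProcessor enq deq >>> worker
```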

queueProcessorLoopMerging

Arguments

:: MonadDES m 
=> (Stream m a -> Stream m d -> Stream m e)

merge two streams: the input values of type a and the values of type d returned by the loop

-> (e -> Process m ())

enqueue the input item, waiting while the queue is full if required, so that no items are left hanging

-> Process m c

dequeue an item for the further processing

-> Processor m c (Either f b)

process and then decide what values of type f should be processed in the loop (this is a condition)

-> Processor m f d

process in the loop and then return a value of type d to the queue again (this is a loop body)

-> Processor m a b

the buffering processor

Like queueProcessor, creates a queue processor, but with a loop in which some items can be processed and then added to the queue again. It also allows specifying how the two input streams of data are merged.

queueProcessorLoopSeq

Arguments

:: MonadDES m 
=> (a -> Process m ())

enqueue the input item, waiting while the queue is full if required, so that no items are left hanging

-> Process m c

dequeue an item for the further processing

-> Processor m c (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor m e a

process in the loop and then return a value of type a to the queue again (this is a loop body)

-> Processor m a b

the buffering processor

Like queueProcessorLoopMerging, creates a queue processor with a loop in which some items can be processed and then added to the queue again. However, it merges the two input streams of data sequentially: one stream that comes from the external source and another stream of data returned by the loop. The first stream has priority over the second one.
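
A typical loop-back is inspection with rework: accepted items leave the processor, while rejected ones are reworked and queued again. The following sketch only relies on the signature above; the name reworkLoop, the predicate and the rework processor are hypothetical.

```haskell
import Control.Arrow (arr)
import Simulation.Aivika.Trans.DES (MonadDES)      -- assumed module
import Simulation.Aivika.Trans.Process (Process)   -- assumed module
import Simulation.Aivika.Trans.Processor (Processor, queueProcessorLoopSeq)

-- | Hypothetical example: items that fail the check are sent through
-- the rework processor and then enqueued again.
reworkLoop
  :: MonadDES m
  => (a -> Process m ())   -- ^ enqueue an item
  -> Process m a           -- ^ dequeue an item
  -> (a -> Bool)           -- ^ does the item pass inspection?
  -> Processor m a a       -- ^ the rework step (the loop body)
  -> Processor m a a
reworkLoop enq deq accepted rework =
  queueProcessorLoopSeq enq deq (arr decide) rework
  where
    -- Right: the item becomes a result; Left: the item goes to the loop.
    decide x
      | accepted x = Right x
      | otherwise  = Left x
```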

queueProcessorLoopParallel

Arguments

:: MonadDES m 
=> (a -> Process m ())

enqueue the input item, waiting while the queue is full if required, so that no items are left hanging

-> Process m c

dequeue an item for the further processing

-> Processor m c (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor m e a

process in the loop and then return a value of type a to the queue again (this is a loop body)

-> Processor m a b

the buffering processor

Like queueProcessorLoopMerging, creates a queue processor with a loop in which some items can be processed and then added to the queue again. However, it runs two simultaneous processes to enqueue the input streams of data: one stream that comes from the external source and another stream of data returned by the loop.

Sequencing Processors

processorSeq :: MonadDES m => [Processor m a a] -> Processor m a a

Launches the processors sequentially, using the prefetchProcessor between them to model the autonomous work of each of the specified processors.
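
For example, a line of stations can be described by a list of service times. This sketch assumes holdProcess from Simulation.Aivika.Trans.Process; the name assemblyLine is hypothetical.

```haskell
import Simulation.Aivika.Trans.DES (MonadDES)          -- assumed module
import Simulation.Aivika.Trans.Process (holdProcess)   -- assumed export
import Simulation.Aivika.Trans.Processor
  (Processor, arrProcessor, processorSeq)

-- | Hypothetical example: one station per service time, launched
-- sequentially in the order given.
assemblyLine :: MonadDES m => [Double] -> Processor m a a
assemblyLine durations = processorSeq (map station durations)
  where
    -- A station holds the item for its service time and passes it on.
    station d = arrProcessor $ \item -> holdProcess d >> return item
```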

Parallelizing Processors

processorParallel :: MonadDES m => [Processor m a b] -> Processor m a b

Launches the processors in parallel, consuming the same input stream and producing a combined output stream. This version applies the FCFS strategy to both input and output, which suits most use cases.
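
A small sketch of a pool of identical workers follows. It assumes that the worker holds no state of its own, so that reusing the same processor value several times is harmless; the name workerPool is hypothetical.

```haskell
import Simulation.Aivika.Trans.DES (MonadDES)   -- assumed module for MonadDES
import Simulation.Aivika.Trans.Processor (Processor, processorParallel)

-- | Hypothetical example: run n copies of a stateless worker in
-- parallel on the same input stream.
workerPool :: MonadDES m => Int -> Processor m a b -> Processor m a b
workerPool n worker = processorParallel (replicate n worker)
```

Workers that wrap distinct resources would normally be listed explicitly instead of being replicated.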

processorQueuedParallel

Arguments

:: (MonadDES m, EnqueueStrategy m si, EnqueueStrategy m so) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [Processor m a b]

the processors to parallelize

-> Processor m a b

the parallelized processor

Launches the specified processors in parallel consuming the same input stream and producing a combined output stream.

If you don't know which enqueue strategies to apply, then you will probably need FCFS for both parameters, or the function processorParallel, which does exactly this.
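
When a different strategy is wanted, it is passed as a value. The sketch below assumes that the FCFS and LCFS strategy types and the EnqueueStrategy class are exported from Simulation.Aivika.Trans.QueueStrategy; the constraints are spelled out explicitly in case they are not implied by MonadDES, and the name lcfsOutputPool is hypothetical.

```haskell
{-# LANGUAGE FlexibleContexts #-}

import Simulation.Aivika.Trans.DES (MonadDES)   -- assumed module for MonadDES
import Simulation.Aivika.Trans.Processor (Processor, processorQueuedParallel)
import Simulation.Aivika.Trans.QueueStrategy
  (EnqueueStrategy, FCFS(..), LCFS(..))         -- assumed exports

-- | Hypothetical example: FCFS for enqueuing the input data and
-- LCFS for enqueuing the output data.
lcfsOutputPool
  :: (MonadDES m, EnqueueStrategy m FCFS, EnqueueStrategy m LCFS)
  => [Processor m a b]
  -> Processor m a b
lcfsOutputPool = processorQueuedParallel FCFS LCFS
```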

processorPrioritisingOutputParallel

Arguments

:: (MonadDES m, EnqueueStrategy m si, PriorityQueueStrategy m so po) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [Processor m a (po, b)]

the processors to parallelize

-> Processor m a b

the parallelized processor

Launches the specified processors in parallel using priorities for combining the output.

processorPrioritisingInputParallel

Arguments

:: (MonadDES m, PriorityQueueStrategy m si pi, EnqueueStrategy m so) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [(Stream m pi, Processor m a b)]

the streams of input priorities and the processors to parallelize

-> Processor m a b

the parallelized processor

Launches the specified processors in parallel using priorities for consuming the input.

processorPrioritisingInputOutputParallel

Arguments

:: (MonadDES m, PriorityQueueStrategy m si pi, PriorityQueueStrategy m so po) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [(Stream m pi, Processor m a (po, b))]

the streams of input priorities and the processors to parallelize

-> Processor m a b

the parallelized processor

Launches the specified processors in parallel using priorities for consuming the input and combining the output.

Arrival Processor

arrivalProcessor :: MonadDES m => Processor m a (Arrival a)

A processor that adds information about the time points at which the original stream items were received on demand.
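
The arrival information can then be projected with ordinary functions. This sketch assumes that the Arrival record, with the fields arrivalValue and arrivalTime, is exported from Simulation.Aivika.Trans.Arrival; the name timestamped is hypothetical.

```haskell
import Control.Arrow (arr, (>>>))
import Simulation.Aivika.Trans.Arrival (Arrival(..))   -- assumed module and fields
import Simulation.Aivika.Trans.DES (MonadDES)          -- assumed module
import Simulation.Aivika.Trans.Processor (Processor, arrivalProcessor)

-- | Hypothetical example: pair each item with the simulation time
-- at which it was received.
timestamped :: MonadDES m => Processor m a (Double, a)
timestamped =
  arrivalProcessor >>> arr (\x -> (arrivalTime x, arrivalValue x))
```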

Utilities

joinProcessor :: MonadDES m => Process m (Processor m a b) -> Processor m a b

Removes one level of the computation, projecting its bound processor into the outer level.
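
This makes it possible to run some process first and only then decide which processor to use. The following sketch assumes holdProcess from Simulation.Aivika.Trans.Process; the name afterWarmUp is hypothetical.

```haskell
import Simulation.Aivika.Trans.DES (MonadDES)          -- assumed module
import Simulation.Aivika.Trans.Process (holdProcess)   -- assumed export
import Simulation.Aivika.Trans.Processor (Processor, joinProcessor)

-- | Hypothetical example: wait for a warm-up period inside the
-- process computation, then hand over to the given processor.
afterWarmUp :: MonadDES m => Double -> Processor m a b -> Processor m a b
afterWarmUp warmUp p = joinProcessor $ do
  holdProcess warmUp   -- the computation that is run first
  return p             -- the processor projected into the outer level
```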

Failover

failoverProcessor :: MonadDES m => [Processor m a b] -> Processor m a b

Takes the next processor from the list after the current processor fails because the underlying process was cancelled.

Integrating with Signals and Channels

channelProcessor :: MonadDES m => Channel m a b -> Processor m a b

Convert the specified signal transform, i.e. the channel, to a processor.

The processor may return data with a delay, as the values are requested on demand. Consider using the arrivalSignal function to provide information about the time points at which the signal was actually triggered.

The point is that the Stream used in the Processor is requested outside, while the Signal used in the Channel is triggered inside. They are different by nature. The former is passive, while the latter is active.

The resulting processor may be a source of a space leak, as it uses an internal queue to store the values received from the input signal. Consider using queuedChannelProcessor, which allows specifying a bounded queue if need be.

processorChannel :: MonadDES m => Processor m a b -> Channel m a b

Convert the specified processor to a signal transform, i.e. the channel.

The processor may return data with a delay, as the values are requested on demand. Consider using the arrivalSignal function to provide information about the time points at which the signal was actually triggered.

The point is that the Stream used in the Processor is requested outside, while the Signal used in the Channel is triggered inside. They are different by nature. The former is passive, while the latter is active.

The resulting channel may be a source of a space leak, as it uses an internal queue to store the values received from the input stream. Consider using queuedProcessorChannel, which allows specifying a bounded queue if need be.

queuedChannelProcessor

Arguments

:: MonadDES m 
=> (b -> Event m ())

enqueue

-> Process m b

dequeue

-> Channel m a b

the channel

-> Processor m a b

the processor

Like channelProcessor, but allows specifying an arbitrary queue for storing the signal values, for example, a bounded queue.

queuedProcessorChannel

Arguments

:: MonadDES m 
=> (a -> Event m ())

enqueue

-> Process m a

dequeue

-> Processor m a b

the processor

-> Channel m a b

the channel

Like processorChannel, but allows specifying an arbitrary queue for storing the signal values, for example, a bounded queue.

Debugging

traceProcessor

Arguments

:: MonadDES m 
=> Maybe String

the request message

-> Maybe String

the response message

-> Processor m a b

a processor

-> Processor m a b 

Show the debug messages with the current simulation time.
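
A small wrapper can make the two messages consistent across a model. This is only a sketch based on the signature above; the name traced and the message wording are hypothetical.

```haskell
import Simulation.Aivika.Trans.DES (MonadDES)   -- assumed module for MonadDES
import Simulation.Aivika.Trans.Processor (Processor, traceProcessor)

-- | Hypothetical example: trace both the request and the response
-- of a processor under a common label.
traced :: MonadDES m => String -> Processor m a b -> Processor m a b
traced label =
  traceProcessor
    (Just (label ++ ": requested"))   -- the request message
    (Just (label ++ ": responded"))   -- the response message
```

Passing Nothing for either argument would leave out the corresponding message.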