Copyright | Copyright (c) 2009-2013, David Sorokin <david.sorokin@gmail.com> |
---|---|
License | BSD3 |
Maintainer | David Sorokin <david.sorokin@gmail.com> |
Stability | experimental |
Safe Haskell | Safe-Inferred |
Language | Haskell98 |
Tested with: GHC 7.6.3
The processor of simulation data.
- newtype Processor a b = Processor {
- runProcessor :: Stream a -> Stream b
- emptyProcessor :: Processor a b
- arrProcessor :: (a -> Process b) -> Processor a b
- accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a b
- simpleProcessor :: (a -> Process b) -> Processor a b
- statefulProcessor :: s -> ((s, a) -> Process (s, b)) -> Processor a b
- processorUsingId :: ProcessId -> Processor a b -> Processor a b
- prefetchProcessor :: Processor a a
- bufferProcessor :: (Stream a -> Process ()) -> Stream b -> Processor a b
- bufferProcessorLoop :: (Stream a -> Stream c -> Process ()) -> Stream d -> Processor d (Either e b) -> Processor e c -> Processor a b
- queueProcessor :: (a -> Process ()) -> Process b -> Processor a b
- queueProcessorLoopMerging :: (Stream a -> Stream d -> Stream e) -> (e -> Process ()) -> Process c -> Processor c (Either f b) -> Processor f d -> Processor a b
- queueProcessorLoopSeq :: (a -> Process ()) -> Process c -> Processor c (Either e b) -> Processor e a -> Processor a b
- queueProcessorLoopParallel :: (a -> Process ()) -> Process c -> Processor c (Either e b) -> Processor e a -> Processor a b
- processorSeq :: [Processor a a] -> Processor a a
- processorParallel :: [Processor a b] -> Processor a b
- processorQueuedParallel :: (EnqueueStrategy si qi, EnqueueStrategy so qo) => si -> so -> [Processor a b] -> Processor a b
- processorPrioritisingOutputParallel :: (EnqueueStrategy si qi, PriorityQueueStrategy so qo po) => si -> so -> [Processor a (po, b)] -> Processor a b
- processorPrioritisingInputParallel :: (PriorityQueueStrategy si qi pi, EnqueueStrategy so qo) => si -> so -> [(Stream pi, Processor a b)] -> Processor a b
- processorPrioritisingInputOutputParallel :: (PriorityQueueStrategy si qi pi, PriorityQueueStrategy so qo po) => si -> so -> [(Stream pi, Processor a (po, b))] -> Processor a b
- arrivalProcessor :: Processor a (Arrival a)
- signalProcessor :: (Signal a -> Signal b) -> Processor a b
- processorSignaling :: Processor a b -> Signal a -> Process (Signal b)
Processor Type
Represents a processor of simulation data.
Processor | |
|
Processor Primitives
emptyProcessor :: Processor a b Source
A processor that never finishes its work producing an emptyStream
.
arrProcessor :: (a -> Process b) -> Processor a b Source
Create a simple processor by the specified handling function that runs the discontinuous process for each input value to get the output.
accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a b Source
Accumulator that outputs a value determined by the supplied function.
simpleProcessor :: (a -> Process b) -> Processor a b Source
Deprecated: Use arrProcessor instead
Create a simple processor by the specified handling function that runs the discontinuous process for each input value to get the output.
statefulProcessor :: s -> ((s, a) -> Process (s, b)) -> Processor a b Source
Deprecated: Use accumProcessor instead
Like simpleProcessor
but allows creating a processor that has a state
which is passed in to every new iteration.
Specifying Identifier
processorUsingId :: ProcessId -> Processor a b -> Processor a b Source
Create a processor that will use the specified process identifier.
It can be useful to refer to the underlying Process
computation which
can be passivated, interrupted, canceled and so on. See also the
processUsingId
function for more details.
Prefetch Processor
prefetchProcessor :: Processor a a Source
This is a prefetch processor that requests for one more data item from the input in advance while the latest item is not yet fully processed in the chain of streams, usually by other processors.
You can think of this as the prefetched processor could place its latest data item in some temporary space for later use, which is very useful for modeling a sequence of separate and independent work places.
Buffer Processor
:: (Stream a -> Process ()) | a separate process to consume the input |
-> Stream b | the resulting stream of data |
-> Processor a b |
Create a buffer processor, where the process from the first argument consumes the input stream but the stream passed in as the second argument and produced usually by some other process is returned as an output. This kind of processor is very useful for modeling the queues.
:: (Stream a -> Stream c -> Process ()) | consume two streams: the input values of type |
-> Stream d | the stream of data that may become results |
-> Processor d (Either e b) | process and then decide what values of type |
-> Processor e c | process in the loop and then return a value
of type |
-> Processor a b |
Like bufferProcessor
but allows creating a loop when some items
can be processed repeatedly. It is very useful for modeling the processors
with queues and loop-backs.
Processing Queues
:: (a -> Process ()) | enqueue the input item and wait while the queue is full if required so that there were no hanging items |
-> Process b | dequeue an output item |
-> Processor a b | the buffering processor |
Return a processor with help of which we can model the queue.
Although the function doesn't refer to the queue directly, its main use case is namely a processing of the queue. The first argument should be the enqueueing operation, while the second argument should be the opposite dequeueing operation.
The reason is as follows. There are many possible combinations how the queues can be modeled. There is no sense to enumerate all them creating a separate function for each case. We can just use combinators to define exactly what we need.
So, the queue can lose the input items if the queue is full, or the input process can suspend while the queue is full, or we can use priorities for enqueueing, storing and dequeueing the items in different combinations. There are so many use cases!
There is a hope that this function along with other similar functions from this
module is sufficient to cover the most important cases. Even if it is not sufficient
then you can use a more generic function bufferProcessor
which this function is
based on. In case of need, you can even write your own function from scratch. It is
quite easy actually.
queueProcessorLoopMerging Source
:: (Stream a -> Stream d -> Stream e) | merge two streams: the input values of type |
-> (e -> Process ()) | enqueue the input item and wait while the queue is full if required so that there were no hanging items |
-> Process c | dequeue an item for the further processing |
-> Processor c (Either f b) | process and then decide what values of type |
-> Processor f d | process in the loop and then return a value
of type |
-> Processor a b | the buffering processor |
Like queueProcessor
creates a queue processor but with a loop when some items
can be processed and then added to the queue again. Also it allows specifying
how two input streams of data can be merged.
:: (a -> Process ()) | enqueue the input item and wait while the queue is full if required so that there were no hanging items |
-> Process c | dequeue an item for the further processing |
-> Processor c (Either e b) | process and then decide what values of type |
-> Processor e a | process in the loop and then return a value
of type |
-> Processor a b | the buffering processor |
Like queueProcessorLoopMerging
creates a queue processor with a loop when
some items can be processed and then added to the queue again. Only it sequentially
merges two input streams of data: one stream that come from the external source and
another stream of data returned by the loop. The first stream has a priority over
the second one.
queueProcessorLoopParallel Source
:: (a -> Process ()) | enqueue the input item and wait while the queue is full if required so that there were no hanging items |
-> Process c | dequeue an item for the further processing |
-> Processor c (Either e b) | process and then decide what values of type |
-> Processor e a | process in the loop and then return a value
of type |
-> Processor a b | the buffering processor |
Like queueProcessorLoopMerging
creates a queue processor with a loop when
some items can be processed and then added to the queue again. Only it runs two
simultaneous processes to enqueue the input streams of data: one stream that come
from the external source and another stream of data returned by the loop.
Sequencing Processors
processorSeq :: [Processor a a] -> Processor a a Source
Launches the processors sequentially using the prefetchProcessor
between them
to model an autonomous work of each of the processors specified.
Parallelizing Processors
processorParallel :: [Processor a b] -> Processor a b Source
Launches the processors in parallel consuming the same input stream and producing
a combined output stream. This version applies the FCFS
strategy both for input
and output, which suits the most part of uses cases.
processorQueuedParallel Source
:: (EnqueueStrategy si qi, EnqueueStrategy so qo) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [Processor a b] | the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel consuming the same input stream and producing a combined output stream.
If you don't know what the enqueue strategies to apply, then
you will probably need FCFS
for the both parameters, or
function processorParallel
that does namely this.
processorPrioritisingOutputParallel Source
:: (EnqueueStrategy si qi, PriorityQueueStrategy so qo po) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [Processor a (po, b)] | the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for combining the output.
processorPrioritisingInputParallel Source
:: (PriorityQueueStrategy si qi pi, EnqueueStrategy so qo) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [(Stream pi, Processor a b)] | the streams of input priorities and the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for consuming the intput.
processorPrioritisingInputOutputParallel Source
:: (PriorityQueueStrategy si qi pi, PriorityQueueStrategy so qo po) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [(Stream pi, Processor a (po, b))] | the streams of input priorities and the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for consuming the input and combining the output.
Arrival Processor
arrivalProcessor :: Processor a (Arrival a) Source
A processor that adds the information about the time points at which the original stream items were received by demand.
Integrating with Signals
signalProcessor :: (Signal a -> Signal b) -> Processor a b Source
Convert the specified signal transform to a processor.
The processor may return data with delay as the values are requested by demand.
Consider using the arrivalSignal
function to provide with the information
about the time points at which the signal was actually triggered.
The point is that the Stream
used in the Processor
is requested outside,
while the Signal
is triggered inside. They are different by nature.
The former is passive, while the latter is active.
Cancel the processor's process to unsubscribe from the signals provided.
processorSignaling :: Processor a b -> Signal a -> Process (Signal b) Source
Convert the specified processor to a signal transform.
The processor may return data with delay as the values are requested by demand.
Consider using the arrivalSignal
function to provide with the information
about the time points at which the signal was actually triggered.
The point is that the Stream
used in the Processor
is requested outside,
while the Signal
is triggered inside. They are different by nature.
The former is passive, while the latter is active.
Cancel the returned process to unsubscribe from the signal specified.