Safe Haskell | Safe-Inferred |
---|
Real Time Stream Processors are used to describe pipelines that process events in real time. An event is described as a (Time, Value) pair. When an RTSP receives an event it can respond by emitting zero or more events at any time at or after the receipt of the original event. Further incoming events may influence the stream of emitted events.
Pipelines of RTSPs can be built up using the (.) operator from the Category instance, or alternatively the (>>>) operator (which is merely (.) with its arguments swapped). RTSPs can be run in parallel with their outputs merged using mappend from the Monoid instance.
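The relationship between the two composition operators can be seen with ordinary functions, which are also a Category instance. This is only a sketch: addOne and double are hypothetical stand-ins for pipeline stages, not RTSPs.

```haskell
import Control.Category ((>>>))

-- Two ordinary functions standing in for pipeline stages (hypothetical names).
addOne :: Int -> Int
addOne = (+ 1)

double :: Int -> Int
double = (* 2)

-- Right-to-left composition with (.): addOne runs first, then double.
viaDot :: Int -> Int
viaDot = double . addOne

-- Left-to-right composition with (>>>): the same pipeline in data-flow order.
viaArrow :: Int -> Int
viaArrow = addOne >>> double
```

Both definitions describe the same pipeline; only the reading order differs.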
Within the RTSP implementation all notions of "delay" and "time" merely refer to the time component of events, and are used for event ordering. Only the execRTSP function, which runs in the IO monad, executes any actual real-time delay.
The main data types for the application programmer are:
Event
- A value that occurs at a certain time. For instance an Event Char might represent a key press.
RTSP
- The Real Time Stream Processor. A value of type RTSP x y takes in events of type x and emits events of type y. RTSPs can be strung together into pipelines using (.) (or (>>>) if you prefer your data to flow left-to-right). RTSPs are also monoids, so you can fork your data through two parallel RTSPs and then merge the results.
RTA
- A monad for building stateful RTSPs. Convert an RTA into an RTSP using execRTA or accumulateRTA, depending on what you want to do with pending output events when a new input event arrives.
You can test an RTSP in "fast time" (that is, without waiting for real-time delays) by using simulateRTSP. Then you can execute the RTSP in real time using execRTSP and be confident that the real time behaviour will match the fast-time behaviour.
Simultaneous Events
The handling of logically simultaneous events in discrete event simulation is a long-standing problem. The three basic approaches are:
- Impose an arbitrary but deterministic order on "simultaneous" events.
- Collect the simultaneous events and pass them to the application, on the basis that the application programmer can then impose the appropriate semantics.
- Simulate all possible orderings.
This library takes the first approach. Option 2 would force each RTSP to wait for the next event to see if it was simultaneous, which is possible in a simulator but not in a real time system. In a real time system option 3 is not feasible, and would still leave the problem of which ordering to present to the outside world as the "real" one.
When two simultaneous events arrive at an RTSP, the current implementation uses the following rules:
- Simultaneous output events retain the order of the input events that triggered them. Hence simultaneous events never "overtake".
- In the case of (id `mappend` stream (+ 1)) the output alternates the left and right expressions, starting with the left.
However these properties interact in ways that are complex, hard to define formally and not guaranteed to be stable. Code that depends on the ordering of simultaneous events should therefore be avoided.
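The first approach (an arbitrary but deterministic order) can be illustrated with a left-biased merge of two time-ordered lists. This is a minimal sketch, not the library's implementation: the Timed type, the integer timestamps, and the names mergeLeftBiased and forkAndMerge are all assumptions made for illustration.

```haskell
-- Hypothetical stand-in for a timed event: (time, value).
type Timed a = (Integer, a)

-- Merge two time-ordered lists. On a tie the left event goes first,
-- so the result is deterministic and events never overtake.
mergeLeftBiased :: [Timed a] -> [Timed a] -> [Timed a]
mergeLeftBiased [] ys = ys
mergeLeftBiased xs [] = xs
mergeLeftBiased (x : xs) (y : ys)
  | fst y < fst x = y : mergeLeftBiased (x : xs) ys
  | otherwise     = x : mergeLeftBiased xs (y : ys)

-- Rough model of (id `mappend` stream (+ 1)) over a list of inputs:
-- the left branch passes values through, the right branch adds one.
forkAndMerge :: [Timed Int] -> [Timed Int]
forkAndMerge evs = mergeLeftBiased evs [(t, v + 1) | (t, v) <- evs]
```

In this model each input yields a left output followed by a simultaneous right output, which matches the alternation described above for inputs at distinct times.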
- data Event a = Event { eventTime :: UTCTime, eventValue :: a }
- isBefore :: Event a -> Event b -> Bool
- data EventStream b c
- emitsBefore :: EventStream b1 c1 -> EventStream b2 c2 -> Bool
- nullStream :: EventStream b c
- esFinished :: EventStream b c -> Bool
- esPeek :: EventStream b c -> [Event c]
- esFutures :: EventStream b c -> [(Event c, EventStream b c)]
- esProcess :: Event b -> EventStream b c -> EventStream b c
- esMerge :: EventStream b c -> EventStream b c -> EventStream b c
- newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }
- simulateRTSP :: RTSP b c -> [Event b] -> [Event c]
- execRTSP :: RTSP b (IO ()) -> IO (b -> IO ())
- stream :: (b -> c) -> RTSP b c
- accumulate :: RTSP b c -> RTSP b c
- repeatEvent :: [NominalDiffTime] -> RTSP b b
- delay0 :: NominalDiffTime -> RTSP b b
- delay :: NominalDiffTime -> RTSP b b
- type Cond a b = (a -> Bool, RTSP a b)
- streamFilter :: Cond a b -> RTSP a b
- cond :: [Cond a b] -> RTSP a b
- cond1 :: [Cond a b] -> RTSP a b
- ifThenElse :: Cond a b -> RTSP a b -> RTSP a b
- data RTA s c v
- get :: RTA s c s
- put :: s -> RTA s c ()
- modify :: (s -> s) -> RTA s c ()
- emit :: c -> RTA s c ()
- pause :: NominalDiffTime -> RTA s c ()
- now :: RTA s c UTCTime
- execRTA :: s -> (b -> RTA s c Bool) -> RTSP b c
- accumulateRTA :: s -> (b -> RTA s c Bool) -> RTSP b c
Events
Real time events.
Constructors: Event, with fields eventTime :: UTCTime and eventValue :: a.
isBefore :: Event a -> Event b -> Bool
True if the first event occurs strictly before the second. This makes Event a poset (partially ordered set).
Infix priority 4 (the same as other comparison operators).
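One plausible definition consistent with the description above is sketched here. It is not taken from the library source: the comparison on eventTime is an assumption, and atSecond is a hypothetical helper added only to build test events.

```haskell
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)

-- Mirrors the record shown in the synopsis.
data Event a = Event
  { eventTime  :: UTCTime
  , eventValue :: a
  }

-- Same fixity as the other comparison operators.
infix 4 `isBefore`

-- Assumed definition: compare the time components only, ignoring the values.
isBefore :: Event a -> Event b -> Bool
isBefore e1 e2 = eventTime e1 < eventTime e2

-- Hypothetical helper: an event n seconds after the Unix epoch.
atSecond :: Integer -> a -> Event a
atSecond n = Event (posixSecondsToUTCTime (fromInteger n))
```

Because the comparison is strict, two simultaneous events are unordered: neither is before the other.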
Real Time Stream Processors
data EventStream b c
A real-time event stream cannot be described without reference to unknown future inputs. Hence EventStream embodies two possible futures:
- An Event c will be emitted at some time in the future, with a new EventStream representing the future after that event.
- An incoming Event b will arrive before the next Event c is emitted, creating a new EventStream representing the response to that event. The old Event c may or may not be part of the new EventStream.
There are also two degenerate cases:
- Wait: no event is scheduled to be emitted, and the EventStream just waits for an incoming event.
- Finish: no event will ever be emitted, regardless of incoming events. This is explicitly distinguished so that complex RTSP expressions can be GC'd if they can be proven to be finished.
Event streams are like the Mirror of Galadriel, for they show things that were, things that are, and things that yet may be. But which it is that he sees, even the wisest cannot always tell.
Seeing is both good and perilous. An event stream may be modified by new events, but exceptions or inconsistent results will occur if the incoming events are not in increasing order of time.
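The two futures and the two degenerate cases can be modelled with a small data type. This is a sketch under stated assumptions, not the library's representation: Stream, Timed, peek, process, finished and echoLater are hypothetical names, times are plain integers, and process assumes the input arrives before any scheduled output.

```haskell
-- Hypothetical timed value; the library's Event uses UTCTime instead.
type Timed a = (Integer, a)

data Stream b c
  -- Next scheduled output, the future after it, and the response
  -- to an input that arrives first.
  = Emit (Timed c) (Stream b c) (Timed b -> Stream b c)
  -- Nothing scheduled; only an input can produce a new stream.
  | Wait (Timed b -> Stream b c)
  -- Nothing will ever be emitted.
  | Finish

-- Outputs that will appear if no further input arrives (compare esPeek).
peek :: Stream b c -> [Timed c]
peek (Emit ev rest _) = ev : peek rest
peek _                = []

-- Deliver an input event (compare esProcess).
process :: Timed b -> Stream b c -> Stream b c
process ev (Emit _ _ k) = k ev
process ev (Wait k)     = k ev
process _  Finish       = Finish

-- True when no output can ever appear (compare esFinished).
finished :: Stream b c -> Bool
finished Finish = True
finished _      = False

-- Example: echo each input one tick later, dropping any pending output.
echoLater :: Stream a a
echoLater = Wait respond
  where
    respond (t, v) = Emit (t + 1, v) (Wait respond) respond
```

Delivering a second input to echoLater before the first echo is due replaces the pending output, which is the "may or may not be part of the new EventStream" case in the description.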
Instances:
- Category EventStream
- Functor (EventStream b)
- Show c => Show (EventStream b c)
- Monoid (EventStream b c)
emitsBefore :: EventStream b1 c1 -> EventStream b2 c2 -> Bool
True if the first argument is scheduled to emit an event before the second. This makes EventStream a poset (partially ordered set). Infix priority 4.
nullStream :: EventStream b c
An event stream that never generates anything.
esFinished :: EventStream b c -> Bool
True if the event stream is guaranteed not to emit any future events, regardless of input.
esPeek :: EventStream b c -> [Event c]
Peek at the events that will be emitted by this EventStream if no incoming event interrupts them.
esFutures :: EventStream b c -> [(Event c, EventStream b c)]
All the possible futures of the event stream.
esProcess :: Event b -> EventStream b c -> EventStream b c
Given a new input event to an existing event stream, this returns the modified event stream. When esProcess is called on the result, the Event argument to the second call must not occur before the first (they can be simultaneous). More formally, if
esOut = esProcess ev2 (esProcess ev1 esIn)
then not (ev2 `isBefore` ev1) must hold. This precondition is not checked.
esMerge :: EventStream b c -> EventStream b c -> EventStream b c
Merge the outputs of two event streams. Input events are delivered to both streams.
Real Time Stream Processor (RTSP)
An EventStream cannot exist independently of some event that caused it to start. Hence the only way to create an EventStream is through an RTSP.
Constructors: RTSP, with field runRTSP :: Event b -> EventStream b c.
simulateRTSP
:: RTSP b c | The processor to execute. |
-> [Event b] | The events must be finite and in chronological order. This is unchecked. |
-> [Event c] |
Execute an RTSP against a list of events. Useful for testing.
execRTSP
:: RTSP b (IO ()) | The output of the RTSP is a series of action events that will be executed in a separate thread sequentially at the times given. The actions may, of course, fork their own threads as necessary. execRTSP uses |
-> IO (b -> IO ()) |
Execute an RTSP in the IO monad. The function returns immediately with an action for pushing events into the RTSP.
accumulate :: RTSP b c -> RTSP b c
When a new input event is delivered to an RTSP it causes any future output events to be dropped in favour of the new events. accumulate instead keeps the events from previous inputs interleaved with the new ones. If you use this unnecessarily then you will get duplicated events.
If there are n output events due to be emitted before an input event then this will require O(n) time for the input.
Manipulating event times
repeatEvent :: [NominalDiffTime] -> RTSP b b
Repeat each input event after the specified delays until a new event arrives, at which point the sequence begins again with the new event value. The delays must not be negative and must be in ascending order. All the delays are relative to the first event.
Be careful when using enumerations (arithmetic sequences) to create the argument. A list like
[1..5] :: [NominalDiffTime]
will count up in picoseconds rather than seconds, which is probably not what is wanted. Instead use
map fromInteger [1..5] :: [NominalDiffTime]
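The recommended construction can be checked in isolation with the time package. This sketch only builds the delay lists and does not call repeatEvent; the names delaysInSeconds and firstSteps are hypothetical.

```haskell
import Data.Time.Clock (NominalDiffTime)

-- Delays of 1, 2, 3, 4 and 5 seconds, built from whole numbers.
delaysInSeconds :: [NominalDiffTime]
delaysInSeconds = map fromInteger [1 .. 5]

-- The first two elements of a direct enumeration, for comparison.
-- Inspect this in GHCi to see the step size used by the Enum instance
-- in your version of the time package.
firstSteps :: [NominalDiffTime]
firstSteps = take 2 [1 ..]
```

Enumerating whole numbers first and converting afterwards keeps the step size independent of the Enum instance for NominalDiffTime.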
delay0 :: NominalDiffTime -> RTSP b b
Delay input events by the specified time, but given an event stream {ev1, ev2, ev3...}, if ev2 arrives before ev1 has been emitted then ev1 will be lost.
delay :: NominalDiffTime -> RTSP b b
Delay input events by the specified time.
Unfortunately this requires O(n) time when there are n events queued up due to the use of accumulate.
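The difference between delay0 and delay can be modelled on plain lists. This is a sketch of the documented behaviour only, not the library's implementation: times are integers, delayModel and delay0Model are hypothetical names, and an input arriving at exactly the due time is assumed not to cancel the pending output.

```haskell
type Timed a = (Integer, a)

-- Model of delay: every input reappears d ticks later.
delayModel :: Integer -> [Timed a] -> [Timed a]
delayModel d evs = [(t + d, v) | (t, v) <- evs]

-- Model of delay0: a delayed event is lost when the next input
-- arrives before the delayed copy is due.
delay0Model :: Integer -> [Timed a] -> [Timed a]
delay0Model _ [] = []
delay0Model d ((t, v) : rest) = kept ++ delay0Model d rest
  where
    due = t + d
    kept = case rest of
      (tNext, _) : _ | tNext < due -> []
      _                            -> [(due, v)]
```

With a delay of 2 and inputs at times 0, 1 and 5, the first model keeps all three events while the second loses the event from time 0, because the input at time 1 arrives before it is due.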
Conditional event processing
type Cond a b = (a -> Bool, RTSP a b)
A conditional stream: events matching the predicate will be passed to the stream.
streamFilter :: Cond a b -> RTSP a b
Conditional stream execution: only certain events will be accepted.
ifThenElse :: Cond a b -> RTSP a b -> RTSP a b
Send each event to the conditional stream if it accepts it, otherwise send it to the second argument.
ifThenElse (p, rThen) rElse
is equivalent to
streamFilter (p, rThen) `mappend` streamFilter (not . p, rElse)
However ifThenElse only evaluates p once for each input event.
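The equivalence can be checked in a timeless model where a processor simply maps one input value to a list of outputs. This is a sketch under that simplifying assumption: Proc, CondModel, filterModel, ifThenElseModel and viaFilters are hypothetical names, and the Monoid instance used is the standard one for functions returning lists, not the RTSP instance.

```haskell
-- Timeless model: a processor maps one input value to a list of outputs.
type Proc a b = a -> [b]
type CondModel a b = (a -> Bool, Proc a b)

-- Only inputs accepted by the predicate reach the processor.
filterModel :: CondModel a b -> Proc a b
filterModel (p, r) x = if p x then r x else []

-- Evaluates p once per input.
ifThenElseModel :: CondModel a b -> Proc a b -> Proc a b
ifThenElseModel (p, rThen) rElse x = if p x then rThen x else rElse x

-- The two-filter formulation from the text; evaluates p twice per input.
viaFilters :: CondModel a b -> Proc a b -> Proc a b
viaFilters (p, rThen) rElse =
  filterModel (p, rThen) `mappend` filterModel (not . p, rElse)
```

Exactly one of the two filters accepts any given input, so the merged output equals the output of the chosen branch.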
Real Time Actions with state
Real-time Actions. This monad is used to build sequential processors that can be turned into stream processors. An RTA emits zero or more events in response to each input event, and has a state that persists from one event to the next. In particular, state changes made after a pause will be visible to the next event regardless of the relative times.
pause :: NominalDiffTime -> RTA s c ()
Pause before the next step. This does not actually delay processing; it merely increments the time of any emitted events.
execRTA
:: s | The initial state. State persists between input events. |
-> (b -> RTA s c Bool) | A function from the input value to an action. If the action returns |
-> RTSP b c |
Execute an RTA as part of a real time stream processor.
When a new event arrives any pending output events will be lost. However any state changes are immediately visible to the next event, even if they occurred "after" the lost events. For instance, consider this:
    execRTA 1 $ \_ -> do
        n <- get
        pause 10
        emit n
        put (n+1)
        return True
If this receives events at t=[0,1,3,20] then it will emit [Event 13 3, Event 30 4]. The events that would have been emitted at t=[10,11] have been lost, but the state change still occurred immediately, regardless of the output schedule.
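The arithmetic of this example can be reproduced with a pure model. This is a sketch only: counterModel is a hypothetical function that uses integer times and a counter for the state, and it assumes that a pending output is lost exactly when the next input arrives strictly before it is due.

```haskell
-- Pure model of the counter example. Each input at time t schedules
-- the output (t + 10, n) and increments the counter immediately.
-- A scheduled output is lost if the next input arrives before it is due.
counterModel :: Integer -> [Integer] -> [(Integer, Integer)]
counterModel _ [] = []
counterModel n (t : ts) = kept ++ counterModel (n + 1) ts
  where
    due = t + 10
    kept = case ts of
      tNext : _ | tNext < due -> []
      _                       -> [(due, n)]
```

Running counterModel 1 [0, 1, 3, 20] gives [(13, 3), (30, 4)]: the counter advanced on every input, even though the outputs scheduled for times 10 and 11 were dropped.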