Copyright | (c) Tim Watson 2012 - 2013 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Tim Watson <watson.timothy@gmail.com> |
Stability | experimental |
Portability | non-portable (requires concurrency) |
Safe Haskell | None |
Language | Haskell98 |
Generic process that acts as an external mailbox and message buffer.
- Overview
For use when rate limiting is not possible (or desired), this module
provides a buffer process that receives mail via its post
API, buffers
the received messages and delivers them when its owning process asks for
them. A mailbox has to be started with a maximum buffer size - the so called
limit - and will discard messages once its internal storage reaches this
user defined threshold.
The usual behaviour of the buffer process is to accumulate messages in
its internal memory. When a client evaluates notify
, the buffer will
send a NewMail
message to the (real) mailbox of its owning process as
soon as it has any message(s) ready to deliver. If the buffer already
contains undelivered mail, the NewMail
message will be dispatched
immediately.
When the owning process wishes to receive mail, evaluating deliver
(from
any process) will cause the buffer to send its owner a Delivery
message
containing the accumulated messages and additional information about the
number of messages it is delivering, the number of messages dropped since
the last delivery and a handle for the mailbox (so that processes can have
multiple mailboxes if required, and distinguish between them).
- Overflow Handling
A mailbox handles overflow - when the number of messages it is holding
reaches the limit - differently depending on the BufferType
selected
when it starts. The Queue
buffer will, once the limit is reached, drop
older messages first (i.e., the head of the queue) to make space for
newer ones. The Ring
buffer works similarly, but blocks new messages
so as to preserve existing ones instead. Finally, the Stack
buffer will
drop the last (i.e., most recently received) message to make room for new
mail.
Mailboxes can be resized by evaluating resize
with a new value for the
limit. If the new limit is older that the current/previous one, messages
are dropped as though the mailbox had previously seen a volume of mail
equal to the difference (in size) between the limits. In this situation,
the Queue
will drop as many older messages as neccessary to come within
the limit, whilst the other two buffer types will drop as many newer messages
as needed.
- Ordering Guarantees
When messages are delivered to the owner, they arrive as a list of raw
Message
entries, given in descending age order (i.e., eldest first).
Whilst this approximates the FIFO ordering a process' mailbox would usually
offer, the Stack
buffer will appear to offer no ordering at all, since
it always deletes the most recent message(s). The Queue
and Ring
buffers
will maintain a more queue-like (i.e., FIFO) view of received messages,
with the obvious constraint the newer or older data might have been deleted.
- Post API and Relaying
For messages to be properly handled by the mailbox, they can either be sent
via the post
API or directly to the Mailbox
. Messages sent directly to
the mailbox will still be handled via the internal buffers and subjected to
the mailbox limits. The post
API is really just a means to ensure that
the conversion from Serializable a -> Message
is done in the caller's
process and uses the safe wrapMessage
variant.
- Acknowledgements
This API is based on the work of Erlang programmers Fred Hebert and Geoff Cant, its design closely mirroring that of the the pobox library application.
- data Mailbox
- startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
- startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message)
- startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox
- createMailbox :: BufferType -> Limit -> Process Mailbox
- resize :: Mailbox -> Integer -> Process ()
- statistics :: Mailbox -> Process MailboxStats
- monitor :: Mailbox -> Process MonitorRef
- type Limit = Integer
- data BufferType
- data MailboxStats = MailboxStats {}
- post :: Serializable a => Mailbox -> a -> Process ()
- notify :: Mailbox -> Process ()
- deliver :: Mailbox -> Process ()
- active :: Mailbox -> Filter -> Process ()
- data NewMail = NewMail !Mailbox !Integer
- data Delivery = Delivery {}
- data FilterResult
- acceptEverything :: Closure (Message -> Process FilterResult)
- acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult)
- __remoteTable :: RemoteTable -> RemoteTable
Creating, Starting, Configuring and Running a Mailbox
Opaque handle to a mailbox.
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox Source
Start a mailbox for the supplied ProcessId
.
start = spawnLocal $ run
startSupervised :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process (ProcessId, Message) Source
As startMailbox
, but suitable for use in supervisor child specs.
This variant is for use when you want to access to the underlying
Mailbox
handle in your supervised child refs. See supervisor's
ChildRef
data type for more information.
Example: > childSpec = toChildStart $ startSupervised pid bufferType mboxLimit
startSupervisedMailbox :: ProcessId -> BufferType -> Limit -> SupervisorPid -> Process Mailbox Source
As startMailbox
, but suitable for use in supervisor child specs.
Example: > childSpec = toChildStart $ startSupervisedMailbox pid bufferType mboxLimit
createMailbox :: BufferType -> Limit -> Process Mailbox Source
Start a mailbox for the calling process.
create = getSelfPid >>= start
resize :: Mailbox -> Integer -> Process () Source
Alters the mailbox's limit - this might cause messages to be dropped!
statistics :: Mailbox -> Process MailboxStats Source
Obtain statistics (from/to anywhere) about a mailbox.
monitor :: Mailbox -> Process MonitorRef Source
Monitor a mailbox.
data BufferType Source
Describes the different types of buffer.
data MailboxStats Source
Bundle of statistics data, available on request via
the mailboxStats
API call.
Posting Mail
post :: Serializable a => Mailbox -> a -> Process () Source
Posts a message to someone's mailbox.
Obtaining Mail and Notifications
deliver :: Mailbox -> Process () Source
Instructs the mailbox to deliver all pending messages to the owner.
active :: Mailbox -> Filter -> Process () Source
Instructs the mailbox to send a Delivery
as soon as any mail is
available, or immediately (if the buffer already contains data).
NB: signals are only delivered to the mailbox's owning process.
Marker message indicating to the owning process that mail has arrived.
Mail delivery.
data FilterResult Source
acceptEverything :: Closure (Message -> Process FilterResult) Source
A do-nothing filter that accepts all messages (i.e., returns Keep
for any input).
acceptMatching :: Closure (Closure (Message -> Process FilterResult) -> Message -> Process FilterResult) Source
A filter that takes a Closure (Message -> Process FilterResult)
holding
the filter function and applies it remotely (i.e., in the mailbox's own
managed process).