Safe Haskell | None |
---|---|
Language | Haskell2010 |
Fast and robust message queues for concurrent processes.
Processes of an application can exchange message using Message Boxes.
This library is meant to be a wrapper around a
well tested and benchmarked subset of unagi-chan
for applications using unliftio
.
In addition to the basic functionality, i.e. _Message Boxes_, there is a very little bit of type level magic dust in UnliftIO.MessageBox.Command that helps to write code that sends a message and expects the receiving process to send a reply.
This module re-exports most of the library.
Synopsis
- class IsInput input where
- deliver :: MonadUnliftIO m => input a -> a -> m Bool
- deliver_ :: MonadUnliftIO m => input a -> a -> m ()
- class IsInput (Input box) => IsMessageBox box where
- type Input box :: Type -> Type
- receive :: MonadUnliftIO m => box a -> m (Maybe a)
- tryReceive :: MonadUnliftIO m => box a -> m (Future a)
- receiveAfter :: MonadUnliftIO m => box a -> Int -> m (Maybe a)
- newInput :: MonadUnliftIO m => box a -> m (Input box a)
- class (IsMessageBox (MessageBox argument), IsInput (Input (MessageBox argument))) => IsMessageBoxArg argument where
- type MessageBox argument :: Type -> Type
- getConfiguredMessageLimit :: argument -> Maybe Int
- newMessageBox :: MonadUnliftIO m => argument -> m (MessageBox argument a)
- handleMessage :: (MonadUnliftIO m, IsMessageBox box) => box message -> (message -> m b) -> m (Maybe b)
- data WaitingInput a = WaitingInput !Int !(BlockingInput a)
- data WaitingBox a = WaitingBox WaitingBoxLimit (BlockingBox a)
- data WaitingBoxLimit = WaitingBoxLimit !(Maybe Int) !Int !MessageLimit
- newtype NonBlockingInput a = NonBlockingInput (BlockingInput a)
- data NonBlockingBox a
- newtype NonBlockingBoxLimit = NonBlockingBoxLimit MessageLimit
- data BlockingInput a
- data BlockingBox a
- newtype BlockingBoxLimit = BlockingBoxLimit MessageLimit
- data MessageLimit
- messageLimitToInt :: MessageLimit -> Int
- data BlockingUnlimited = BlockingUnlimited
- data UnlimitedBoxInput a
- data UnlimitedBox a
- newtype CatchAllInput i a = CatchAllInput (i a)
- newtype CatchAllBox box a = CatchAllBox (box a)
- newtype CatchAllArg cfg = CatchAllArg cfg
- data AsyncReply r
- newtype DuplicateReply = DuplicateReply CallId
- data CommandError where
- data ReplyBox a
- data Message apiTag where
- Blocking :: Show (Command apiTag ('Return result)) => Command apiTag ('Return result) -> ReplyBox result -> Message apiTag
- NonBlocking :: Show (Command apiTag 'FireAndForget) => Command apiTag 'FireAndForget -> Message apiTag
- data ReturnType where
- FireAndForget :: ReturnType
- Return :: Type -> ReturnType
- data family Command apiTag :: ReturnType -> Type
- cast :: (MonadUnliftIO m, IsInput o, Show (Command apiTag 'FireAndForget)) => o (Message apiTag) -> Command apiTag 'FireAndForget -> m Bool
- call :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput input, Show (Command apiTag ('Return result))) => input (Message apiTag) -> Command apiTag ('Return result) -> Int -> m (Either CommandError result)
- replyTo :: MonadUnliftIO m => ReplyBox a -> a -> m ()
- delegateCall :: (MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return r))) => o (Message apiTag) -> Command apiTag ('Return r) -> ReplyBox r -> m Bool
- callAsync :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return result))) => o (Message apiTag) -> Command apiTag ('Return result) -> m (Maybe (AsyncReply result))
- waitForReply :: MonadUnliftIO m => Int -> AsyncReply result -> m (Either CommandError result)
- tryTakeReply :: MonadUnliftIO m => AsyncReply result -> m (Maybe (Either CommandError result))
- class HasCallIdCounter env where
- getCallIdCounter :: env -> CounterVar CallId
- newtype CallId = MkCallId Int
- newCallIdCounter :: MonadIO m => m (CounterVar CallId)
- takeNext :: (MonadReader env m, HasCallIdCounter env, MonadUnliftIO m) => m CallId
- class HasCounterVar a env | env -> a where
- getCounterVar :: env -> CounterVar a
- data CounterVar a
- fresh :: forall a env m. (MonadReader env m, MonadIO m, HasCounterVar a env, Coercible a Int) => m a
- incrementAndGet :: forall a m. (MonadIO m, Coercible a Int) => CounterVar a -> m a
- newCounterVar :: forall a m. MonadIO m => m (CounterVar a)
- newtype Future a = Future (IO (Maybe a))
- tryNow :: MonadUnliftIO m => Future a -> m (Maybe a)
- awaitFuture :: MonadUnliftIO m => Future b -> m b
Documentation
class IsInput input where Source #
A type class for input types. A common interface for delivering messages.
deliver :: MonadUnliftIO m => input a -> a -> m Bool Source #
Send a message. Take whatever time it takes. Depending on the implementation, this might be a non-blocking operation. Return if the operation was successful.
NOTE: False
may sporadically be returned, especially
when there is a lot of load, so please make sure to
build your application in such a way, that it
anticipates failure.
deliver_ :: MonadUnliftIO m => input a -> a -> m () Source #
Instances
class IsInput (Input box) => IsMessageBox box where Source #
A type class for msgBox types. A common interface for receiving messages.
receive :: MonadUnliftIO m => box a -> m (Maybe a) Source #
Receive a message. Take whatever time it takes.
Return Just
the value or Nothing
when an error
occurred.
NOTE: Nothing may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.
tryReceive :: MonadUnliftIO m => box a -> m (Future a) Source #
Return a Future
that can be used to wait for the
arrival of the next message.
NOTE: Each future value represents the next slot in the queue
so one future corresponds to exactly that message (should it arrive)
and if that future value is dropped, that message will be lost!
:: MonadUnliftIO m | |
=> box a | Message box |
-> Int | Time in micro seconds to wait until the action is invoked. |
-> m (Maybe a) |
Wait for an incoming message or return Nothing.
The default implementation uses tryReceive
to get a
Future
on which awaitFuture
inside a timeout
is called.
Instances might override this with more performant implementations especially non-blocking Unagi channel based implementation.
NOTE: Nothing may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.
newInput :: MonadUnliftIO m => box a -> m (Input box a) Source #
Create a new input
that enqueus messages,
which are received by the box
Instances
class (IsMessageBox (MessageBox argument), IsInput (Input (MessageBox argument))) => IsMessageBoxArg argument where Source #
Types that configure and allow the creation of a MessageBox
.
Create IsMessageBox
instances from a parameter.
Types that determine MessageBox
values.
For a limited message box this might be the limit of the message queue.
type MessageBox argument :: Type -> Type Source #
The message box that can be created from the message box argument
getConfiguredMessageLimit :: argument -> Maybe Int Source #
Return a message limit.
NOTE: This method was added for unit tests. Although the method is totally valid, it might not be super useful in production code. Also note that the naming follows the rule: Reserve short names for entities that are used often.
newMessageBox :: MonadUnliftIO m => argument -> m (MessageBox argument a) Source #
Create a new msgBox
according to the argument
.
This is required to receive a message.
NOTE: Only one process may receive on an msgBox.
Instances
handleMessage :: (MonadUnliftIO m, IsMessageBox box) => box message -> (message -> m b) -> m (Maybe b) Source #
Receive a message and apply a function to it.
data WaitingInput a Source #
An input for a BlockingBox
that will block
for not much more than the given timeout when
the message box is full.
WaitingInput !Int !(BlockingInput a) |
Instances
IsInput WaitingInput Source # | |
Defined in UnliftIO.MessageBox.Limited deliver :: MonadUnliftIO m => WaitingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => WaitingInput a -> a -> m () Source # |
data WaitingBox a Source #
A BlockingBox
an a WaitingBoxLimit
for
the IsMessageBox
instance.
Instances
IsMessageBox WaitingBox Source # | |
Defined in UnliftIO.MessageBox.Limited receive :: MonadUnliftIO m => WaitingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => WaitingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => WaitingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => WaitingBox a -> m (Input WaitingBox a) Source # | |
type Input WaitingBox Source # | |
Defined in UnliftIO.MessageBox.Limited |
data WaitingBoxLimit Source #
A IsMessageBoxArg
instance wrapping the BlockingBox
with independently configurable timeouts for receive
and deliver
.
Instances
newtype NonBlockingInput a Source #
A wrapper around BlockingInput
with a non-blocking IsInput
instance.
deliver
will enqueue the message or return False
immediately,
if the message box already contains more messages than
it's limit allows.
Instances
IsInput NonBlockingInput Source # | |
Defined in UnliftIO.MessageBox.Limited deliver :: MonadUnliftIO m => NonBlockingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => NonBlockingInput a -> a -> m () Source # |
data NonBlockingBox a Source #
A BlockingBox
wrapper for non-blocking IsMessageBox
instances.
The difference to the BlockingBox
instance is that deliver
immediately returns if the message box limit is surpassed.
Instances
IsMessageBox NonBlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited receive :: MonadUnliftIO m => NonBlockingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => NonBlockingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => NonBlockingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => NonBlockingBox a -> m (Input NonBlockingBox a) Source # | |
type Input NonBlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited |
newtype NonBlockingBoxLimit Source #
A BlockingBoxLimit
wrapper for non-blocking IsMessageBoxArg
instances.
Instances
data BlockingInput a Source #
A message queue into which messages can be enqued by,
e.g. tryToDeliver
.
Messages can be received from an BlockingBox
.
The Input
is the counter part of a BlockingBox
.
Instances
IsInput BlockingInput Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Limited deliver :: MonadUnliftIO m => BlockingInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => BlockingInput a -> a -> m () Source # |
data BlockingBox a Source #
A message queue out of which messages can by receive
d.
This is the counter part of Input
. Can be used for reading
messages.
Messages can be received by receive
or tryReceive
.
Instances
IsMessageBox BlockingBox Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Limited receive :: MonadUnliftIO m => BlockingBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => BlockingBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => BlockingBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => BlockingBox a -> m (Input BlockingBox a) Source # | |
type Input BlockingBox Source # | |
Defined in UnliftIO.MessageBox.Limited |
newtype BlockingBoxLimit Source #
Contains the (vague) limit of messages that a BlockingBox
can buffer, i.e. that deliver
can put into a BlockingInput
of a BlockingBox
.
Instances
data MessageLimit Source #
Message Limit
The message limit must be a reasonable small positive integer that is also a power of two. This stems from the fact that Unagi is used under the hood.
The limit is a lower bound.
Instances
messageLimitToInt :: MessageLimit -> Int Source #
Convert a MessageLimit
to the
Int
representation.
data BlockingUnlimited Source #
The (empty) configuration for creating
UnlimitedBox
es using the IsMessageBoxArg
methods.
Instances
Show BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited showsPrec :: Int -> BlockingUnlimited -> ShowS # show :: BlockingUnlimited -> String # showList :: [BlockingUnlimited] -> ShowS # | |
IsMessageBoxArg BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited type MessageBox BlockingUnlimited :: Type -> Type Source # getConfiguredMessageLimit :: BlockingUnlimited -> Maybe Int Source # newMessageBox :: MonadUnliftIO m => BlockingUnlimited -> m (MessageBox BlockingUnlimited a) Source # | |
type MessageBox BlockingUnlimited Source # | |
Defined in UnliftIO.MessageBox.Unlimited |
data UnlimitedBoxInput a Source #
A message queue into which messages can be enqued by,
e.g. deliver
.
Messages can be received from an UnlimitedBox
.
The UnlimitedBoxInput
is the counter part of a UnlimitedBox
.
Instances
IsInput UnlimitedBoxInput Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Unlimited deliver :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m () Source # |
data UnlimitedBox a Source #
A message queue out of which messages can
by receive
d.
This is the counter part of Input
. Can be
used for reading messages.
Messages can be received by receive
or tryReceive
.
Instances
IsMessageBox UnlimitedBox Source # | A blocking instance that invokes |
Defined in UnliftIO.MessageBox.Unlimited receive :: MonadUnliftIO m => UnlimitedBox a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => UnlimitedBox a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => UnlimitedBox a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => UnlimitedBox a -> m (Input UnlimitedBox a) Source # | |
type Input UnlimitedBox Source # | |
Defined in UnliftIO.MessageBox.Unlimited |
newtype CatchAllInput i a Source #
A wrapper around values that are instances
of IsInput
.
CatchAllInput (i a) |
Instances
IsInput i => IsInput (CatchAllInput i) Source # | |
Defined in UnliftIO.MessageBox.CatchAll deliver :: MonadUnliftIO m => CatchAllInput i a -> a -> m Bool Source # deliver_ :: MonadUnliftIO m => CatchAllInput i a -> a -> m () Source # |
newtype CatchAllBox box a Source #
A wrapper around values that are instances
of IsMessageBox
.
The Input
type will be wrapped using
CatchAllInput
.
CatchAllBox (box a) |
Instances
IsMessageBox box => IsMessageBox (CatchAllBox box) Source # | |
Defined in UnliftIO.MessageBox.CatchAll receive :: MonadUnliftIO m => CatchAllBox box a -> m (Maybe a) Source # tryReceive :: MonadUnliftIO m => CatchAllBox box a -> m (Future a) Source # receiveAfter :: MonadUnliftIO m => CatchAllBox box a -> Int -> m (Maybe a) Source # newInput :: MonadUnliftIO m => CatchAllBox box a -> m (Input (CatchAllBox box) a) Source # | |
type Input (CatchAllBox box) Source # | |
Defined in UnliftIO.MessageBox.CatchAll |
newtype CatchAllArg cfg Source #
A wrapper around values that are instances
of IsMessageBoxArg
. The factory wraps
the result of the delegated newMessageBox
invocation into a CatchAllBox
.
CatchAllArg cfg |
Instances
data AsyncReply r Source #
The result of callAsync
.
Use waitForReply
or tryTakeReply
.
Instances
Typeable r => Show (AsyncReply r) Source # | |
Defined in UnliftIO.MessageBox.Command showsPrec :: Int -> AsyncReply r -> ShowS # show :: AsyncReply r -> String # showList :: [AsyncReply r] -> ShowS # |
newtype DuplicateReply Source #
Instances
Eq DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command (==) :: DuplicateReply -> DuplicateReply -> Bool # (/=) :: DuplicateReply -> DuplicateReply -> Bool # | |
Show DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command showsPrec :: Int -> DuplicateReply -> ShowS # show :: DuplicateReply -> String # showList :: [DuplicateReply] -> ShowS # | |
Exception DuplicateReply Source # | |
Defined in UnliftIO.MessageBox.Command |
data CommandError where Source #
The failures that the receiver of a Return
Command
, i.e. a Blocking
,
can communicate to the caller, in order to indicate that
processing a request did not or will not lead to the result the
caller is blocked waiting for.
CouldNotEnqueueCommand :: !CallId -> CommandError | Failed to enqueue a |
BlockingCommandFailure :: !CallId -> CommandError | The request has failed for reasons. |
BlockingCommandTimedOut :: !CallId -> CommandError | Timeout waiting for the result. |
Instances
Eq CommandError Source # | |
Defined in UnliftIO.MessageBox.Command (==) :: CommandError -> CommandError -> Bool # (/=) :: CommandError -> CommandError -> Bool # | |
Show CommandError Source # | |
Defined in UnliftIO.MessageBox.Command showsPrec :: Int -> CommandError -> ShowS # show :: CommandError -> String # showList :: [CommandError] -> ShowS # |
This is like Input
, it can be used
by the receiver of a Blocking
to either send a reply using reply
or to fail/abort the request using sendRequestError
data Message apiTag where Source #
A message valid for some user defined apiTag
.
The apiTag
tag (phantom-) type defines the
messages allowed here, declared by the instance of
Command
for apiTag
.
Blocking :: Show (Command apiTag ('Return result)) => Command apiTag ('Return result) -> ReplyBox result -> Message apiTag | Wraps a Such a message can formed by using A |
NonBlocking :: Show (Command apiTag 'FireAndForget) => Command apiTag 'FireAndForget -> Message apiTag | If the The smart constructor |
data ReturnType where Source #
Indicates if a Command
requires the
receiver to send a reply or not.
FireAndForget :: ReturnType | Indicates that a Values of a |
Return :: Type -> ReturnType | Indicates that a Values of a |
data family Command apiTag :: ReturnType -> Type Source #
This family allows to encode imperative commands.
The clauses of a Command
define the commands that
a process should execute.
Every clause may specify an individual ReturnType
that
declares if and what response is valid for a message.
For example:
type LampId = Int data instance Command LightControl r where GetLamps :: Command LigthControl (Return [LampId]) SwitchOn :: LampId -> Command LigthControl FireAndForget data LightControl -- the phantom type
The type index of the Command family is the uninhabited
LightControl
type.
.
The second type parameter indicates if a message requires the receiver to send a reply back to the blocked and waiting sender, or if no reply is necessary.
cast :: (MonadUnliftIO m, IsInput o, Show (Command apiTag 'FireAndForget)) => o (Message apiTag) -> Command apiTag 'FireAndForget -> m Bool Source #
Enqueue a NonBlocking
Message
into an Input
.
This is just for symetry to call
, this is
equivalent to: input -> MessageBox.tryToDeliver input . NonBlocking
The
call :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput input, Show (Command apiTag ('Return result))) => input (Message apiTag) -> Command apiTag ('Return result) -> Int -> m (Either CommandError result) Source #
Enqueue a Blocking
Message
into an IsInput
and wait for the
response.
If message deliver
y failed, return Left
.CouldNotEnqueueCommand
If no reply was given by the receiving process (using replyTo
) within
a given duration, return Left
.BlockingCommandTimedOut
Important: The given timeout starts after deliver
has returned,
if deliver
blocks and delays, call
might take longer than the
specified timeout.
The receiving process can either delegate the call using
delegateCall
or reply to the call by using: replyTo
.
replyTo :: MonadUnliftIO m => ReplyBox a -> a -> m () Source #
delegateCall :: (MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return r))) => o (Message apiTag) -> Command apiTag ('Return r) -> ReplyBox r -> m Bool Source #
callAsync :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return result))) => o (Message apiTag) -> Command apiTag ('Return result) -> m (Maybe (AsyncReply result)) Source #
:: MonadUnliftIO m | |
=> Int | The time in micro seconds to wait
before returning |
-> AsyncReply result | |
-> m (Either CommandError result) |
tryTakeReply :: MonadUnliftIO m => AsyncReply result -> m (Maybe (Either CommandError result)) Source #
class HasCallIdCounter env where Source #
Class of environment records containing a CounterVar
for CallId
s.
getCallIdCounter :: env -> CounterVar CallId Source #
Instances
An identifier value every command send by call
s.
newCallIdCounter :: MonadIO m => m (CounterVar CallId) Source #
Create a new CallId
CounterVar
.
takeNext :: (MonadReader env m, HasCallIdCounter env, MonadUnliftIO m) => m CallId Source #
Increment and get a new CallId
.
class HasCounterVar a env | env -> a where Source #
A type class for MonadReader
based
applications.
getCounterVar :: env -> CounterVar a Source #
Instances
HasCounterVar (t :: k) (CounterVar t) Source # | |
Defined in UnliftIO.MessageBox.Util.Fresh getCounterVar :: CounterVar t -> CounterVar t Source # |
data CounterVar a Source #
An AtomicCounter
.
Instances
HasCounterVar (t :: k) (CounterVar t) Source # | |
Defined in UnliftIO.MessageBox.Util.Fresh getCounterVar :: CounterVar t -> CounterVar t Source # | |
HasCallIdCounter (CounterVar CallId) Source # | |
Defined in UnliftIO.MessageBox.Util.CallId |
fresh :: forall a env m. (MonadReader env m, MonadIO m, HasCounterVar a env, Coercible a Int) => m a Source #
A threadsafe atomic a
Atomically increment and get the value of the Counter
for type a
that must be present in the env
.
incrementAndGet :: forall a m. (MonadIO m, Coercible a Int) => CounterVar a -> m a Source #
Atomically increment and get the value of the Counter
for type a
that must be present in the env
.
newCounterVar :: forall a m. MonadIO m => m (CounterVar a) Source #
Create a new CounterVar
starting at 0
.
A wrapper around an IO action that returns value in the future.
awaitFuture :: MonadUnliftIO m => Future b -> m b Source #
Poll a Future until the value is present.