Safe Haskell | Safe |
---|---|
Language | Haskell2010 |
Synopsis
- class Event e where
- type EncodingFormat e :: *
- decodeEvent :: EncodingFormat e -> Either String e
- class Event e => WritableEvent e where
- encodeEvent :: e -> EncodingFormat e
- class Stream f stream where
- type EventType stream :: *
- type EventIdentifier stream :: *
- type EventMetadata stream :: *
- streamEvents :: stream -> StreamBounds' stream -> Producer [Either (EventIdentifier stream, String) (EventWithContext' stream)] f ()
- class Stream f stream => WritableStream f stream where
- writeEventWithMetadata :: stream -> EventType stream -> EventMetadata stream -> ConsistencyCheck (EventIdentifier stream) -> f (EventIdentifier stream)
- data EventWithContext identifier metadata event = EventWithContext {
- identifier :: identifier
- metadata :: metadata
- event :: event
- type EventWithContext' stream = EventWithContext (EventIdentifier stream) (EventMetadata stream) (EventType stream)
- class MonadMetadata metadata m where
- getMetadata :: m metadata
- data ConsistencyCheck identifier
- = NoConsistencyCheck
- | CheckNoEvents
- | CheckLastEvent identifier
- writeEvent :: (Monad m, MonadMetadata (EventMetadata stream) m, WritableStream m stream) => stream -> EventType stream -> m (EventIdentifier stream)
- writeEventCc :: (Monad m, MonadMetadata (EventMetadata stream) m, WritableStream m stream) => stream -> EventType stream -> ConsistencyCheck (EventIdentifier stream) -> m (EventIdentifier stream)
- optimistically :: MonadError Error m => m a -> m a
- data StreamBounds identifier = StreamBounds {
- _afterEvent :: Maybe identifier
- _untilEvent :: Maybe identifier
- type StreamBounds' stream = StreamBounds (EventIdentifier stream)
- afterEvent :: Ord identifier => identifier -> StreamBounds identifier
- untilEvent :: Ord identifier => identifier -> StreamBounds identifier
- class StreamFamily f fam where
- type StreamType fam :: *
- type StreamIdentifier fam :: *
- getStream :: fam -> StreamIdentifier fam -> f (StreamType fam)
- allNewEvents :: fam -> f (Producer [(StreamIdentifier fam, Either (EventIdentifier (StreamType fam), String) (EventWithContext' (StreamType fam)))] f a)
- latestEventIdentifiers :: fam -> Producer (StreamIdentifier fam, EventIdentifier (StreamType fam)) f ()
- type Aggregator event agg = event -> State agg ()
- runAggregator :: forall m stream aggregate. (MonadError Error m, Show (EventIdentifier stream), Stream m stream) => Aggregator (EventWithContext' stream) aggregate -> stream -> StreamBounds' stream -> aggregate -> m (aggregate, Maybe (EventIdentifier stream), Int)
- type Projection event st action = event -> State st [action]
- runProjection :: forall streamFamily action trackingTable m st. (MonadError Error m, Hashable (StreamIdentifier streamFamily), Ord (EventIdentifier (StreamType streamFamily)), Ord (StreamIdentifier streamFamily), Stream m (StreamType streamFamily), StreamFamily m streamFamily, TrackingTable m trackingTable (StreamIdentifier streamFamily) (EventIdentifier (StreamType streamFamily)) st) => streamFamily -> (StreamIdentifier streamFamily -> st) -> Projection (EventWithContext' (StreamType streamFamily)) st action -> trackingTable -> (trackingTable -> (st, [action], StreamIdentifier streamFamily, EventIdentifier (StreamType streamFamily)) -> m ()) -> m ()
- data TrackedState identifier st
- class TrackingTable m table streamId eventId st | table -> streamId, table -> eventId, table -> st where
- getTrackedState :: table -> streamId -> m (TrackedState eventId st)
- upsertError :: table -> streamId -> eventId -> String -> m ()
- newtype InMemoryTrackingTable streamId eventId st = InMemoryTrackingTable (TVar (HashMap streamId (Maybe (eventId, st), Maybe (eventId, String))))
- createInMemoryTrackingTable :: MonadIO m => m (InMemoryTrackingTable streamId eventId st)
- executeInMemoryActions :: (MonadError Error m, Hashable streamId, MonadIO m, Ord streamId) => TVar projected -> (projected -> action -> Either Error projected) -> InMemoryTrackingTable streamId eventId st -> (st, [action], streamId, eventId) -> m ()
- class ReadModel f model where
- type ReadModelQuery model :: *
- type ReadModelResponse model :: *
- query :: model -> ReadModelQuery model -> f (ReadModelResponse model)
- type Transformer inputEvent eventId event = inputEvent -> Transform eventId event ()
- data TransformedStream m identifier metadata event
- transformStream :: Stream m stream => Transformer (Either (EventIdentifier stream, String) (EventWithContext' stream)) identifier (EventWithContext identifier metadata event) -> (identifier -> m (EventIdentifier stream)) -> stream -> TransformedStream m identifier metadata event
- data TransformedStreamFamily m streamId eventId metadata event
- transformStreamFamily :: forall m streamId eventId metadata event streamFamily. (Hashable (StreamIdentifier streamFamily), Ord (StreamIdentifier streamFamily), Stream m (StreamType streamFamily), StreamFamily m streamFamily) => Transformer (Either (EventIdentifier (StreamType streamFamily), String) (EventWithContext' (StreamType streamFamily))) eventId (EventWithContext eventId metadata event) -> (StreamIdentifier streamFamily -> m streamId) -> (streamId -> m (StreamIdentifier streamFamily)) -> (EventIdentifier (StreamType streamFamily) -> m eventId) -> (eventId -> m (EventIdentifier (StreamType streamFamily))) -> streamFamily -> TransformedStreamFamily m streamId eventId metadata event
- type Transform eventId event = Free (TransformF eventId event)
- pushEvent :: event -> Transform eventId event ()
- mergeEvents :: ([event] -> (a, [event])) -> Transform eventId event a
- flushEvents :: Transform eventId event ()
- failTransformer :: eventId -> String -> Transform eventId event ()
- data Error
Events
Event that can be read from some event stream with a compatible decoding format and fed to a projection or aggregated in some way.
type EncodingFormat e :: * Source #
Format in which the event is encoded, e.g. Value.
decodeEvent :: EncodingFormat e -> Either String e Source #
class Event e => WritableEvent e where Source #
Event that can be written to an event stream. This is separate from Event
to make it possible to restrict the events that can be written with a GADT.
encodeEvent :: e -> EncodingFormat e Source #
Format from which the event can be decoded, e.g. Value.
Streams
class Stream f stream where Source #
type EventType stream :: * Source #
Type of the events contained in that stream.
type EventIdentifier stream :: * Source #
Type of unique identifiers for events in the stream.
There must be a total order on identifiers so they can be sorted.
type EventMetadata stream :: * Source #
Depending on the store, this structure can contain the creation date, a correlation ID, etc.
streamEvents :: stream -> StreamBounds' stream -> Producer [Either (EventIdentifier stream, String) (EventWithContext' stream)] f () Source #
Stream all the events within some bounds in arbitrary batches.
Events must be streamed from lowest to greatest identifier. If the back-end
is fetching events in batches, they can be returned in the same way to
improve performance. If an event can't be decoded, a Left should be
returned instead with the identifier and an error message.
Instances
class Stream f stream => WritableStream f stream where Source #
writeEventWithMetadata :: stream -> EventType stream -> EventMetadata stream -> ConsistencyCheck (EventIdentifier stream) -> f (EventIdentifier stream) Source #
Append the event to the stream and return the identifier.
The identifier must be greater than the previous events' identifiers.
The function must throw ConsistencyCheckError if the check fails.
Instances
(MonadError Error m, MonadIO m, NFData event, NFData metadata) => WritableStream m (Stream metadata event) Source # | |
Defined in Database.CQRS.InMemory writeEventWithMetadata :: Stream metadata event -> EventType (Stream metadata event) -> EventMetadata (Stream metadata event) -> ConsistencyCheck (EventIdentifier (Stream metadata event)) -> m (EventIdentifier (Stream metadata event)) Source # |
data EventWithContext identifier metadata event Source #
Once added to the stream, an event is adorned with an identifier and some metadata.
EventWithContext | |
|
Instances
(Eq identifier, Eq metadata, Eq event) => Eq (EventWithContext identifier metadata event) Source # | |
Defined in Database.CQRS.Stream (==) :: EventWithContext identifier metadata event -> EventWithContext identifier metadata event -> Bool # (/=) :: EventWithContext identifier metadata event -> EventWithContext identifier metadata event -> Bool # | |
(Show identifier, Show metadata, Show event) => Show (EventWithContext identifier metadata event) Source # | |
Defined in Database.CQRS.Stream showsPrec :: Int -> EventWithContext identifier metadata event -> ShowS # show :: EventWithContext identifier metadata event -> String # showList :: [EventWithContext identifier metadata event] -> ShowS # | |
Generic (EventWithContext identifier metadata event) Source # | |
Defined in Database.CQRS.Stream type Rep (EventWithContext identifier metadata event) :: Type -> Type # from :: EventWithContext identifier metadata event -> Rep (EventWithContext identifier metadata event) x # to :: Rep (EventWithContext identifier metadata event) x -> EventWithContext identifier metadata event # | |
type Rep (EventWithContext identifier metadata event) Source # | |
Defined in Database.CQRS.Stream type Rep (EventWithContext identifier metadata event) = D1 (MetaData "EventWithContext" "Database.CQRS.Stream" "eventsourcing-0.9.0-4g8y83fLOKaCrH4ufVC9GA" False) (C1 (MetaCons "EventWithContext" PrefixI True) (S1 (MetaSel (Just "identifier") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 identifier) :*: (S1 (MetaSel (Just "metadata") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 metadata) :*: S1 (MetaSel (Just "event") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 event)))) |
type EventWithContext' stream = EventWithContext (EventIdentifier stream) (EventMetadata stream) (EventType stream) Source #
class MonadMetadata metadata m where Source #
The event metadata come from the current "environment".
getMetadata :: m metadata Source #
Instances
Monad m => MonadMetadata () m Source # | |
Defined in Database.CQRS.Stream getMetadata :: m () Source # |
data ConsistencyCheck identifier Source #
A condition to check before inserting a new event in a stream.
This can be used to enforce consistency by checking that no new events were inserted since some validation has been performed and therefore that the validations are still sound.
NoConsistencyCheck | Always write the new event. |
CheckNoEvents | There are no events in that stream. |
CheckLastEvent identifier | The latest event's identifier matches. |
writeEvent :: (Monad m, MonadMetadata (EventMetadata stream) m, WritableStream m stream) => stream -> EventType stream -> m (EventIdentifier stream) Source #
Get the metadata from the environment, append the event to the store and return the identifier.
writeEventCc :: (Monad m, MonadMetadata (EventMetadata stream) m, WritableStream m stream) => stream -> EventType stream -> ConsistencyCheck (EventIdentifier stream) -> m (EventIdentifier stream) Source #
Get the metadata from the environment, validate the consistency check, append the event to the store and return its identifier.
optimistically :: MonadError Error m => m a -> m a Source #
Execute an action and retry indefinitely as long as it throws ConsistencyCheckError.
This makes it possible to have Optimistic Concurrency Control when writing
events by getting the aggregate and using writeEventCc or
writeEventWithMetadata inside the action passed to optimistically.
Warning: it does NOT create a transaction in which you can write several events. You should only use this to write a single event!
data StreamBounds identifier Source #
Lower/upper bounds of an event stream.
The Semigroup instance returns bounds for the intersection of the two streams.
StreamBounds | |
|
Instances
type StreamBounds' stream = StreamBounds (EventIdentifier stream) Source #
afterEvent :: Ord identifier => identifier -> StreamBounds identifier Source #
After the event with the given identifier, excluding it.
untilEvent :: Ord identifier => identifier -> StreamBounds identifier Source #
Until the event with the given identifier, including it.
Stream families
class StreamFamily f fam where Source #
A stream family is a collection of streams of the same type.
For example, a table in a relational database can contain events for all aggregates of a certain type. Each aggregate has its own stream of events but they are all stored in the same table. That table is a stream family indexed by aggregate ID.
type StreamType fam :: * Source #
Type of the streams contained in this stream family.
type StreamIdentifier fam :: * Source #
Identifier for a specific stream, e.g. an aggregate ID.
getStream :: fam -> StreamIdentifier fam -> f (StreamType fam) Source #
Get the stream corresponding to a given identifier.
allNewEvents :: fam -> f (Producer [(StreamIdentifier fam, Either (EventIdentifier (StreamType fam), String) (EventWithContext' (StreamType fam)))] f a) Source #
Initialise and return a producer of newly-created events from *all* streams in arbitrary batches. If an event can't be decoded, the decoding error is returned instead.
Events should appear in the correct order within a given stream but not necessarily in-between them, i.e. two events belonging to different streams won't necessarily be ordered according to the chronological history.
It is okay for events to be sent more than one time as long as the order is respected within each stream if it makes the implementation easier and prevents the loss of some events.
How events are batched together is up to the implementation as long as the order is respected.
It is okay for batches to be empty to signal that there are currently no new notifications. This is important for migrations, so they know they have processed all events.
latestEventIdentifiers :: fam -> Producer (StreamIdentifier fam, EventIdentifier (StreamType fam)) f () Source #
Stream the identifier of the latest event for each stream in the family.
It is a snapshot of the last event identifiers at the time the producer is called. It is meant to be used by projections to catch up to the latest state.
Instances
(Eq identifier, Hashable identifier, MonadIO m) => StreamFamily m (StreamFamily identifier metadata event) Source # | |
Defined in Database.CQRS.InMemory type StreamType (StreamFamily identifier metadata event) :: Type Source # type StreamIdentifier (StreamFamily identifier metadata event) :: Type Source # getStream :: StreamFamily identifier metadata event -> StreamIdentifier (StreamFamily identifier metadata event) -> m (StreamType (StreamFamily identifier metadata event)) Source # allNewEvents :: StreamFamily identifier metadata event -> m (Producer [(StreamIdentifier (StreamFamily identifier metadata event), Either (EventIdentifier (StreamType (StreamFamily identifier metadata event)), String) (EventWithContext' (StreamType (StreamFamily identifier metadata event))))] m a) Source # latestEventIdentifiers :: StreamFamily identifier metadata event -> Producer (StreamIdentifier (StreamFamily identifier metadata event), EventIdentifier (StreamType (StreamFamily identifier metadata event))) m () Source # | |
Monad m => StreamFamily m (TransformedStreamFamily m streamId eventId metadata event) Source # | |
Defined in Database.CQRS.Transformer type StreamType (TransformedStreamFamily m streamId eventId metadata event) :: Type Source # type StreamIdentifier (TransformedStreamFamily m streamId eventId metadata event) :: Type Source # getStream :: TransformedStreamFamily m streamId eventId metadata event -> StreamIdentifier (TransformedStreamFamily m streamId eventId metadata event) -> m (StreamType (TransformedStreamFamily m streamId eventId metadata event)) Source # allNewEvents :: TransformedStreamFamily m streamId eventId metadata event -> m (Producer [(StreamIdentifier (TransformedStreamFamily m streamId eventId metadata event), Either (EventIdentifier (StreamType (TransformedStreamFamily m streamId eventId metadata event)), String) (EventWithContext' (StreamType (TransformedStreamFamily m streamId eventId metadata event))))] m a) Source # latestEventIdentifiers :: TransformedStreamFamily m streamId eventId metadata event -> Producer (StreamIdentifier (TransformedStreamFamily m streamId eventId metadata event), EventIdentifier (StreamType (TransformedStreamFamily m streamId eventId metadata event))) m () Source # |
Aggregators and projections
type Aggregator event agg = event -> State agg () Source #
Function aggregating a state in memory.
runAggregator :: forall m stream aggregate. (MonadError Error m, Show (EventIdentifier stream), Stream m stream) => Aggregator (EventWithContext' stream) aggregate -> stream -> StreamBounds' stream -> aggregate -> m (aggregate, Maybe (EventIdentifier stream), Int) Source #
Run an Aggregator on events from a stream starting with a given state and
return the new aggregate state, the identifier of the last event processed if
any and how many of them were processed.
type Projection event st action = event -> State st [action] Source #
Projection returning actions that can be batched and executed.
This can be used to batch changes to tables in a database for example.
:: (MonadError Error m, Hashable (StreamIdentifier streamFamily), Ord (EventIdentifier (StreamType streamFamily)), Ord (StreamIdentifier streamFamily), Stream m (StreamType streamFamily), StreamFamily m streamFamily, TrackingTable m trackingTable (StreamIdentifier streamFamily) (EventIdentifier (StreamType streamFamily)) st) | |
=> streamFamily | |
-> (StreamIdentifier streamFamily -> st) | Initialise state when no events have been processed yet. |
-> Projection (EventWithContext' (StreamType streamFamily)) st action | |
-> trackingTable | |
-> (trackingTable -> (st, [action], StreamIdentifier streamFamily, EventIdentifier (StreamType streamFamily)) -> m ()) | Commit the custom actions. See |
-> m () |
data TrackedState identifier st Source #
NeverRan | |
SuccessAt identifier st | |
FailureAt (Maybe (identifier, st)) identifier String | Last succeeded at, failed at. |
Instances
(Eq identifier, Eq st) => Eq (TrackedState identifier st) Source # | |
Defined in Database.CQRS.Projection (==) :: TrackedState identifier st -> TrackedState identifier st -> Bool # (/=) :: TrackedState identifier st -> TrackedState identifier st -> Bool # | |
(Show identifier, Show st) => Show (TrackedState identifier st) Source # | |
Defined in Database.CQRS.Projection showsPrec :: Int -> TrackedState identifier st -> ShowS # show :: TrackedState identifier st -> String # showList :: [TrackedState identifier st] -> ShowS # |
class TrackingTable m table streamId eventId st | table -> streamId, table -> eventId, table -> st where Source #
getTrackedState :: table -> streamId -> m (TrackedState eventId st) Source #
upsertError :: table -> streamId -> eventId -> String -> m () Source #
Instances
(Hashable streamId, MonadIO m, Ord streamId) => TrackingTable m (InMemoryTrackingTable streamId eventId st) streamId eventId st Source # | |
Defined in Database.CQRS.Projection getTrackedState :: InMemoryTrackingTable streamId eventId st -> streamId -> m (TrackedState eventId st) Source # upsertError :: InMemoryTrackingTable streamId eventId st -> streamId -> eventId -> String -> m () Source # |
newtype InMemoryTrackingTable streamId eventId st Source #
Instances
(Hashable streamId, MonadIO m, Ord streamId) => TrackingTable m (InMemoryTrackingTable streamId eventId st) streamId eventId st Source # | |
Defined in Database.CQRS.Projection getTrackedState :: InMemoryTrackingTable streamId eventId st -> streamId -> m (TrackedState eventId st) Source # upsertError :: InMemoryTrackingTable streamId eventId st -> streamId -> eventId -> String -> m () Source # |
createInMemoryTrackingTable :: MonadIO m => m (InMemoryTrackingTable streamId eventId st) Source #
executeInMemoryActions Source #
:: (MonadError Error m, Hashable streamId, MonadIO m, Ord streamId) | |
=> TVar projected | |
-> (projected -> action -> Either Error projected) | For tabular data actions, use |
-> InMemoryTrackingTable streamId eventId st | |
-> (st, [action], streamId, eventId) | |
-> m () |
Read models
class ReadModel f model where Source #
type ReadModelQuery model :: * Source #
type ReadModelResponse model :: * Source #
query :: model -> ReadModelQuery model -> f (ReadModelResponse model) Source #
Instances
(StreamFamily m streamFamily, Stream m (StreamType streamFamily), MonadError Error m, Hashable (StreamIdentifier streamFamily), MonadIO m, Ord (EventIdentifier (StreamType streamFamily)), Ord (StreamIdentifier streamFamily), Show (EventIdentifier (StreamType streamFamily))) => ReadModel m (AggregateStore streamFamily aggregate) Source # | |
Defined in Database.CQRS.ReadModel.AggregateStore type ReadModelQuery (AggregateStore streamFamily aggregate) :: Type Source # type ReadModelResponse (AggregateStore streamFamily aggregate) :: Type Source # query :: AggregateStore streamFamily aggregate -> ReadModelQuery (AggregateStore streamFamily aggregate) -> m (ReadModelResponse (AggregateStore streamFamily aggregate)) Source # |
Transformers
type Transformer inputEvent eventId event = inputEvent -> Transform eventId event () Source #
data TransformedStream m identifier metadata event Source #
Instances
transformStream :: Stream m stream => Transformer (Either (EventIdentifier stream, String) (EventWithContext' stream)) identifier (EventWithContext identifier metadata event) -> (identifier -> m (EventIdentifier stream)) -> stream -> TransformedStream m identifier metadata event Source #
data TransformedStreamFamily m streamId eventId metadata event Source #
Instances
transformStreamFamily :: forall m streamId eventId metadata event streamFamily. (Hashable (StreamIdentifier streamFamily), Ord (StreamIdentifier streamFamily), Stream m (StreamType streamFamily), StreamFamily m streamFamily) => Transformer (Either (EventIdentifier (StreamType streamFamily), String) (EventWithContext' (StreamType streamFamily))) eventId (EventWithContext eventId metadata event) -> (StreamIdentifier streamFamily -> m streamId) -> (streamId -> m (StreamIdentifier streamFamily)) -> (EventIdentifier (StreamType streamFamily) -> m eventId) -> (eventId -> m (EventIdentifier (StreamType streamFamily))) -> streamFamily -> TransformedStreamFamily m streamId eventId metadata event Source #
type Transform eventId event = Free (TransformF eventId event) Source #
Monad in which you can push, merge and flush events.
mergeEvents :: ([event] -> (a, [event])) -> Transform eventId event a Source #
Apply a function to the queue of events, returning a value and a new queue; set the queue to the new one and return the value.
The intent is to allow a new event to be merged into a previous one if possible to make the new event stream more compact.
flushEvents :: Transform eventId event () Source #
Flush the queue so it can be processed downstream, e.g. sent to a message broker.
Flushing may also occur automatically.
failTransformer :: eventId -> String -> Transform eventId event () Source #
Flush the events and push an error downstream.