Safe Haskell | Safe |
---|---|
Language | Haskell2010 |
Synopsis
- type Aggregator event agg = event -> State agg ()
- runAggregator :: forall m stream aggregate. (MonadError Error m, Show (EventIdentifier stream), Stream m stream) => Aggregator (EventWithContext' stream) aggregate -> stream -> StreamBounds' stream -> aggregate -> m (aggregate, Maybe (EventIdentifier stream), Int)
- type Projection event st action = event -> State st [action]
- runProjection :: forall streamFamily action trackingTable m st. (MonadError Error m, Hashable (StreamIdentifier streamFamily), Ord (EventIdentifier (StreamType streamFamily)), Ord (StreamIdentifier streamFamily), Stream m (StreamType streamFamily), StreamFamily m streamFamily, TrackingTable m trackingTable (StreamIdentifier streamFamily) (EventIdentifier (StreamType streamFamily)) st) => streamFamily -> (StreamIdentifier streamFamily -> st) -> Projection (EventWithContext' (StreamType streamFamily)) st action -> trackingTable -> (trackingTable -> (st, [action], StreamIdentifier streamFamily, EventIdentifier (StreamType streamFamily)) -> m ()) -> m ()
- data TrackedState identifier st
- class TrackingTable m table streamId eventId st | table -> streamId, table -> eventId, table -> st where
- getTrackedState :: table -> streamId -> m (TrackedState eventId st)
- upsertError :: table -> streamId -> eventId -> String -> m ()
- newtype InMemoryTrackingTable streamId eventId st = InMemoryTrackingTable (TVar (HashMap streamId (Maybe (eventId, st), Maybe (eventId, String))))
- createInMemoryTrackingTable :: MonadIO m => m (InMemoryTrackingTable streamId eventId st)
- executeInMemoryActions :: (MonadError Error m, Hashable streamId, MonadIO m, Ord streamId) => TVar projected -> (projected -> action -> Either Error projected) -> InMemoryTrackingTable streamId eventId st -> (st, [action], streamId, eventId) -> m ()
Documentation
type Aggregator event agg = event -> State agg () Source #
Function aggregating a state in memory.
runAggregator :: forall m stream aggregate. (MonadError Error m, Show (EventIdentifier stream), Stream m stream) => Aggregator (EventWithContext' stream) aggregate -> stream -> StreamBounds' stream -> aggregate -> m (aggregate, Maybe (EventIdentifier stream), Int) Source #
Run an Aggregator
on events from a stream starting with a given state and
return the new aggregate state, the identifier of the last event processed if
any and how many of them were processed.
type Projection event st action = event -> State st [action] Source #
Projection returning actions that can be batched and executed.
This can be used to batch changes to tables in a database for example.
:: (MonadError Error m, Hashable (StreamIdentifier streamFamily), Ord (EventIdentifier (StreamType streamFamily)), Ord (StreamIdentifier streamFamily), Stream m (StreamType streamFamily), StreamFamily m streamFamily, TrackingTable m trackingTable (StreamIdentifier streamFamily) (EventIdentifier (StreamType streamFamily)) st) | |
=> streamFamily | |
-> (StreamIdentifier streamFamily -> st) | Initialise state when no events have been processed yet. |
-> Projection (EventWithContext' (StreamType streamFamily)) st action | |
-> trackingTable | |
-> (trackingTable -> (st, [action], StreamIdentifier streamFamily, EventIdentifier (StreamType streamFamily)) -> m ()) | Commit the custom actions. See |
-> m () |
data TrackedState identifier st Source #
NeverRan | |
SuccessAt identifier st | |
FailureAt (Maybe (identifier, st)) identifier String | Last succeeded at, failed at. |
Instances
(Eq identifier, Eq st) => Eq (TrackedState identifier st) Source # | |
Defined in Database.CQRS.Projection (==) :: TrackedState identifier st -> TrackedState identifier st -> Bool # (/=) :: TrackedState identifier st -> TrackedState identifier st -> Bool # | |
(Show identifier, Show st) => Show (TrackedState identifier st) Source # | |
Defined in Database.CQRS.Projection showsPrec :: Int -> TrackedState identifier st -> ShowS # show :: TrackedState identifier st -> String # showList :: [TrackedState identifier st] -> ShowS # |
class TrackingTable m table streamId eventId st | table -> streamId, table -> eventId, table -> st where Source #
getTrackedState :: table -> streamId -> m (TrackedState eventId st) Source #
upsertError :: table -> streamId -> eventId -> String -> m () Source #
Instances
(Hashable streamId, MonadIO m, Ord streamId) => TrackingTable m (InMemoryTrackingTable streamId eventId st) streamId eventId st Source # | |
Defined in Database.CQRS.Projection getTrackedState :: InMemoryTrackingTable streamId eventId st -> streamId -> m (TrackedState eventId st) Source # upsertError :: InMemoryTrackingTable streamId eventId st -> streamId -> eventId -> String -> m () Source # |
newtype InMemoryTrackingTable streamId eventId st Source #
Instances
(Hashable streamId, MonadIO m, Ord streamId) => TrackingTable m (InMemoryTrackingTable streamId eventId st) streamId eventId st Source # | |
Defined in Database.CQRS.Projection getTrackedState :: InMemoryTrackingTable streamId eventId st -> streamId -> m (TrackedState eventId st) Source # upsertError :: InMemoryTrackingTable streamId eventId st -> streamId -> eventId -> String -> m () Source # |
createInMemoryTrackingTable :: MonadIO m => m (InMemoryTrackingTable streamId eventId st) Source #
executeInMemoryActions Source #
:: (MonadError Error m, Hashable streamId, MonadIO m, Ord streamId) | |
=> TVar projected | |
-> (projected -> action -> Either Error projected) | For tabular data actions, use |
-> InMemoryTrackingTable streamId eventId st | |
-> (st, [action], streamId, eventId) | |
-> m () |