Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- data Stream identifier metadata event
- makeStream :: forall identifier metadata event. (WritableEvent event, ToField (EncodingFormat event), ToField identifier, ToRow metadata) => (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> [Query] -> Query -> Stream identifier metadata event
- makeStream' :: (ToRow r, ToRow r') => (forall a. (Connection -> IO a) -> IO a) -> (Query, r) -> (forall encEvent. (ToField encEvent, ToRow metadata) => encEvent -> metadata -> ConsistencyCheck identifier -> (Query, r')) -> Query -> Stream identifier metadata event
- data StreamFamily streamId eventId metadata event
- makeStreamFamily :: (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> (ByteString -> Either String (streamId, eventId)) -> Query -> Query -> [Query] -> Query -> StreamFamily streamId eventId metadata event
- type Projection event st = Projection event st SqlAction
- executeSqlActions :: forall streamId eventId action m st. (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => ([action] -> [SqlAction]) -> (forall r. (Connection -> IO r) -> IO r) -> TrackingTable streamId eventId st -> Consumer (st, [action], streamId, eventId) m ()
- executeCustomActions :: forall streamId eventId action m st. (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => (action -> m (Either String (m ()))) -> TrackingTable streamId eventId st -> Consumer (st, [action], streamId, eventId) m ()
- fromTabularDataActions :: FromTabularDataAction cols => Query -> [TabularDataAction cols] -> [SqlAction]
- createTrackingTable :: (MonadError Error m, MonadIO m) => (forall r. (Connection -> IO r) -> IO r) -> Query -> Query -> Query -> Query -> m (TrackingTable streamId eventId st)
Stream
data Stream identifier metadata event Source #
Stream of events stored in a PostgreSQL relation.
The job of sharding streams in different tables is left to the database. If this is something you want to do, you can create a view and a trigger on insert into that view.
Instances
(Event event, MonadError Error m, MonadIO m, Ord identifier, FromField identifier, ToField identifier, FromRow metadata, FromField (EncodingFormat event)) => Stream m (Stream identifier metadata event) Source # | |
Defined in Database.CQRS.PostgreSQL.Stream type EventType (Stream identifier metadata event) :: Type # type EventIdentifier (Stream identifier metadata event) :: Type # type EventMetadata (Stream identifier metadata event) :: Type # streamEvents :: Stream identifier metadata event -> StreamBounds' (Stream identifier metadata event) -> Producer [Either (EventIdentifier (Stream identifier metadata event), String) (EventWithContext' (Stream identifier metadata event))] m () # | |
(WritableEvent event, MonadError Error m, MonadIO m, Ord identifier, FromField identifier, ToField identifier, FromField (EncodingFormat event), ToField (EncodingFormat event), FromRow metadata, ToRow metadata) => WritableStream m (Stream identifier metadata event) Source # | |
Defined in Database.CQRS.PostgreSQL.Stream writeEventWithMetadata :: Stream identifier metadata event -> EventType (Stream identifier metadata event) -> EventMetadata (Stream identifier metadata event) -> ConsistencyCheck (EventIdentifier (Stream identifier metadata event)) -> m (EventIdentifier (Stream identifier metadata event)) # | |
type EventMetadata (Stream identifier metadata event) Source # | |
Defined in Database.CQRS.PostgreSQL.Stream | |
type EventIdentifier (Stream identifier metadata event) Source # | |
Defined in Database.CQRS.PostgreSQL.Stream | |
type EventType (Stream identifier metadata event) Source # | |
Defined in Database.CQRS.PostgreSQL.Stream |
:: (WritableEvent event, ToField (EncodingFormat event), ToField identifier, ToRow metadata) | |
=> (forall a. (Connection -> IO a) -> IO a) | Connection pool as a function. |
-> Query | Relation name. |
-> Query | Identifier column name. If there are several, use a tuple. |
-> [Query] | Column names for metadata. |
-> Query | Event column name. |
-> Stream identifier metadata event |
Make a Stream
from basic information about the relation name and columns.
:: (ToRow r, ToRow r') | |
=> (forall a. (Connection -> IO a) -> IO a) | Connection pool as a function. |
-> (Query, r) | Select query. |
-> (forall encEvent. (ToField encEvent, ToRow metadata) => encEvent -> metadata -> ConsistencyCheck identifier -> (Query, r')) | Insert query builder. |
-> Query | Identifier column (use tuple if several.) |
-> Stream identifier metadata event |
Make a stream from queries.
This function is less safe than makeStream
and should only be used when
makeStream
is not flexible enough. It is also closer to the implementation
and more subject to changes.
Stream family
data StreamFamily streamId eventId metadata event Source #
Family of event streams stored in a PostgreSQL relation.
Each stream should have a unique stream identifier and event identifiers must be unique within a stream, but not necessarily across them.
allNewEvents
starts a new thread which reads notifications on the given
channel and writes them to a transactional bounded queue (a TBQueue
) which
is then consumed by the returned Producer
. The maximum size of this queue
is hard-coded to 100. Should an exception be raised in the listening thread,
it is thrown back by the producer.
Instances
(Event event, MonadError Error m, MonadIO m, FromField eventId, FromField streamId, FromField (EncodingFormat event), FromRow metadata, ToField eventId, ToField streamId) => StreamFamily m (StreamFamily streamId eventId metadata event) Source # | |
Defined in Database.CQRS.PostgreSQL.StreamFamily type StreamType (StreamFamily streamId eventId metadata event) :: Type # type StreamIdentifier (StreamFamily streamId eventId metadata event) :: Type # getStream :: StreamFamily streamId eventId metadata event -> StreamIdentifier (StreamFamily streamId eventId metadata event) -> m (StreamType (StreamFamily streamId eventId metadata event)) # allNewEvents :: StreamFamily streamId eventId metadata event -> m (Producer [(StreamIdentifier (StreamFamily streamId eventId metadata event), Either (EventIdentifier (StreamType (StreamFamily streamId eventId metadata event)), String) (EventWithContext' (StreamType (StreamFamily streamId eventId metadata event))))] m a) # latestEventIdentifiers :: StreamFamily streamId eventId metadata event -> Producer (StreamIdentifier (StreamFamily streamId eventId metadata event), EventIdentifier (StreamType (StreamFamily streamId eventId metadata event))) m () # | |
type StreamIdentifier (StreamFamily streamId eventId metadata event) Source # | |
Defined in Database.CQRS.PostgreSQL.StreamFamily | |
type StreamType (StreamFamily streamId eventId metadata event) Source # | |
Defined in Database.CQRS.PostgreSQL.StreamFamily |
makeStreamFamily :: (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> (ByteString -> Either String (streamId, eventId)) -> Query -> Query -> [Query] -> Query -> StreamFamily streamId eventId metadata event Source #
Projection
type Projection event st = Projection event st SqlAction Source #
executeSqlActions :: forall streamId eventId action m st. (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => ([action] -> [SqlAction]) -> (forall r. (Connection -> IO r) -> IO r) -> TrackingTable streamId eventId st -> Consumer (st, [action], streamId, eventId) m () Source #
Execute the SQL actions and update the tracking table in one transaction.
The custom actions are transformed into a list of SQL actions by the given
function. See fromTabularDataActions
for an example.
:: (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) | |
=> (action -> m (Either String (m ()))) | Run an action returning either an error or a rollback action. If any of the rollback actions fail, the others are not run. Rollback actions are run in reversed order. |
-> TrackingTable streamId eventId st | |
-> Consumer (st, [action], streamId, eventId) m () |
Execute custom actions by calling the runner function on each action in turn and updating the tracking table accordingly.
fromTabularDataActions Source #
:: FromTabularDataAction cols | |
=> Query | Relation name. |
-> [TabularDataAction cols] | |
-> [SqlAction] |
:: (MonadError Error m, MonadIO m) | |
=> (forall r. (Connection -> IO r) -> IO r) | |
-> Query | Name of the tracking table. |
-> Query | Type of stream identifiers. |
-> Query | Type of event identifiers. |
-> Query | Type of the state. |
-> m (TrackingTable streamId eventId st) |
Create tracking table if it doesn't exist already.
A tracking table is a table used to track the last events processed by a projection for each stream in a stream family. It allows them to restart from where they have left off.