eventsourcing-postgresql-0.9.0: PostgreSQL adaptor for eventsourcing.

Safe Haskell: None
Language: Haskell2010

Database.CQRS.PostgreSQL

Contents

Synopsis

Stream

data Stream identifier metadata event Source #

Stream of events stored in a PostgreSQL relation.

The job of sharding streams across different tables is left to the database. If you want to do this, you can create a view and a trigger on insert into that view.

Instances
(Event event, MonadError Error m, MonadIO m, Ord identifier, FromField identifier, ToField identifier, FromRow metadata, FromField (EncodingFormat event)) => Stream m (Stream identifier metadata event) Source # 
Instance details

Defined in Database.CQRS.PostgreSQL.Stream

Associated Types

type EventType (Stream identifier metadata event) :: Type #

type EventIdentifier (Stream identifier metadata event) :: Type #

type EventMetadata (Stream identifier metadata event) :: Type #

Methods

streamEvents :: Stream identifier metadata event -> StreamBounds' (Stream identifier metadata event) -> Producer [Either (EventIdentifier (Stream identifier metadata event), String) (EventWithContext' (Stream identifier metadata event))] m () #

(WritableEvent event, MonadError Error m, MonadIO m, Ord identifier, FromField identifier, ToField identifier, FromField (EncodingFormat event), ToField (EncodingFormat event), FromRow metadata, ToRow metadata) => WritableStream m (Stream identifier metadata event) Source # 
Instance details

Defined in Database.CQRS.PostgreSQL.Stream

Methods

writeEventWithMetadata :: Stream identifier metadata event -> EventType (Stream identifier metadata event) -> EventMetadata (Stream identifier metadata event) -> ConsistencyCheck (EventIdentifier (Stream identifier metadata event)) -> m (EventIdentifier (Stream identifier metadata event)) #

type EventMetadata (Stream identifier metadata event) Source # 
Instance details

Defined in Database.CQRS.PostgreSQL.Stream

type EventMetadata (Stream identifier metadata event) = metadata
type EventIdentifier (Stream identifier metadata event) Source # 
Instance details

Defined in Database.CQRS.PostgreSQL.Stream

type EventIdentifier (Stream identifier metadata event) = identifier
type EventType (Stream identifier metadata event) Source # 
Instance details

Defined in Database.CQRS.PostgreSQL.Stream

type EventType (Stream identifier metadata event) = event

makeStream Source #

Arguments

:: (WritableEvent event, ToField (EncodingFormat event), ToField identifier, ToRow metadata) 
=> (forall a. (Connection -> IO a) -> IO a)

Connection pool as a function.

-> Query

Relation name.

-> Query

Identifier column name. If there are several, use a tuple.

-> [Query]

Column names for metadata.

-> Query

Event column name.

-> Stream identifier metadata event 

Make a Stream from basic information about the relation name and columns.

makeStream' Source #

Arguments

:: (ToRow r, ToRow r') 
=> (forall a. (Connection -> IO a) -> IO a)

Connection pool as a function.

-> (Query, r)

Select query.

-> (forall encEvent. (ToField encEvent, ToRow metadata) => encEvent -> metadata -> ConsistencyCheck identifier -> (Query, r'))

Insert query builder.

-> Query

Identifier column (use a tuple if there are several).

-> Stream identifier metadata event 

Make a stream from queries.

This function is less safe than makeStream and should only be used when makeStream is not flexible enough. It is also closer to the implementation and more subject to change.

Stream family

data StreamFamily streamId eventId metadata event Source #

Family of event streams stored in a PostgreSQL relation.

Each stream should have a unique stream identifier, and event identifiers must be unique within a stream, but not necessarily across streams.

allNewEvents starts a new thread which reads notifications on the given channel and writes them to a transactional bounded queue (a TBQueue) which is then consumed by the returned Producer. The maximum size of this queue is hard-coded to 100. Should an exception be raised in the listening thread, it is thrown back by the producer.

Instances
(Event event, MonadError Error m, MonadIO m, FromField eventId, FromField streamId, FromField (EncodingFormat event), FromRow metadata, ToField eventId, ToField streamId) => StreamFamily m (StreamFamily streamId eventId metadata event) Source # 
Instance details

Defined in Database.CQRS.PostgreSQL.StreamFamily

Associated Types

type StreamType (StreamFamily streamId eventId metadata event) :: Type #

type StreamIdentifier (StreamFamily streamId eventId metadata event) :: Type #

Methods

getStream :: StreamFamily streamId eventId metadata event -> StreamIdentifier (StreamFamily streamId eventId metadata event) -> m (StreamType (StreamFamily streamId eventId metadata event)) #

allNewEvents :: StreamFamily streamId eventId metadata event -> m (Producer [(StreamIdentifier (StreamFamily streamId eventId metadata event), Either (EventIdentifier (StreamType (StreamFamily streamId eventId metadata event)), String) (EventWithContext' (StreamType (StreamFamily streamId eventId metadata event))))] m a) #

latestEventIdentifiers :: StreamFamily streamId eventId metadata event -> Producer (StreamIdentifier (StreamFamily streamId eventId metadata event), EventIdentifier (StreamType (StreamFamily streamId eventId metadata event))) m () #

type StreamIdentifier (StreamFamily streamId eventId metadata event) Source # 
Instance details

Defined in Database.CQRS.PostgreSQL.StreamFamily

type StreamIdentifier (StreamFamily streamId eventId metadata event) = streamId
type StreamType (StreamFamily streamId eventId metadata event) Source # 
Instance details

Defined in Database.CQRS.PostgreSQL.StreamFamily

type StreamType (StreamFamily streamId eventId metadata event) = Stream eventId metadata event

makeStreamFamily :: (forall a. (Connection -> IO a) -> IO a) -> Query -> Query -> (ByteString -> Either String (streamId, eventId)) -> Query -> Query -> [Query] -> Query -> StreamFamily streamId eventId metadata event Source #

Projection

type Projection event st = Projection event st SqlAction Source #

executeSqlActions :: forall streamId eventId action m st. (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) => ([action] -> [SqlAction]) -> (forall r. (Connection -> IO r) -> IO r) -> TrackingTable streamId eventId st -> Consumer (st, [action], streamId, eventId) m () Source #

Execute the SQL actions and update the tracking table in one transaction.

The custom actions are transformed into a list of SQL actions by the given function. See fromTabularDataActions for an example.

executeCustomActions Source #

Arguments

:: (MonadError Error m, MonadIO m, ToField eventId, ToField streamId, ToField st) 
=> (action -> m (Either String (m ())))

Run an action returning either an error or a rollback action. If any of the rollback actions fails, the others are not run. Rollback actions are run in reverse order.

-> TrackingTable streamId eventId st 
-> Consumer (st, [action], streamId, eventId) m () 

Execute custom actions by calling the runner function on each action in turn and updating the tracking table accordingly.

fromTabularDataActions Source #

Arguments

:: FromTabularDataAction cols 
=> Query

Relation name.

-> [TabularDataAction cols] 
-> [SqlAction] 

createTrackingTable Source #

Arguments

:: (MonadError Error m, MonadIO m) 
=> (forall r. (Connection -> IO r) -> IO r) 
-> Query

Name of the tracking table.

-> Query

Type of stream identifiers.

-> Query

Type of event identifiers.

-> Query

Type of the state.

-> m (TrackingTable streamId eventId st) 

Create tracking table if it doesn't exist already.

A tracking table is a table used to track the last event processed by a projection for each stream in a stream family. It allows the projection to restart from where it left off.