{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
module Database.CQRS.PostgreSQL.Stream
( Stream
, makeStream
, makeStream'
) where
import Control.Exception (catches)
import Control.Monad (when)
import Control.Monad.Trans (MonadIO(..))
import Data.Functor ((<&>))
import Data.List (intersperse)
import Data.Maybe (catMaybes, listToMaybe)
import Data.Proxy (Proxy(..))
import Data.String (fromString)
import Database.PostgreSQL.Simple ((:.)(..))
import qualified Control.Monad.Except as Exc
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.FromField as PG.From
import qualified Database.PostgreSQL.Simple.FromRow as PG.From
import qualified Database.PostgreSQL.Simple.ToField as PG.To
import qualified Database.PostgreSQL.Simple.ToRow as PG.To
import qualified Pipes
import Database.CQRS.PostgreSQL.Internal (SomeParams(..), handleError)
import qualified Database.CQRS as CQRS
data Stream identifier metadata event =
forall r r'. (PG.To.ToRow r, PG.To.ToRow r') => Stream
{ connectionPool :: forall a. (PG.Connection -> IO a) -> IO a
, selectQuery :: (PG.Query, r)
, insertQuery
:: forall encEvent.
(PG.To.ToField encEvent, PG.To.ToRow metadata)
=> encEvent -> metadata -> CQRS.ConsistencyCheck identifier
-> (PG.Query, r')
, identifierColumn :: PG.Query
}
makeStream
:: forall identifier metadata event.
( CQRS.WritableEvent event
, PG.To.ToField (CQRS.EncodingFormat event)
, PG.To.ToField identifier
, PG.To.ToRow metadata
)
=> (forall a. (PG.Connection -> IO a) -> IO a)
-> PG.Query
-> PG.Query
-> [PG.Query]
-> PG.Query
-> Stream identifier metadata event
makeStream connectionPool relation
identifierColumn metadataColumns eventColumn =
let selectQuery = (selectQuery', ())
in Stream{..}
where
selectQuery' :: PG.Query
selectQuery' =
"SELECT "
<> identifierColumn <> ", "
<> metadataList <> ", "
<> eventColumn
<> " FROM " <> relation
<> " ORDER BY " <> identifierColumn <> " ASC"
insertQuery
:: (PG.To.ToField encEvent, PG.To.ToRow metadata)
=> encEvent -> metadata -> CQRS.ConsistencyCheck identifier
-> (PG.Query, SomeParams)
insertQuery encEvent metadata cc =
let baseParams = metadata :. PG.Only encEvent
(cond, params) = case cc of
CQRS.NoConsistencyCheck -> ("", SomeParams baseParams)
CQRS.CheckNoEvents ->
( " WHERE NOT EXISTS (SELECT 1 FROM " <> relation <> ")"
, SomeParams baseParams
)
CQRS.CheckLastEvent identifier ->
( " WHERE NOT EXISTS (SELECT 1 FROM " <> relation <> " WHERE "
<> identifierColumn <> " > ?)"
, SomeParams (baseParams :. PG.Only identifier)
)
query =
"INSERT INTO " <> relation <> "("
<> metadataList <> ", " <> eventColumn
<> ") SELECT " <> metadataMarks <> ", ?"
<> cond
<> " RETURNING " <> identifierColumn
in
(query, params)
metadataList :: PG.Query
metadataList =
mconcat . intersperse "," $ metadataColumns
metadataMarks :: PG.Query
metadataMarks =
mconcat . intersperse "," . map (const "?") $ metadataColumns
makeStream'
:: (PG.To.ToRow r, PG.To.ToRow r')
=> (forall a. (PG.Connection -> IO a) -> IO a)
-> (PG.Query, r)
-> (forall encEvent. (PG.To.ToField encEvent, PG.To.ToRow metadata)
=> encEvent -> metadata -> CQRS.ConsistencyCheck identifier
-> (PG.Query, r'))
-> PG.Query
-> Stream identifier metadata event
makeStream' connectionPool selectQuery insertQuery identifierColumn =
Stream{..}
instance
( CQRS.Event event
, Exc.MonadError CQRS.Error m
, MonadIO m
, Ord identifier
, PG.From.FromField identifier
, PG.To.ToField identifier
, PG.From.FromRow metadata
, PG.From.FromField (CQRS.EncodingFormat event)
) => CQRS.Stream m (Stream identifier metadata event) where
type EventType (Stream identifier metadata event) = event
type EventIdentifier (Stream identifier metadata event) = identifier
type EventMetadata (Stream identifier metadata event) = metadata
streamEvents = streamStreamEvents
instance
( CQRS.WritableEvent event
, Exc.MonadError CQRS.Error m
, MonadIO m
, Ord identifier
, PG.From.FromField identifier
, PG.To.ToField identifier
, PG.From.FromField (CQRS.EncodingFormat event)
, PG.To.ToField (CQRS.EncodingFormat event)
, PG.From.FromRow metadata
, PG.To.ToRow metadata
) => CQRS.WritableStream m (Stream identifier metadata event) where
writeEventWithMetadata = streamWriteEventWithMetadata
streamWriteEventWithMetadata
:: ( CQRS.WritableEvent event
, Exc.MonadError CQRS.Error m
, MonadIO m
, PG.From.FromField identifier
, PG.From.FromField (CQRS.EncodingFormat event)
, PG.To.ToField (CQRS.EncodingFormat event)
, PG.To.ToRow metadata
)
=> Stream identifier metadata event
-> event
-> metadata
-> CQRS.ConsistencyCheck identifier
-> m identifier
streamWriteEventWithMetadata Stream{..} event metadata cc = do
eIds <- liftIO . connectionPool $ \conn -> do
let (req, params) = insertQuery (CQRS.encodeEvent event) metadata cc
(Right <$> PG.query conn req params)
`catches`
[ handleError (Proxy @PG.FormatError) CQRS.EventWriteError
, handleError (Proxy @PG.QueryError) CQRS.EventWriteError
, handleError (Proxy @PG.ResultError) CQRS.EventWriteError
, handleError (Proxy @PG.SqlError) CQRS.EventWriteError
]
case eIds of
Left err -> Exc.throwError err
Right [PG.Only identifier] -> pure identifier
Right [] -> Exc.throwError $ CQRS.ConsistencyCheckError "no events inserted"
Right ids -> Exc.throwError $ CQRS.EventWriteError $
show (length ids) ++ " events were inserted"
streamStreamEvents
:: forall identifier metadata event m.
( CQRS.Event event
, Exc.MonadError CQRS.Error m
, MonadIO m
, Ord identifier
, PG.From.FromField identifier
, PG.To.ToField identifier
, PG.From.FromRow metadata
, PG.From.FromField (CQRS.EncodingFormat event)
)
=> Stream identifier metadata event
-> CQRS.StreamBounds identifier
-> Pipes.Producer
[ Either
(identifier, String) (CQRS.EventWithContext identifier metadata event)
] m ()
streamStreamEvents Stream{..} bounds =
go Nothing
where
go
:: Maybe identifier
-> Pipes.Producer
[ Either
(identifier, String)
(CQRS.EventWithContext identifier metadata event)
] m ()
go lastFetchedIdentifier = do
eRows <- liftIO . connectionPool $ \conn ->
(Right <$> fetchBatch conn lastFetchedIdentifier)
`catches`
[ handleError (Proxy @PG.FormatError) CQRS.EventRetrievalError
, handleError (Proxy @PG.QueryError) CQRS.EventRetrievalError
, handleError (Proxy @PG.ResultError) CQRS.EventRetrievalError
, handleError (Proxy @PG.SqlError) CQRS.EventRetrievalError
]
rows <- either Exc.throwError pure eRows
Pipes.yield $
map
(\(PG.Only identifier :. metadata :. PG.Only encEvent) ->
case CQRS.decodeEvent encEvent of
Left err -> Left (identifier, err)
Right event ->
Right $ CQRS.EventWithContext identifier metadata event
) rows
let mLfi =
fmap (\(PG.Only identifier :. _) -> identifier)
. listToMaybe
. reverse
$ rows
when (length rows == batchSize) $ go mLfi
fetchBatch
:: PG.Connection
-> Maybe identifier
-> IO [ PG.Only identifier
:. metadata
:. PG.Only (CQRS.EncodingFormat event) ]
fetchBatch conn lastFetchedIdentifier = do
let bounds' =
bounds <> maybe mempty CQRS.afterEvent lastFetchedIdentifier
conditions = catMaybes
[ CQRS._afterEvent bounds' <&> \i ->
(identifierColumn <> " > ?", i)
, CQRS._untilEvent bounds' <&> \i ->
(identifierColumn <> " <= ?", i)
]
whereClause
| null conditions = mempty
| otherwise =
(" WHERE " <>)
. mconcat
. intersperse " AND "
. map fst
$ conditions
params = snd selectQuery :. map snd conditions
query =
"SELECT * FROM (" <> fst selectQuery <> ") AS _"
<> whereClause
<> " LIMIT "
<> fromString (show batchSize)
PG.query conn query params
batchSize :: Int
batchSize = 100