{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module Database.CQRS.PostgreSQL.Projection
( Projection
, executeSqlActions
, executeCustomActions
, fromTabularDataActions
) where
import Control.Exception
import Control.Monad ((<=<), forever, forM_)
import Control.Monad.Trans (MonadIO(..), lift)
import Data.List (intersperse)
import Data.Proxy (Proxy(..))
import Data.String (fromString)
import Database.PostgreSQL.Simple ((:.)(..))
import qualified Control.Monad.Except as Exc
import qualified Control.Monad.Identity as Id
import qualified Control.Monad.State.Strict as St
import qualified Data.Bifunctor as Bifunctor
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.ToField as PG.To
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Pipes
import Database.CQRS.PostgreSQL.Internal
import Database.CQRS.PostgreSQL.TrackingTable
import qualified Database.CQRS as CQRS
import qualified Database.CQRS.TabularData as CQRS.Tab
type Projection event st = CQRS.Projection event st SqlAction
executeSqlActions
:: forall streamId eventId action m st.
( Exc.MonadError CQRS.Error m
, MonadIO m
, PG.To.ToField eventId
, PG.To.ToField streamId
, PG.To.ToField st
)
=> ([action] -> [SqlAction])
-> (forall r. (PG.Connection -> IO r) -> IO r)
-> TrackingTable streamId eventId st
-> Pipes.Consumer (st, [action], streamId, eventId) m ()
executeSqlActions transform connectionPool trackingTable =
forever $ do
(st, actions, streamId, eventId) <- Pipes.await
let sqlActions = transform actions
(query, values) =
appendSqlActions
[ ("BEGIN", [])
, appendSqlActions sqlActions
, upsertTrackingTable
trackingTable streamId eventId (Right st)
, ("COMMIT", [])
]
Exc.liftEither <=< liftIO . connectionPool $ \conn -> do
eRes <-
(Right <$> PG.execute conn query values)
`catches`
[ handleError (Proxy @PG.FormatError) id
, handleError (Proxy @PG.SqlError) id
]
case eRes of
Left err -> do
let (uquery, uvalues) =
upsertTrackingTable
trackingTable streamId eventId
(Left err :: Either String st)
(const (Right ()) <$> PG.execute conn uquery uvalues)
`catches`
[ handleError (Proxy @PG.FormatError) CQRS.ProjectionError
, handleError (Proxy @PG.SqlError) CQRS.ProjectionError
]
Right _ -> pure $ Right ()
executeCustomActions
:: forall streamId eventId action m st.
( Exc.MonadError CQRS.Error m
, MonadIO m
, PG.To.ToField eventId
, PG.To.ToField streamId
, PG.To.ToField st
)
=> (action -> m (Either String (m ())))
-> TrackingTable streamId eventId st
-> Pipes.Consumer (st, [action], streamId, eventId) m ()
executeCustomActions runAction trackingTable =
forever $ do
(st, actions, streamId, eventId) <- Pipes.await
(eRes, rollbackActions) <- lift . flip St.runStateT [] . Exc.runExceptT $
forM_ actions $ \action -> do
errOrRollback <- lift . lift . runAction $ action
case errOrRollback of
Left err -> Exc.throwError err
Right rollbackAction -> St.modify' (rollbackAction :)
lift $ case eRes of
Left err -> do
doUpsertTrackingTable trackingTable streamId eventId
(Left err :: Either String st)
sequence_ rollbackActions
Right () ->
doUpsertTrackingTable trackingTable streamId eventId (Right st)
fromTabularDataActions
:: FromTabularDataAction cols
=> PG.Query
-> [CQRS.Tab.TabularDataAction cols]
-> [SqlAction]
fromTabularDataActions = map . fromTabularDataAction
class FromTabularDataAction cols where
fromTabularDataAction
:: PG.Query -> CQRS.Tab.TabularDataAction cols -> SqlAction
instance
forall keyCols cols.
( CQRS.Tab.AllColumns
PG.To.ToField (CQRS.Tab.Flatten ('CQRS.Tab.WithUniqueKey keyCols cols))
, CQRS.Tab.AllColumns PG.To.ToField keyCols
, CQRS.Tab.AllColumns PG.To.ToField cols
, CQRS.Tab.MergeSplitTuple keyCols cols
)
=> FromTabularDataAction ('CQRS.Tab.WithUniqueKey keyCols cols) where
fromTabularDataAction relation = \case
CQRS.Tab.Insert tuple ->
makeInsertSqlAction @('CQRS.Tab.WithUniqueKey keyCols cols) relation tuple
CQRS.Tab.Update updates conds ->
makeUpdateSqlAction
@('CQRS.Tab.WithUniqueKey keyCols cols)
relation updates conds
CQRS.Tab.Upsert tuple -> makeUpsertSqlAction @keyCols @cols relation tuple
CQRS.Tab.Delete conds ->
makeDeleteSqlAction
@('CQRS.Tab.WithUniqueKey keyCols cols)
relation conds
instance
forall cols.
CQRS.Tab.AllColumns PG.To.ToField cols
=> FromTabularDataAction ('CQRS.Tab.Flat cols) where
fromTabularDataAction relation = \case
CQRS.Tab.Insert tuple ->
makeInsertSqlAction @('CQRS.Tab.Flat cols) relation tuple
CQRS.Tab.Update updates conds ->
makeUpdateSqlAction @('CQRS.Tab.Flat cols) relation updates conds
CQRS.Tab.Delete conds ->
makeDeleteSqlAction @('CQRS.Tab.Flat cols) relation conds
makeInsertSqlAction
:: forall (cols :: CQRS.Tab.Columns).
CQRS.Tab.AllColumns PG.To.ToField (CQRS.Tab.Flatten cols)
=> PG.Query -> CQRS.Tab.Tuple Id.Identity cols -> SqlAction
makeInsertSqlAction relation tuple =
let (identifiers, values) =
unzip
. CQRS.Tab.toList @PG.To.ToField
(\name (Id.Identity x) ->
(fromString @PG.Identifier name, PG.To.toField x))
$ tuple
questionMarks =
"(" <> mconcat (intersperse "," (map (const "?") values)) <> ")"
query =
"INSERT INTO " <> relation <> questionMarks
<> " VALUES " <> questionMarks
in
makeSqlAction query (identifiers :. values)
makeUpdateSqlAction
:: forall (cols :: CQRS.Tab.Columns).
CQRS.Tab.AllColumns PG.To.ToField (CQRS.Tab.Flatten cols)
=> PG.Query
-> CQRS.Tab.Tuple CQRS.Tab.Update cols
-> CQRS.Tab.Tuple CQRS.Tab.Conditions cols
-> SqlAction
makeUpdateSqlAction relation updates conds =
let (updatesQuery, updatesValues) =
Bifunctor.bimap (mconcat . intersperse ",") mconcat
. unzip
. CQRS.Tab.toList @PG.To.ToField
(\name update -> case update of
CQRS.Tab.NoUpdate -> ("", [])
CQRS.Tab.Set x ->
( "? = ?"
, [ PG.To.toField (fromString @PG.Identifier name)
, PG.To.toField x
]
)
CQRS.Tab.Plus x ->
( "? = ? + ?"
, [ PG.To.toField (fromString @PG.Identifier name)
, PG.To.toField (fromString @PG.Identifier name)
, PG.To.toField x
]
)
CQRS.Tab.Minus x ->
( "? = ? - ?"
, [ PG.To.toField (fromString @PG.Identifier name)
, PG.To.toField (fromString @PG.Identifier name)
, PG.To.toField x
]
)
)
$ updates
(whereStmtQuery, whereStmtValues) = makeWhereStatement @cols conds
values = updatesValues ++ whereStmtValues
query =
"UPDATE " <> relation <> " SET " <> updatesQuery <> whereStmtQuery
in
(query, values)
makeUpsertSqlAction
:: forall keyCols cols.
( CQRS.Tab.AllColumns PG.To.ToField keyCols
, CQRS.Tab.AllColumns PG.To.ToField cols
, CQRS.Tab.MergeSplitTuple keyCols cols
)
=> PG.Query
-> CQRS.Tab.Tuple Id.Identity ('CQRS.Tab.WithUniqueKey keyCols cols)
-> SqlAction
makeUpsertSqlAction relation tuple =
let toSqlValues
:: forall flatCols. CQRS.Tab.AllColumns PG.To.ToField flatCols
=> CQRS.Tab.FlatTuple Id.Identity flatCols
-> [(PG.Identifier, PG.To.Action)]
toSqlValues =
CQRS.Tab.toList @PG.To.ToField
(\name (Id.Identity x) ->
(fromString @PG.Identifier name, PG.To.toField x))
(keyTuple, otherTuple) = CQRS.Tab.splitTuple @keyCols @cols tuple
keyPairs = toSqlValues keyTuple
(keyIdentifiers, keyValues) = unzip keyPairs
otherPairs = toSqlValues otherTuple
(otherIdentifiers, otherValues) = unzip otherPairs
mkQuestionMarks xs =
mconcat (intersperse "," (map (const "?") xs))
keyQuestionMarks = mkQuestionMarks keyValues
rowQuestionMarks =
"(" <> mkQuestionMarks (keyValues ++ otherValues) <> ")"
mkValues =
foldr (\(name, value) acc -> PG.To.toField name : value : acc) []
updateSetValues = mkValues otherPairs
updateWhereValues = mkValues keyPairs
query =
"INSERT INTO " <> relation <> rowQuestionMarks
<> " VALUES " <> rowQuestionMarks
<> " ON CONFLICT " <> keyQuestionMarks <> " DO UPDATE SET "
<> mconcat
(intersperse ", " (map (const "? = ?") otherIdentifiers))
<> " WHERE "
<> mconcat
(intersperse " AND " (map (const "? = ?") keyIdentifiers))
in
makeSqlAction query $
keyIdentifiers :. otherIdentifiers :. keyValues :. otherValues
:. keyIdentifiers :. updateSetValues :. updateWhereValues
makeDeleteSqlAction
:: forall (cols :: CQRS.Tab.Columns).
CQRS.Tab.AllColumns PG.To.ToField (CQRS.Tab.Flatten cols)
=> PG.Query
-> CQRS.Tab.Tuple CQRS.Tab.Conditions cols
-> SqlAction
makeDeleteSqlAction relation conds =
let (whereStmtQuery, whereStmtValues) =
makeWhereStatement @cols conds
query = "DELETE FROM " <> relation <> whereStmtQuery
in
(query, whereStmtValues)
makeWhereStatement
:: forall (cols :: CQRS.Tab.Columns).
CQRS.Tab.AllColumns PG.To.ToField (CQRS.Tab.Flatten cols)
=> CQRS.Tab.Tuple CQRS.Tab.Conditions cols
-> (PG.Query, [PG.To.Action])
makeWhereStatement =
Bifunctor.bimap
(\cs -> if null cs
then ""
else mconcat (" WHERE " : intersperse " AND " cs))
mconcat
. unzip
. mconcat
. CQRS.Tab.toList @PG.To.ToField @(CQRS.Tab.Flatten cols)
(\name value ->
map
(makeCond (PG.To.toField (fromString @PG.Identifier name)))
(CQRS.Tab.getConditions value))
makeCond
:: PG.To.ToField a
=> PG.To.Action -> CQRS.Tab.Condition a -> (PG.Query, [PG.To.Action])
makeCond name = \case
CQRS.Tab.Equal x -> ("? = ?", [name, PG.To.toField x])
CQRS.Tab.NotEqual x -> ("? <> ?", [name, PG.To.toField x])
CQRS.Tab.LowerThan x -> ("? < ?", [name, PG.To.toField x])
CQRS.Tab.LowerThanOrEqual x -> ("? <= ?", [name, PG.To.toField x])
CQRS.Tab.GreaterThan x -> ("? > ?", [name, PG.To.toField x])
CQRS.Tab.GreaterThanOrEqual x -> ("? >= ?", [name, PG.To.toField x])