{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}
module Database.CQRS.PostgreSQL.TrackingTable
( TrackingTable
, createTrackingTable
, upsertTrackingTable
, doUpsertTrackingTable
) where
import Control.Applicative ((<|>))
import Control.Exception (catches)
import Control.Monad ((<=<), void)
import Control.Monad.Trans (MonadIO(..))
import Data.Proxy (Proxy(..))
import qualified Control.Monad.Except as Exc
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.FromField as PG.From
import qualified Database.PostgreSQL.Simple.ToField as PG.To
import qualified Database.PostgreSQL.Simple.ToRow as PG.To
import Database.CQRS.PostgreSQL.Internal (makeSqlAction, SqlAction, handleError)
import qualified Database.CQRS as CQRS
data TrackingTable streamId eventId st = TrackingTable
{ connectionPool :: forall r. (PG.Connection -> IO r) -> IO r
, relation :: PG.Query
}
instance
( Exc.MonadError CQRS.Error m
, MonadIO m
, PG.From.FromField eventId
, PG.From.FromField st
, PG.From.FromField streamId
, PG.To.ToField eventId
, PG.To.ToField st
, PG.To.ToField streamId
)
=> CQRS.TrackingTable m (TrackingTable streamId eventId st)
streamId eventId st where
getTrackedState
:: TrackingTable streamId eventId st
-> streamId
-> m (CQRS.TrackedState eventId st)
getTrackedState TrackingTable{..} streamId =
handlePgErrors . connectionPool $ \conn -> do
let query =
"SELECT event_id, failed_event_id, failed_message, state FROM "
<> relation <> " WHERE stream_id = ?"
rows <- PG.query conn query (PG.Only streamId)
pure $ case rows of
[(Just eventId, Nothing, Nothing, SomeState state)] ->
CQRS.SuccessAt eventId state
[(mEventId, Just failedAt, Just err, oState)] ->
CQRS.FailureAt
((,) <$> mEventId <*> fromOptionalState oState) failedAt err
_ -> CQRS.NeverRan
upsertError
:: TrackingTable streamId eventId st
-> streamId
-> eventId
-> String
-> m ()
upsertError trackingTable streamId eventId err =
doUpsertTrackingTable trackingTable streamId eventId (Left err)
upsertTrackingTable
:: ( PG.To.ToField streamId
, PG.To.ToField eventId
, PG.To.ToField st
)
=> TrackingTable streamId eventId st
-> streamId
-> eventId
-> Either String st
-> SqlAction
upsertTrackingTable TrackingTable{..} streamId eventId eState =
let (updates, updateValues, insertValues) =
case eState of
Right state ->
( "event_id = ?, failed_event_id = NULL,\
\ failed_message = NULL, state = ?"
, [PG.To.toField eventId, PG.To.toField state]
, PG.To.toRow (streamId, eventId, PG.Null, PG.Null, state)
)
Left err ->
( "failed_event_id = ?, failed_message = ?"
, PG.To.toRow (eventId, err)
, PG.To.toRow (streamId, PG.Null, eventId, err, PG.Null)
)
query =
"INSERT INTO " <> relation
<> " (stream_id, event_id, failed_event_id, failed_message, state)"
<> " VALUES (?, ?, ?, ?, ?) ON CONFLICT (stream_id) DO UPDATE SET "
<> updates
in
makeSqlAction query $ insertValues ++ updateValues
doUpsertTrackingTable
:: ( Exc.MonadError CQRS.Error m
, MonadIO m
, PG.To.ToField eventId
, PG.To.ToField streamId
, PG.To.ToField st
)
=> TrackingTable streamId eventId st
-> streamId
-> eventId
-> Either String st
-> m ()
doUpsertTrackingTable tt@TrackingTable{..} streamId eventId eState = do
handlePgErrors . connectionPool $ \conn -> do
let (query, values) = upsertTrackingTable tt streamId eventId eState
void $ PG.execute conn query values
createTrackingTable
:: ( Exc.MonadError CQRS.Error m
, MonadIO m
)
=> (forall r. (PG.Connection -> IO r) -> IO r)
-> PG.Query
-> PG.Query
-> PG.Query
-> PG.Query
-> m (TrackingTable streamId eventId st)
createTrackingTable
connectionPool relation streamIdType eventIdType stateType = do
handlePgErrors . connectionPool $ \conn -> do
let query =
"CREATE TABLE IF NOT EXISTS " <> relation
<> " ( stream_id " <> streamIdType <> " PRIMARY KEY"
<> " , event_id " <> eventIdType
<> " , failed_event_id " <> eventIdType
<> " , failed_message varchar"
<> " , state " <> stateType <> ")"
void $ PG.execute_ conn query
pure TrackingTable{..}
handlePgErrors
:: ( Exc.MonadError CQRS.Error m
, MonadIO m
)
=> IO a -> m a
handlePgErrors f =
Exc.liftEither <=< liftIO $ do
(Right <$> f)
`catches`
[ handleError (Proxy @PG.FormatError) CQRS.TrackingTableError
, handleError (Proxy @PG.SqlError) CQRS.TrackingTableError
, handleError (Proxy @PG.QueryError) CQRS.TrackingTableError
, handleError (Proxy @PG.ResultError) CQRS.TrackingTableError
]
data OptionalState st = SomeState st | NoState
instance PG.From.FromField st => PG.From.FromField (OptionalState st) where
fromField f mBS =
case mBS of
Nothing -> (SomeState <$> PG.From.fromField f mBS) <|> pure NoState
Just _ -> SomeState <$> PG.From.fromField f mBS
fromOptionalState :: OptionalState a -> Maybe a
fromOptionalState = \case
SomeState x -> Just x
NoState -> Nothing