{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Moto.PostgreSQL
( registryConf
, withRegistry
) where
import qualified Control.Exception.Safe as Ex
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import Data.Foldable (foldlM)
import qualified Data.List as List
import qualified Database.PostgreSQL.Simple as Pg
import qualified Database.PostgreSQL.Simple.FromRow as Pg
import qualified Database.PostgreSQL.Simple.FromField as Pg
import qualified Database.PostgreSQL.Simple.ToRow as Pg
import qualified Database.PostgreSQL.Simple.ToField as Pg
import qualified Database.PostgreSQL.Simple.Types as Pg
import qualified Moto
import qualified Moto.Registry as Moto
registryConf :: Moto.RegistryConf
registryConf = Moto.RegistryConf
{ Moto.registryConf_help =
"URI to the database where registry is stored. \
\See https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING"
, Moto.registryConf_parse = \s -> do
case any (flip List.isPrefixOf s) ["postgres://", "postgresql://"] of
False -> Left "Registry URI should start with 'postgresql://' \
\or 'postgres://'"
True -> Right (B8.pack s)
, Moto.registryConf_with = withRegistry
}
withRegistry
:: (MonadIO m, Ex.MonadMask m)
=> B.ByteString
-> (Moto.Registry -> m a)
-> m a
withRegistry cs k = Ex.bracket
(liftIO (Pg.connectPostgreSQL cs))
(liftIO . Pg.close)
(\conn -> k =<< liftIO (do
logs :: [LogV1] <- Pg.query_ conn
"SELECT pg_advisory_lock(589412153);\n\
\SET client_min_messages = error;\n\
\CREATE SCHEMA IF NOT EXISTS moto;\n\
\CREATE TABLE IF NOT EXISTS moto.registry\n\
\ ( ord bigserial NOT NULL\
\ , act text NOT NULL\
\ , t timestamptz NOT NULL\
\ , mid text NULL\
\ , dir text NULL );\
\SET client_min_messages = notice;\n\
\SELECT act, t, mid, dir FROM moto.registry ORDER BY ord ASC;"
state0 :: Moto.State <- either Ex.throwM pure $ do
foldlM Moto.updateState Moto.emptyState (map unLogV1 logs)
Moto.newAppendOnlyRegistry state0 $ \log_ -> do
1 <- Pg.execute conn
"INSERT INTO moto.registry (act,t,mid,dir) VALUES (?,?,?,?);"
(LogV1 log_)
pure ()))
newtype LogV1 = LogV1 { unLogV1 :: Moto.Log }
instance Pg.ToRow LogV1 where
toRow (LogV1 (Moto.Log_Prepare t mId d)) =
[ Pg.toField ("prepare" :: String)
, Pg.toField t
, Pg.toField (Moto.unMigId mId)
, Pg.toField (Moto.direction "backwards" "forwards" d :: String) ]
toRow (LogV1 (Moto.Log_Abort t)) =
[ Pg.toField ("abort" :: String)
, Pg.toField t
, Pg.toField Pg.Null
, Pg.toField Pg.Null ]
toRow (LogV1 (Moto.Log_Commit t)) =
[ Pg.toField ("commit" :: String)
, Pg.toField t
, Pg.toField Pg.Null
, Pg.toField Pg.Null ]
instance Pg.FromRow LogV1 where
fromRow = fmap LogV1 $ do
Pg.field >>= \case
"prepare" -> do
t <- Pg.field
mId <- fmap Moto.MigId Pg.field
d <- Pg.field >>= \case
"backwards" -> pure Moto.Backwards
"forwards" -> pure Moto.Forwards
s -> fail ("bad direction: " ++ show (s :: String))
pure (Moto.Log_Prepare t mId d)
"abort" -> do
t <- Pg.field
Pg.Null <- Pg.field
Pg.Null <- Pg.field
pure (Moto.Log_Abort t)
"commit" -> do
t <- Pg.field
Pg.Null <- Pg.field
Pg.Null <- Pg.field
pure (Moto.Log_Commit t)
s -> fail ("bad action: " ++ show (s :: String))