{-# LANGUAGE Strict #-}
module Database.PostgreSQL.Replicant
(
Change (..)
, Column (..)
, Delete (..)
, Insert (..)
, Message (..)
, PgSettings (..)
, ReplicantConnection
, Update (..)
, WalLogData (..)
, connect
, getConnection
, unsafeCreateConnection
, withLogicalStream
) where
import Control.Concurrent
import Control.Exception
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.Text as T
import qualified Data.Text.Read as T
import Database.PostgreSQL.LibPQ hiding (Column)
import Database.PostgreSQL.Replicant.Connection
import Database.PostgreSQL.Replicant.Exception
import Database.PostgreSQL.Replicant.Protocol
import Database.PostgreSQL.Replicant.Message
import Database.PostgreSQL.Replicant.ReplicationSlot
import Database.PostgreSQL.Replicant.Settings
import Database.PostgreSQL.Replicant.Types.Lsn
import Database.PostgreSQL.Replicant.Util
-- | Connect to PostgreSQL using the given settings and hand the
-- resulting logical replication stream to a callback.
--
-- The steps, in order:
--
-- 1. Open a connection with 'connect'.
-- 2. Run 'identifySystemSync'; if it yields 'Nothing' a
--    'ReplicantException' is thrown.
-- 3. Set up the replication slot named by 'pgSlotName'.
-- 4. Call 'startReplicationStream' with the slot's name, the slot's
--    restart position, the update delay and the callback.
--
-- The callback receives each 'Change' and returns an 'LSN'.
-- NOTE(review): presumably the returned LSN is the position the
-- consumer has processed up to -- confirm against
-- 'startReplicationStream'.
--
-- NOTE(review): nothing in this function releases the connection
-- obtained from 'connect'; confirm whether 'startReplicationStream'
-- takes care of that.
withLogicalStream :: PgSettings -> (Change -> IO LSN) -> IO ()
withLogicalStream settings cb = do
  conn <- connect settings
  let updateFreq = getUpdateDelay settings
  maybeInfo <- identifySystemSync conn
  -- Only the presence of the system information is checked; the value
  -- itself is not used afterwards.
  _ <- maybeThrow
    (ReplicantException "withLogicalStream: could not get system information")
    maybeInfo
  repSlot <- setupReplicationSlot conn (B.pack (pgSlotName settings))
  startReplicationStream
    conn
    (slotName repSlot)
    (slotRestart repSlot)
    updateFreq
    cb
  where
    -- Read the update delay from the settings, falling back to 3000
    -- when the configured string does not start with a decimal number.
    -- Any text following the leading digits is ignored.
    -- NOTE(review): the unit is not visible here (presumably
    -- milliseconds) -- confirm against 'startReplicationStream'.
    getUpdateDelay :: PgSettings -> Int
    getUpdateDelay cfg =
      case T.decimal (T.pack (pgUpdateDelay cfg)) of
        Left _ -> 3000
        Right (delay, _) -> delay