{-# LANGUAGE Strict #-}

{-|
Module      : Database.PostgreSQL.Replicant
Description : A PostgreSQL streaming replication library
Copyright   : (c) James King, 2020, 2021
License     : BSD3
Maintainer  : james@agentultra.com
Stability   : experimental
Portability : POSIX

Connect to a PostgreSQL server as a logical replication client and
receive changes.

The basic API is this:

@
  withLogicalStream defaultSettings $ \change -> do
    print change
    `catch` \err -> do
      show err
@

This is a low-level library meant to give library authors the
primitives necessary to add streaming replication support.  The API
here is rather simplistic but should be hooked up to something like
conduit to provide better ergonomics.
-}

module Database.PostgreSQL.Replicant
    (
    -- * Types
      Change (..)
    , Column (..)
    , Delete (..)
    , Insert (..)
    , Message (..)
    , PgSettings (..)
    , ReplicantConnection
    , Update (..)
    , WalLogData (..)
    -- * Connection Handling
    , connect
    , getConnection
    , unsafeCreateConnection
    -- * Functions
    , withLogicalStream
    ) where

import Control.Concurrent
import Control.Exception
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.Text as T
import qualified Data.Text.Read as T
import Database.PostgreSQL.LibPQ hiding (Column)

import Database.PostgreSQL.Replicant.Connection
import Database.PostgreSQL.Replicant.Exception
import Database.PostgreSQL.Replicant.Protocol
import Database.PostgreSQL.Replicant.Message
import Database.PostgreSQL.Replicant.ReplicationSlot
import Database.PostgreSQL.Replicant.Settings
import Database.PostgreSQL.Replicant.Types.Lsn
import Database.PostgreSQL.Replicant.Util

-- | Connect to a PostgreSQL database as a user with the replication
-- attribute and start receiving changes using the logical replication
-- protocol.  Logical replication happens at the query level so the
-- changes you get represent the set of queries in a transaction:
-- /insert/, /update/, and /delete/.
--
-- This function will create the replication slot, if it doesn't
-- exist, or reconnect to it otherwise and restart the stream from
-- where the replication slot left off.
--
-- The slot name is taken from 'pgSlotName' and the update delay from
-- 'pgUpdateDelay'.  If 'pgUpdateDelay' does not start with a decimal
-- number, a delay of @3000@ is used instead.
--
-- The supplied callback is passed, unchanged, to
-- 'startReplicationStream' together with the slot's name and restart
-- position.
--
-- This function can throw exceptions in @IO@ and shut down the
-- socket in case of any error.  In particular a 'ReplicantException'
-- is raised via 'maybeThrow' when 'identifySystemSync' yields
-- 'Nothing'.
withLogicalStream :: PgSettings -> (Change -> IO LSN) -> IO ()
withLogicalStream settings cb = do
  conn <- connect settings
  let updateFreq = getUpdateDelay settings
  maybeInfo <- identifySystemSync conn
  -- The system information itself is not used below; this step only
  -- checks that it could be obtained before a slot is set up.
  _ <- maybeThrow
         (ReplicantException "withLogicalStream: could not get system information")
         maybeInfo
  repSlot <- setupReplicationSlot conn . B.pack $ pgSlotName settings
  -- 'startReplicationStream' already has result type @IO ()@, so it is
  -- the final statement of the block.
  startReplicationStream conn (slotName repSlot) (slotRestart repSlot) updateFreq cb
  where
    -- Read the update delay from the settings.  The field accessor is
    -- used instead of a record wildcard pattern so this helper does not
    -- rely on the RecordWildCards extension.  'T.decimal' reads a
    -- leading run of digits and ignores whatever follows; when there
    -- are no leading digits the fallback of 3000 applies.
    getUpdateDelay :: PgSettings -> Int
    getUpdateDelay s =
      case T.decimal (T.pack (pgUpdateDelay s)) of
        Left _ -> 3000
        Right (i, _) -> i