{-# LANGUAGE LambdaCase      #-}
{-# LANGUAGE RecordWildCards #-}

module PostgREST.Workers
  ( connectionWorker
  , reReadConfig
  , listener
  ) where

import qualified Data.Aeson                 as JSON
import qualified Data.ByteString            as BS
import qualified Data.ByteString.Lazy       as LBS
import qualified Data.Text.Encoding         as T
import qualified Hasql.Notifications        as SQL
import qualified Hasql.Pool                 as SQL
import qualified Hasql.Transaction.Sessions as SQL

import Control.Retry    (RetryStatus, capDelay, exponentialBackoff,
                         retrying, rsPreviousDelay)
import Hasql.Connection (acquire)

import PostgREST.AppState         (AppState)
import PostgREST.Config           (AppConfig (..), readAppConfig)
import PostgREST.Config.Database  (queryDbSettings, queryPgVersion)
import PostgREST.Config.PgVersion (PgVersion (..), minimumPgVersion)
import PostgREST.DbStructure      (queryDbStructure)
import PostgREST.Error            (PgError (PgError), checkIsFatal,
                                   errorPayload)

import qualified PostgREST.AppState as AppState

import Protolude


-- | Current database connection status, as reported by 'connectionStatus'.
data ConnectionStatus
  = NotConnected
    -- ^ The connection attempt failed with a non-fatal error; this is the
    -- only status on which 'connectionStatus' retries.
  | Connected PgVersion
    -- ^ The connection works and the server reported this version.
  | FatalConnectionError Text
    -- ^ The connection cannot be used; carries the reason to be logged.
  deriving (Eq)

-- | Outcome of one attempt to load the schema cache, as returned by
-- 'loadSchemaCache'.
data SCacheStatus
  = -- | The schema cache was loaded and stored in the application state.
    SCLoaded
  | -- | Loading failed with a non-fatal error; 'connectionWorker' starts over.
    SCOnRetry
  | -- | Loading failed with a fatal error; 'connectionWorker' kills the main
    -- thread.
    SCFatalFail

-- | Obtain a healthy connection to pg and an up-to-date schema cache
-- (DbStructure).
--
-- This function is meant to be called multiple times by the same thread, but
-- it does nothing if the previous invocation has not terminated. In all cases
-- it does not block the calling thread: the work is performed in a separate
-- thread.
--
-- The background thread does the following:
--
--  1. Tries to connect to the pg server and keeps trying until it either
--     succeeds or hits a fatal error.
--  2. If the connection error is fatal (this includes an unsupported pg
--     version), it logs the reason and kills the main thread.
--  3. Obtains the dbStructure. If this fails with a non-fatal error, it goes
--     back to 1; if it fails with a fatal error, it kills the main thread.
connectionWorker :: AppState -> IO ()
connectionWorker appState = do
  isWorkerOn <- AppState.getIsWorkerOn appState
  -- Prevents multiple workers from running at the same time. Could happen on
  -- too many SIGUSR1s.
  -- NOTE(review): the check and the set below are two separate operations;
  -- confirm that callers cannot race between them.
  unless isWorkerOn $ do
    AppState.putIsWorkerOn appState True
    void $ forkIO work
  where
    work :: IO ()
    work = do
      AppConfig{..} <- AppState.getConfig appState
      AppState.logWithZTime appState "Attempting to connect to the database..."
      connected <- connectionStatus appState
      case connected of
        FatalConnectionError reason ->
          -- Fatal error when connecting: log why and stop the main thread.
          AppState.logWithZTime appState reason
            >> killThread (AppState.getMainThreadId appState)
        NotConnected ->
          -- Unreachable because connectionStatus will keep trying to connect
          return ()
        Connected actualPgVersion -> do
          -- Proceed with initialization
          AppState.putPgVersion appState actualPgVersion
          -- Wake up the listener, which waits for this signal before it
          -- opens its own connection.
          when configDbChannelEnabled $
            AppState.signalListener appState
          AppState.logWithZTime appState "Connection successful"
          -- This could fail because the connection drops, but
          -- loadSchemaCache will pick up the error and retry again.
          when configDbConfig $ reReadConfig False appState
          scStatus <- loadSchemaCache appState
          case scStatus of
            SCLoaded ->
              -- do nothing and proceed if the load was successful
              return ()
            SCOnRetry ->
              -- non-fatal failure: start over from the connection attempt
              work
            SCFatalFail ->
              -- die if our schema cache query has an error
              killThread $ AppState.getMainThreadId appState
          -- Only reached on the 'Connected' path: allow a later invocation
          -- to start a new worker.
          AppState.putIsWorkerOn appState False

-- | Check if a connection from the pool allows access to the PostgreSQL
-- database.  Before every attempt the pool connections are released and a new
-- connection is tried.  Releasing the pool is key for rapid recovery.
-- Otherwise, the pool timeout would have to be reached for new healthy
-- connections to be acquired, which might not happen if the server is busy
-- with requests: no idle connection, no pool timeout.
--
-- The attempt is repeated for as long as the result is 'NotConnected'.  The
-- delay between attempts follows an exponential backoff that starts at
-- 1 second and is capped at 32 seconds.  No error is thrown on failure; the
-- outcome is always reported through the returned 'ConnectionStatus'.
connectionStatus :: AppState -> IO ConnectionStatus
connectionStatus appState =
  retrying retrySettings shouldRetry $
    const $ SQL.release pool >> getConnectionStatus
  where
    pool = AppState.getPool appState
    retrySettings = capDelay delayMicroseconds $ exponentialBackoff backoffMicroseconds

    delayMicroseconds :: Int
    delayMicroseconds = 32000000 -- 32 seconds

    backoffMicroseconds :: Int
    backoffMicroseconds = 1000000 -- 1 second

    -- Run the version query once and classify the outcome.
    getConnectionStatus :: IO ConnectionStatus
    getConnectionStatus = do
      pgVersion <- SQL.use pool queryPgVersion
      case pgVersion of
        Left e -> do
          let err = PgError False e
          AppState.logWithZTime appState . T.decodeUtf8 . LBS.toStrict $ errorPayload err
          case checkIsFatal err of
            Just reason ->
              return $ FatalConnectionError reason
            Nothing ->
              return NotConnected
        Right version ->
          if version < minimumPgVersion then
            -- An unsupported server version is reported as a fatal error.
            return . FatalConnectionError $
              "Cannot run in this PostgreSQL version, PostgREST needs at least "
              <> pgvName minimumPgVersion
          else
            return $ Connected version

    -- Retry only while the status is 'NotConnected'; when retrying, log the
    -- delay and store it in the application state.
    shouldRetry :: RetryStatus -> ConnectionStatus -> IO Bool
    shouldRetry rs isConnSucc = do
      let
        -- Previous delay converted from microseconds to whole seconds
        -- (0 when there was no previous delay).
        delay    = fromMaybe 0 (rsPreviousDelay rs) `div` backoffMicroseconds
        itShould = NotConnected == isConnSucc
      when itShould $ do
        AppState.logWithZTime appState $
          "Attempting to reconnect to the database in "
          <> (show delay :: Text)
          <> " seconds..."
        AppState.putRetryNextIn appState delay
      return itShould

-- | Load the DbStructure by using a connection from the pool.
--
-- On success the structure is stored in the application state (plus its JSON
-- encoding when a root spec is configured) and 'SCLoaded' is returned.  On
-- failure the error is logged and the result is 'SCFatalFail' when
-- 'checkIsFatal' reports a hint for the error, 'SCOnRetry' otherwise.
loadSchemaCache :: AppState -> IO SCacheStatus
loadSchemaCache appState = do
  AppConfig{..} <- AppState.getConfig appState
  actualPgVersion <- AppState.getPgVersion appState
  result <-
    -- Honour the prepared-statements setting for the schema query as well.
    let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
    SQL.use (AppState.getPool appState) . transaction SQL.ReadCommitted SQL.Read $
      queryDbStructure (toList configDbSchemas) configDbExtraSearchPath actualPgVersion configDbPreparedStatements
  case result of
    Left e -> do
      let
        err = PgError False e
        -- Logs the error payload; shared by both failure branches.
        putErr = AppState.logWithZTime appState . T.decodeUtf8 . LBS.toStrict $ errorPayload err
      case checkIsFatal err of
        Just hint -> do
          AppState.logWithZTime appState "A fatal error ocurred when loading the schema cache"
          putErr
          AppState.logWithZTime appState hint
          return SCFatalFail
        Nothing -> do
          AppState.logWithZTime appState "An error ocurred when loading the schema cache"
          putErr
          return SCOnRetry

    Right dbStructure -> do
      AppState.putDbStructure appState dbStructure
      -- The JSON form is only stored when a root spec is configured.
      when (isJust configDbRootSpec) .
        AppState.putJsonDbS appState . LBS.toStrict $ JSON.encode dbStructure
      AppState.logWithZTime appState "Schema cache loaded"
      return SCLoaded

-- | Starts a dedicated pg connection to LISTEN for notifications on the
-- configured db channel.
--
-- A NOTIFY with an empty payload or with the payload @reload schema@ reloads
-- the schema cache; the payload @reload config@ re-reads the config; any
-- other payload is ignored.  When the listening thread ends, the
-- 'connectionWorker' is called and the listener is started again.
listener :: AppState -> IO ()
listener appState = do
  AppConfig{..} <- AppState.getConfig appState
  let dbChannel = toS configDbChannel

  -- The listener has to wait for a signal from the connectionWorker.
  -- This is because when the connection to the db is lost, the listener also
  -- tries to recover the connection, but not at the same pace as the
  -- connectionWorker. Not waiting makes stderr quickly fill with connection
  -- retry messages from the listener.
  AppState.waitListener appState

  -- forkFinally allows us to detect if the thread dies
  void . flip forkFinally (handleFinally dbChannel) $ do
    dbOrError <- acquire $ toUtf8 configDbUri
    case dbOrError of
      Right db -> do
        AppState.logWithZTime appState $ "Listening for notifications on the " <> dbChannel <> " channel"
        SQL.listen db $ SQL.toPgIdentifier dbChannel
        SQL.waitForNotifications handleNotification db
      _ ->
        die $ "Could not listen for notifications on the " <> dbChannel <> " channel"
  where
    -- Runs when the forked thread ends, whatever its result was.
    handleFinally dbChannel _ = do
      -- if the thread dies, we try to recover
      AppState.logWithZTime appState $ "Retrying listening for notifications on the " <> dbChannel <> " channel.."
      -- assume the pool connection was also lost, call the connection worker
      connectionWorker appState
      -- retry the listener
      listener appState

    -- Dispatch on the notification payload; the first argument is unused.
    handleNotification _ msg
      | BS.null msg            = scLoader -- reload the schema cache
      | msg == "reload schema" = scLoader -- reload the schema cache
      | msg == "reload config" = reReadConfig False appState -- reload the config
      | otherwise              = pure () -- ignore any other payload

    scLoader =
      -- It's not necessary to check the loadSchemaCache success
      -- here. If the connection drops, the thread will die and
      -- proceed to recover.
      void $ loadSchemaCache appState

-- | Re-reads the config plus config options from the db
--
-- The current config is fetched from the 'AppState'. When its
-- @configDbConfig@ flag is set, extra settings are queried from the database
-- through the pool; otherwise no db settings are used. The config is then
-- read again with 'readAppConfig' using the current file path and db uri.
--
-- The first argument tells whether the program is starting up:
--
--   * on an invalid config, startup aborts with 'panic', whereas a later
--     reload only logs the failure and leaves the stored config untouched;
--   * on success the new config is stored, and "Config re-loaded" is logged
--     only when not starting up.
reReadConfig :: Bool -> AppState -> IO ()
reReadConfig startingUp appState = do
  conf <- AppState.getConfig appState
  dbSettings <-
    if configDbConfig conf
      then loadDbSettings (configDbPreparedStatements conf)
      else pure mempty
  readAppConfig dbSettings (configFilePath conf) (Just (configDbUri conf)) >>= \case
    Left err
      -- die on invalid config if the program is starting up
      | startingUp -> panic err
      | otherwise  -> logLine $ "Failed re-loading config: " <> err
    Right newConf -> do
      AppState.putConfig appState newConf
      unless startingUp $
        logLine "Config re-loaded"
  where
    -- Timestamped logging through the application state.
    logLine :: Text -> IO ()
    logLine = AppState.logWithZTime appState

    -- Queries the config settings stored in the database. A failed query is
    -- logged and yields an empty settings list; when 'checkIsFatal' returns
    -- a hint for the error, the error payload and the hint are logged and the
    -- thread given by 'AppState.getMainThreadId' is killed.
    loadDbSettings :: Bool -> IO [(Text, Text)]
    loadDbSettings prepared =
      queryDbSettings (AppState.getPool appState) prepared >>= \case
        Right settings -> pure settings
        Left usageErr -> do
          let pgErr = PgError False usageErr
          logLine
            "An error ocurred when trying to query database settings for the config parameters"
          case checkIsFatal pgErr of
            Just hint -> do
              logLine . T.decodeUtf8 . LBS.toStrict $ errorPayload pgErr
              logLine hint
              killThread (AppState.getMainThreadId appState)
            Nothing ->
              logLine $ show usageErr
          pure []