{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes, RecordWildCards #-}
module Database.PostgreSQL.LibPQ.Notify
( getNotification
, getNotificationWithConfig
, defaultConfig
, Config (..)
) where
import Control.Exception (try, throwIO)
import qualified Database.PostgreSQL.LibPQ as PQ
import GHC.IO.Exception (IOException(..),IOErrorType(ResourceVanished))
#if defined(mingw32_HOST_OS)
import Control.Concurrent (threadDelay)
#else
import Control.Concurrent
import Control.Concurrent.STM(atomically)
#endif
import Data.Function(fix)
import Data.Bifunctor(first)
import Control.Concurrent.Async (race)
-- | Options and event hooks for 'getNotificationWithConfig'.
data Config = Config
  { interrupt :: Maybe (IO ())
    -- ^ Optional action raced against the wait for more data on the
    -- connection. When it is 'Nothing' only the plain wait is performed.
  , interrupted :: IO ()
    -- ^ Event hook meant to signal that a wait was ended by 'interrupt'.
    -- It is one of the two hooks selected from the outcome of each wait.
  , threadWaitReadReturned :: IO ()
    -- ^ Event hook meant to signal that the wait on the connection's file
    -- descriptor returned (on Windows: that the retry delay elapsed).
    -- It is the other hook selected from the outcome of each wait.
  , startLoop :: IO ()
    -- ^ Event hook run at the start of every iteration of the polling loop.
  , beforeWait :: IO ()
    -- ^ Event hook run right before the loop blocks waiting.
#if defined(mingw32_HOST_OS)
  , retryDelay :: Int
    -- ^ Windows only: the value handed to 'threadDelay' (microseconds)
    -- between successive polls of the connection.
#endif
  }
-- | Default configuration: no custom 'interrupt', and every event hook is
-- a no-op. On Windows the 'retryDelay' is 100000 microseconds.
defaultConfig :: Config
defaultConfig = Config
  { interrupt = Nothing
  , interrupted = noHook
  , threadWaitReadReturned = noHook
  , startLoop = noHook
  , beforeWait = noHook
#if defined(mingw32_HOST_OS)
  , retryDelay = 100000
#endif
  }
  where
    -- Shared do-nothing action used for all hooks.
    noHook :: IO ()
    noHook = pure ()
-- | Outcome of one poll of the connection.
data RetryOrReturn
  = Retry (IO (Either () ()))
    -- ^ No notification was pending. Carries the wait to perform before
    -- polling again; a 'Left' result makes the caller run the
    -- 'interrupted' hook and a 'Right' result the
    -- 'threadWaitReadReturned' hook.
  | Return PQ.Notify
    -- ^ A notification was available.
-- | Location string stamped onto every 'IOError' produced or returned by
-- this module. It names this module's own entry point; the previous value
-- pointed at a @Hasql.Notification@ module that is not this one, which
-- made error reports misleading.
funcName :: String
funcName = "Database.PostgreSQL.LibPQ.Notify.getNotification"
-- | Overwrite the location field of an 'IOError' with 'funcName', leaving
-- every other field untouched.
setLoc :: IOError -> IOError
setLoc ioErr = ioErr { ioe_location = funcName }
-- | The error thrown when the connection does not yield a file descriptor
-- to wait on. It is a 'ResourceVanished' error located at 'funcName', with
-- no handle, errno or filename attached.
fdError :: IOError
fdError = IOError
  { ioe_type = ResourceVanished
  , ioe_location = funcName
  , ioe_description = description
  , ioe_handle = Nothing
  , ioe_errno = Nothing
  , ioe_filename = Nothing
  }
  where
    description :: String
    description =
      "failed to fetch file descriptor (did the connection time out?)"
{-|
Returns a single notification. If no notifications are
available, 'getNotificationWithConfig' blocks until one arrives.

Unlike 'getNotification', 'getNotificationWithConfig' takes an
additional 'Config' parameter which provides a custom 'interrupt' and
various event hooks for operational insight.

Using a custom 'interrupt' is necessary if one would like to call
'getNotificationWithConfig' on one thread and @NOTIFY@ on another
thread using the same connection.

To support this behavior one must cause 'interrupt' to return after the
call to @NOTIFY@ checks its result from the server.

See the test file of this package for an example of how to use a custom
'interrupt'.

After every wait exactly one hook is run: 'interrupted' when the
'interrupt' action returned first, and 'threadWaitReadReturned' when the
wait on the connection (on Windows: the retry delay) finished first or
when no 'interrupt' is configured.

Any 'IOError' thrown inside the loop is caught and returned as a 'Left'.

Note that PostgreSQL does not
deliver notifications while a connection is inside a transaction.
-}
getNotificationWithConfig
  :: Config
  -- ^ The optional 'interrupt' action and the event hooks to run while
  -- polling and waiting.
  -> (forall a. c -> (PQ.Connection -> IO a) -> IO a)
  -- ^ This is a way to get a connection from a 'c'.
  -- A concrete example would be if 'c' is 'MVar PQ.Connection'
  -- and then this function would be 'withMVar'
  -> c
  -- ^ A type that can be used to provide a connection when
  -- used with the former argument. Typically a concurrency
  -- primitive like 'MVar PQ.Connection'
  -> IO (Either IOError PQ.Notify)
getNotificationWithConfig Config {..} withConnection conn =
  fmap (first setLoc) $ try $ fix $ \next -> do
    -- We try to get the notification or register a file descriptor callback
    -- while holding the lock. We then give up the lock to wait.
    -- That is why the code is broken up into these two sections.
    startLoop
    e <- withConnection conn $ \c -> do
      _ <- PQ.consumeInput c
      PQ.notifies c >>= \case
        -- We found a notification, just return it.
        Just x -> pure $ Return x
        -- There wasn't a notification so we need to register to wait on
        -- the file handle.
        Nothing -> PQ.socket c >>= \case
          -- This is an odd error.
          Nothing -> throwIO fdError
          -- Typical case. Register to wait on more data.
          Just fd -> do
#if defined(mingw32_HOST_OS)
            let fileNotification = threadDelay retryDelay
#else
            action <- fst <$> threadWaitReadSTM fd
            let fileNotification = atomically action
#endif
            -- Convention for the wait result: 'Left' means the custom
            -- interrupt returned first, 'Right' means the wait on the
            -- connection finished. The interrupt therefore has to be the
            -- LEFT operand of 'race'; with the operands the other way
            -- round the two hooks below would be swapped.
            pure $ Retry $ case interrupt of
              Nothing -> Right <$> fileNotification
              Just wakeUp -> race wakeUp fileNotification

    case e of
      Retry raceResult -> do
        beforeWait
        either (const interrupted) (const threadWaitReadReturned) =<< raceResult
        next
      Return x -> pure x
-- | Like 'getNotificationWithConfig', but always uses 'defaultConfig'.
getNotification
  :: (forall a. c -> (PQ.Connection -> IO a) -> IO a)
  -- ^ How to obtain a connection from a 'c'.
  -> c
  -- ^ The value a connection is obtained from.
  -> IO (Either IOError PQ.Notify)
getNotification withConnection conn =
  getNotificationWithConfig defaultConfig withConnection conn