module Hasql.Pool
  ( Pool,
    Settings (..),
    acquire,
    release,
    UsageError (..),
    use,
  )
where

import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Prelude
import Hasql.Session (Session)
import qualified Hasql.Session as Session

-- |
-- A pool of connections to DB.
data Pool
  = Pool
      Connection.Settings
      -- ^ Connection settings.
      (TQueue ActiveConnection)
      -- ^ Queue of established connections.
      (TVar Int)
      -- ^ Slots available for establishing new connections.
      (TVar Bool)
      -- ^ Flag signaling whether pool's alive.

data ActiveConnection = ActiveConnection
  { ActiveConnection -> Int
activeConnectionLastUseTimestamp :: Int,
    ActiveConnection -> Connection
activeConnectionConnection :: Connection
  }

loopCollectingGarbage :: Int -> TQueue ActiveConnection -> TVar Int -> TVar Bool -> IO ()
loopCollectingGarbage :: Int -> TQueue ActiveConnection -> TVar Int -> TVar Bool -> IO ()
loopCollectingGarbage Int
timeout TQueue ActiveConnection
establishedQueue TVar Int
slotsAvailVar TVar Bool
aliveVar =
  IO ()
decide
  where
    decide :: IO ()
decide =
      do
        Int
ts <- IO Int
getMillisecondsSinceEpoch
        IO (IO ()) -> IO ()
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO ()) -> IO ()) -> IO (IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$
          STM (IO ()) -> IO (IO ())
forall a. STM a -> IO a
atomically (STM (IO ()) -> IO (IO ())) -> STM (IO ()) -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ do
            Bool
alive <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
aliveVar
            if Bool
alive
              then
                let tryToRelease :: STM (IO ())
tryToRelease =
                      TQueue ActiveConnection -> STM (Maybe ActiveConnection)
forall a. TQueue a -> STM (Maybe a)
tryReadTQueue TQueue ActiveConnection
establishedQueue STM (Maybe ActiveConnection)
-> (Maybe ActiveConnection -> STM (IO ())) -> STM (IO ())
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                        -- The queue is empty. Just wait for changes in the state.
                        Maybe ActiveConnection
Nothing ->
                          STM (IO ())
forall a. STM a
retry
                        Just entry :: ActiveConnection
entry@(ActiveConnection Int
lastUseTs Connection
connection) ->
                          let outdatingTs :: Int
outdatingTs =
                                Int
lastUseTs Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
timeout
                           in -- Check whether it's outdated.
                              if Int
outdatingTs Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
ts
                                then -- Fetch the current value of available slots and
                                -- release this one and other connections.
                                do
                                  Int
slotsAvail <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
slotsAvailVar
                                  Int -> [Connection] -> Int -> STM (IO ())
collectAndRelease Int
slotsAvail [Connection
connection] Int
outdatingTs
                                else -- Return it to the front of the queue and
                                -- wait until it's outdating time.
                                do
                                  TQueue ActiveConnection -> ActiveConnection -> STM ()
forall a. TQueue a -> a -> STM ()
unGetTQueue TQueue ActiveConnection
establishedQueue ActiveConnection
entry
                                  IO () -> STM (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> IO ()
sleep Int
outdatingTs IO () -> IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> IO ()
decide)
                    collectAndRelease :: Int -> [Connection] -> Int -> STM (IO ())
collectAndRelease !Int
slotsAvail ![Connection]
outdatedList Int
outdatingTs =
                      TQueue ActiveConnection -> STM (Maybe ActiveConnection)
forall a. TQueue a -> STM (Maybe a)
tryReadTQueue TQueue ActiveConnection
establishedQueue STM (Maybe ActiveConnection)
-> (Maybe ActiveConnection -> STM (IO ())) -> STM (IO ())
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
                        Maybe ActiveConnection
Nothing ->
                          Int -> [Connection] -> Int -> STM (IO ())
finalizeAndRelease Int
slotsAvail [Connection]
outdatedList Int
outdatingTs
                        Just entry :: ActiveConnection
entry@(ActiveConnection Int
lastUseTs Connection
connection) ->
                          let outdatingTs :: Int
outdatingTs =
                                Int
lastUseTs Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
timeout
                           in if Int
outdatingTs Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
ts
                                then do
                                  TQueue ActiveConnection -> ActiveConnection -> STM ()
forall a. TQueue a -> a -> STM ()
unGetTQueue TQueue ActiveConnection
establishedQueue ActiveConnection
entry
                                  Int -> [Connection] -> Int -> STM (IO ())
finalizeAndRelease Int
slotsAvail [Connection]
outdatedList Int
outdatingTs
                                else Int -> [Connection] -> Int -> STM (IO ())
collectAndRelease (Int -> Int
forall a. Enum a => a -> a
succ Int
slotsAvail) (Connection
connection Connection -> [Connection] -> [Connection]
forall a. a -> [a] -> [a]
: [Connection]
outdatedList) Int
outdatingTs
                    finalizeAndRelease :: Int -> [Connection] -> Int -> STM (IO ())
finalizeAndRelease Int
slotsAvail [Connection]
outdatedList Int
outdatingTs =
                      do
                        TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
slotsAvailVar Int
slotsAvail
                        IO () -> STM (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ([Connection] -> IO ()
forall (t :: * -> *). Foldable t => t Connection -> IO ()
release [Connection]
outdatedList IO () -> IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Int -> IO ()
sleep Int
outdatingTs IO () -> IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> IO ()
decide)
                 in STM (IO ())
tryToRelease
              else do
                [ActiveConnection]
list <- TQueue ActiveConnection -> STM [ActiveConnection]
forall a. TQueue a -> STM [a]
flushTQueue TQueue ActiveConnection
establishedQueue
                IO () -> STM (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ([Connection] -> IO ()
forall (t :: * -> *). Foldable t => t Connection -> IO ()
release ((ActiveConnection -> Connection)
-> [ActiveConnection] -> [Connection]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ActiveConnection -> Connection
activeConnectionConnection [ActiveConnection]
list))
    sleep :: Int -> IO ()
sleep Int
untilTs =
      do
        Int
ts <- IO Int
getMillisecondsSinceEpoch
        let diff :: Int
diff =
              Int
untilTs Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
ts
         in if Int
diff Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
              then Int -> IO ()
threadDelay (Int
diff Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000)
              else () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    release :: t Connection -> IO ()
release =
      (Connection -> IO ()) -> t Connection -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Connection -> IO ()
Connection.release

-- |
-- Settings of the connection pool. Consist of:
--
-- * Pool-size.
--
-- * Timeout.
-- An amount of time in milliseconds for which the unused connections are kept open.
--
-- * Connection settings.
type Settings =
  (Int, Int, Connection.Settings)

-- |
-- Given the pool-size, timeout and connection settings
-- create a connection-pool.
acquire :: Settings -> IO Pool
acquire :: Settings -> IO Pool
acquire (Int
size, Int
timeout, Settings
connectionSettings) =
  do
    TQueue ActiveConnection
establishedQueue <- IO (TQueue ActiveConnection)
forall a. IO (TQueue a)
newTQueueIO
    TVar Int
slotsAvailVar <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
size
    TVar Bool
aliveVar <- Bool -> IO (TVar Bool)
forall a. a -> IO (TVar a)
newTVarIO (Int
size Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0)
    IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Int -> TQueue ActiveConnection -> TVar Int -> TVar Bool -> IO ()
loopCollectingGarbage Int
timeout TQueue ActiveConnection
establishedQueue TVar Int
slotsAvailVar TVar Bool
aliveVar
    Pool -> IO Pool
forall (m :: * -> *) a. Monad m => a -> m a
return (Settings
-> TQueue ActiveConnection -> TVar Int -> TVar Bool -> Pool
Pool Settings
connectionSettings TQueue ActiveConnection
establishedQueue TVar Int
slotsAvailVar TVar Bool
aliveVar)

-- |
-- Release the connection-pool.
release :: Pool -> IO ()
release :: Pool -> IO ()
release (Pool Settings
_ TQueue ActiveConnection
_ TVar Int
_ TVar Bool
aliveVar) =
  STM () -> IO ()
forall a. STM a -> IO a
atomically (TVar Bool -> Bool -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
aliveVar Bool
False)

-- |
-- A union over the connection establishment error and the session error.
data UsageError
  = -- | Error during an attempt to connect.
    ConnectionUsageError Connection.ConnectionError
  | -- | Error during session execution.
    SessionUsageError Session.QueryError
  | -- | Pool has been released and can no longer be used.
    PoolIsReleasedUsageError
  deriving (Int -> UsageError -> ShowS
[UsageError] -> ShowS
UsageError -> String
(Int -> UsageError -> ShowS)
-> (UsageError -> String)
-> ([UsageError] -> ShowS)
-> Show UsageError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UsageError] -> ShowS
$cshowList :: [UsageError] -> ShowS
show :: UsageError -> String
$cshow :: UsageError -> String
showsPrec :: Int -> UsageError -> ShowS
$cshowsPrec :: Int -> UsageError -> ShowS
Show, UsageError -> UsageError -> Bool
(UsageError -> UsageError -> Bool)
-> (UsageError -> UsageError -> Bool) -> Eq UsageError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UsageError -> UsageError -> Bool
$c/= :: UsageError -> UsageError -> Bool
== :: UsageError -> UsageError -> Bool
$c== :: UsageError -> UsageError -> Bool
Eq)

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished. If the session fails
-- with 'Session.ClientError' the connection gets reestablished.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use :: Pool -> Session a -> IO (Either UsageError a)
use (Pool Settings
connectionSettings TQueue ActiveConnection
establishedQueue TVar Int
slotsAvailVar TVar Bool
aliveVar) Session a
session =
  IO (IO (Either UsageError a)) -> IO (Either UsageError a)
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (IO (IO (Either UsageError a)) -> IO (Either UsageError a))
-> IO (IO (Either UsageError a)) -> IO (Either UsageError a)
forall a b. (a -> b) -> a -> b
$
    STM (IO (Either UsageError a)) -> IO (IO (Either UsageError a))
forall a. STM a -> IO a
atomically (STM (IO (Either UsageError a)) -> IO (IO (Either UsageError a)))
-> STM (IO (Either UsageError a)) -> IO (IO (Either UsageError a))
forall a b. (a -> b) -> a -> b
$ do
      Bool
alive <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
aliveVar
      if Bool
alive
        then
          TQueue ActiveConnection -> STM (Maybe ActiveConnection)
forall a. TQueue a -> STM (Maybe a)
tryReadTQueue TQueue ActiveConnection
establishedQueue STM (Maybe ActiveConnection)
-> (Maybe ActiveConnection -> STM (IO (Either UsageError a)))
-> STM (IO (Either UsageError a))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
            -- No established connection avail at the moment.
            Maybe ActiveConnection
Nothing -> do
              Int
slotsAvail <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
slotsAvailVar
              -- Do we have any slots left for establishing new connections?
              if Int
slotsAvail Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
                then -- Reduce the available slots var and instruct to
                -- establish and use a new connection.
                do
                  TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
slotsAvailVar (Int -> STM ()) -> Int -> STM ()
forall a b. (a -> b) -> a -> b
$! Int -> Int
forall a. Enum a => a -> a
pred Int
slotsAvail
                  IO (Either UsageError a) -> STM (IO (Either UsageError a))
forall (m :: * -> *) a. Monad m => a -> m a
return IO (Either UsageError a)
acquireConnectionThenUseThenPutItToQueue
                else -- Wait until the state changes and retry.

                  STM (IO (Either UsageError a))
forall a. STM a
retry
            Just (ActiveConnection Int
_ Connection
connection) ->
              IO (Either UsageError a) -> STM (IO (Either UsageError a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection -> IO (Either UsageError a)
useConnectionThenPutItToQueue Connection
connection)
        else IO (Either UsageError a) -> STM (IO (Either UsageError a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either UsageError a -> IO (Either UsageError a)
forall (m :: * -> *) a. Monad m => a -> m a
return (UsageError -> Either UsageError a
forall a b. a -> Either a b
Left UsageError
PoolIsReleasedUsageError))
  where
    acquireConnectionThenUseThenPutItToQueue :: IO (Either UsageError a)
acquireConnectionThenUseThenPutItToQueue =
      do
        Either ConnectionError Connection
res <- Settings -> IO (Either ConnectionError Connection)
Connection.acquire Settings
connectionSettings
        case Either ConnectionError Connection
res of
          -- Failed to acquire, so release an availability slot,
          -- returning the error details.
          Left ConnectionError
acquisitionError -> do
            STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
slotsAvailVar Int -> Int
forall a. Enum a => a -> a
succ
            Either UsageError a -> IO (Either UsageError a)
forall (m :: * -> *) a. Monad m => a -> m a
return (UsageError -> Either UsageError a
forall a b. a -> Either a b
Left (ConnectionError -> UsageError
ConnectionUsageError ConnectionError
acquisitionError))
          Right Connection
connection ->
            Connection -> IO (Either UsageError a)
useConnectionThenPutItToQueue Connection
connection
    useConnectionThenPutItToQueue :: Connection -> IO (Either UsageError a)
useConnectionThenPutItToQueue Connection
connection =
      do
        Either QueryError a
res <- Session a -> Connection -> IO (Either QueryError a)
forall a. Session a -> Connection -> IO (Either QueryError a)
Session.run Session a
session Connection
connection
        case Either QueryError a
res of
          Left QueryError
queryError -> do
            -- Check whether the error is on client-side,
            -- and in that case release the connection.
            case QueryError
queryError of
              Session.QueryError Settings
_ [Text]
_ (Session.ClientError ConnectionError
_) ->
                Connection -> IO ()
releaseConnection Connection
connection
              QueryError
_ ->
                Connection -> IO ()
putConnectionToPool Connection
connection
            Either UsageError a -> IO (Either UsageError a)
forall (m :: * -> *) a. Monad m => a -> m a
return (UsageError -> Either UsageError a
forall a b. a -> Either a b
Left (QueryError -> UsageError
SessionUsageError QueryError
queryError))
          Right a
res -> do
            Connection -> IO ()
putConnectionToPool Connection
connection
            Either UsageError a -> IO (Either UsageError a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Either UsageError a
forall a b. b -> Either a b
Right a
res)
    putConnectionToPool :: Connection -> IO ()
putConnectionToPool Connection
connection =
      do
        Int
ts <- IO Int
getMillisecondsSinceEpoch
        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue ActiveConnection -> ActiveConnection -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue ActiveConnection
establishedQueue (Int -> Connection -> ActiveConnection
ActiveConnection Int
ts Connection
connection)
    releaseConnection :: Connection -> IO ()
releaseConnection Connection
connection =
      do
        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
slotsAvailVar Int -> Int
forall a. Enum a => a -> a
succ
        Connection -> IO ()
Connection.release Connection
connection