module Hasql.Pool
(
  Pool,
  Settings(..),
  acquire,
  release,
  UsageError(..),
  use,
)
where

import Hasql.Pool.Prelude
import Hasql.Connection (Connection)
import Hasql.Session (Session)
import qualified Hasql.Connection as Connection
import qualified Hasql.Session as Session


-- |
-- A pool of connections to DB.
--
-- The pool is a bundle of shared STM state: it is read and updated by 'use',
-- by 'release' and by the background garbage-collection loop that 'acquire'
-- forks.
data Pool =
  Pool
    Connection.Settings
    {-^ Connection settings, used whenever a new connection has to be established. -}
    (TQueue ActiveConnection)
    {-^ Queue of established connections that are currently idle.
        Connections are written to its back after use and taken from its front. -}
    (TVar Int)
    {-^ Slots available for establishing new connections.
        Starts at the pool size, is decremented before a new connection is
        established and incremented when a connection is closed or fails to open. -}
    (TVar Bool)
    {-^ Flag signaling whether the pool is alive. 'release' sets it to 'False',
        after which 'use' reports 'PoolIsReleasedUsageError'. -}

-- |
-- An established connection sitting idle in the pool,
-- paired with the moment it was put there.
data ActiveConnection =
  ActiveConnection {
    -- | Timestamp taken from @getMillisecondsSinceEpoch@ when the connection
    -- was handed back to the pool. The garbage collector adds the pool
    -- timeout to it to decide when the connection becomes outdated.
    activeConnectionLastUseTimestamp :: Int,
    -- | The underlying established connection.
    activeConnectionConnection :: Connection
  }

-- |
-- Body of the background thread which closes idle connections.
--
-- Arguments, in order:
--
-- * The idle timeout. It is added to the last-use timestamp of a connection
--   to get the moment at which the connection becomes outdated
--   (milliseconds, going by the 'Settings' documentation).
--
-- * The queue of idle established connections, oldest at the front.
--
-- * The counter of slots available for establishing new connections.
--
-- * The flag telling whether the pool is still alive.
--
-- While the pool is alive the loop repeatedly takes every outdated connection
-- off the front of the queue, gives one slot back per connection taken,
-- closes them and then sleeps until the next queued connection is due.
-- With an empty queue it blocks in STM until the state changes.
-- Once the pool is no longer alive, it closes everything left in the queue
-- and terminates.
loopCollectingGarbage :: Int -> TQueue ActiveConnection -> TVar Int -> TVar Bool -> IO ()
loopCollectingGarbage timeout establishedQueue slotsAvailVar aliveVar =
  decide
  where
    -- One iteration: sample the clock, decide what to do in a single
    -- transaction and then run the IO action which that transaction produced.
    decide :: IO ()
    decide =
      do
        ts <- getMillisecondsSinceEpoch
        join $ atomically $ do
          alive <- readTVar aliveVar
          if alive
            then tryToRelease ts
            else do
              -- The pool got released: close whatever is idle and stop looping.
              list <- flushTQueue establishedQueue
              return (closeAll (fmap activeConnectionConnection list))

    -- Inspect the oldest queued connection.
    tryToRelease :: Int -> STM (IO ())
    tryToRelease ts =
      tryReadTQueue establishedQueue >>= \case
        -- The queue is empty. Just wait for changes in the state.
        Nothing ->
          retry
        Just entry@(ActiveConnection lastUseTs connection) ->
          let outdatingTs = lastUseTs + timeout
           in if outdatingTs < ts
                -- It is outdated. This connection leaves the pool, so its slot
                -- is given back right away; then gather the other outdated ones.
                then do
                  slotsAvail <- readTVar slotsAvailVar
                  collectAndRelease ts (succ slotsAvail) [connection] outdatingTs
                -- Return it to the front of the queue and
                -- wait until it's outdating time.
                else do
                  unGetTQueue establishedQueue entry
                  return (sleep outdatingTs *> decide)

    -- Keep taking connections off the front of the queue for as long as they
    -- are outdated, accumulating them together with the updated slot counter.
    -- The last argument is the timestamp to sleep until in case the queue
    -- runs empty; it lies in the past then, so the sleep is a no-op.
    collectAndRelease :: Int -> Int -> [Connection] -> Int -> STM (IO ())
    collectAndRelease ts !slotsAvail !outdatedList lastOutdatingTs =
      tryReadTQueue establishedQueue >>= \case
        Nothing ->
          finalizeAndRelease slotsAvail outdatedList lastOutdatingTs
        Just entry@(ActiveConnection lastUseTs connection) ->
          let outdatingTs = lastUseTs + timeout
           in if outdatingTs < ts
                -- Outdated as well: collect it and free its slot.
                then
                  collectAndRelease ts (succ slotsAvail) (connection : outdatedList) outdatingTs
                -- Still fresh: put it back and stop collecting.
                -- The loop sleeps until this entry becomes outdated.
                else do
                  unGetTQueue establishedQueue entry
                  finalizeAndRelease slotsAvail outdatedList outdatingTs

    -- Publish the new slot counter within the transaction and produce the
    -- IO action which closes the collected connections, sleeps and loops.
    finalizeAndRelease :: Int -> [Connection] -> Int -> STM (IO ())
    finalizeAndRelease slotsAvail outdatedList wakeUpTs =
      do
        writeTVar slotsAvailVar slotsAvail
        return (closeAll outdatedList *> sleep wakeUpTs *> decide)

    -- Block the thread until the clock reaches the given timestamp.
    -- The difference is multiplied by 1000 because 'threadDelay'
    -- takes microseconds.
    sleep :: Int -> IO ()
    sleep untilTs =
      do
        ts <- getMillisecondsSinceEpoch
        let diff = untilTs - ts
        if diff > 0
          then threadDelay (diff * 1000)
          else return ()

    -- Close every connection of the collection.
    closeAll :: Foldable t => t Connection -> IO ()
    closeAll =
      traverse_ Connection.release

-- |
-- Settings of the connection pool. A tuple consisting of, in order:
--
-- * Pool-size.
-- The initial number of slots available for establishing connections.
--
-- * Timeout.
-- An amount of time in milliseconds for which the unused connections are kept open.
--
-- * Connection settings.
-- Passed on when a new connection has to be established.
--
type Settings =
  (Int, Int, Connection.Settings)

-- |
-- Given the pool-size, timeout and connection settings
-- create a connection-pool.
--
-- No connection is established here. The function only allocates the shared
-- state (an empty queue of idle connections, a slot counter set to the
-- pool-size and the liveness flag) and forks the garbage-collection thread
-- which closes idle connections.
--
-- The liveness flag starts as @size > 0@, so a pool with a non-positive size
-- is created already released: 'use' on it reports 'PoolIsReleasedUsageError'.
acquire :: Settings -> IO Pool
acquire (size, timeout, connectionSettings) =
  do
    establishedQueue <- newTQueueIO
    slotsAvailVar <- newTVarIO size
    aliveVar <- newTVarIO (size > 0)
    -- The thread id is not needed: the loop terminates by itself
    -- once it sees the liveness flag turned off.
    _ <- forkIO $ loopCollectingGarbage timeout establishedQueue slotsAvailVar aliveVar
    return (Pool connectionSettings establishedQueue slotsAvailVar aliveVar)

-- |
-- Release the connection-pool.
--
-- This only turns the liveness flag of the pool off and returns immediately.
-- The connections themselves are closed by the garbage-collection thread
-- when it observes the flag, and any later 'use' reports
-- 'PoolIsReleasedUsageError'.
release :: Pool -> IO ()
release (Pool _ _ _ aliveVar) =
  atomically (writeTVar aliveVar False)

-- |
-- A union over the errors which 'use' can report:
-- the connection establishment error, the session error
-- and an attempt to use a released pool.
data UsageError
  = -- | Establishing a new connection has failed.
    ConnectionUsageError Connection.ConnectionError
  | -- | Running the session on a connection has failed.
    SessionUsageError Session.QueryError
  | -- | The pool has been released and can no longer be used.
    PoolIsReleasedUsageError
  deriving (Show, Eq)

-- |
-- Use a connection from the pool to run a session and
-- return the connection to the pool, when finished.
--
-- The connection is chosen in a single transaction:
--
-- * If the pool is released, 'PoolIsReleasedUsageError' is reported.
--
-- * If an idle connection is queued, it gets used.
--
-- * Otherwise, if a slot for a new connection is available, the slot is
--   taken and a new connection is established. A failure to establish it
--   gives the slot back and is reported as 'ConnectionUsageError'.
--
-- * Otherwise the call blocks until the state of the pool changes.
--
-- A failure of the session is reported as 'SessionUsageError'. When that
-- failure is a client-side one, the connection is closed and its slot is
-- given back instead of the connection being returned to the pool.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use (Pool connectionSettings establishedQueue slotsAvailVar aliveVar) session =
  join $ atomically $ do
    alive <- readTVar aliveVar
    if alive
      then
        tryReadTQueue establishedQueue >>= \case
          -- No established connection avail at the moment.
          Nothing -> do
            slotsAvail <- readTVar slotsAvailVar
            -- Do we have any slots left for establishing new connections?
            if slotsAvail > 0
              -- Reduce the available slots var and instruct to
              -- establish and use a new connection.
              then do
                writeTVar slotsAvailVar $! pred slotsAvail
                return acquireConnectionThenUseThenPutItToQueue
              -- Wait until the state changes and retry.
              else
                retry
          Just (ActiveConnection _ connection) ->
            return (useConnectionThenPutItToQueue connection)
      else
        return (return (Left PoolIsReleasedUsageError))
  where
    -- Establish a new connection for the slot taken in the transaction
    -- and proceed to running the session on it.
    acquireConnectionThenUseThenPutItToQueue =
      do
        acquisitionResult <- Connection.acquire connectionSettings
        case acquisitionResult of
          -- Failed to acquire, so release an availability slot,
          -- returning the error details.
          Left acquisitionError -> do
            atomically $ modifyTVar' slotsAvailVar succ
            return (Left (ConnectionUsageError acquisitionError))
          Right connection ->
            useConnectionThenPutItToQueue connection

    -- Run the session and dispose of the connection depending on the outcome.
    useConnectionThenPutItToQueue connection =
      do
        sessionResult <- Session.run session connection
        case sessionResult of
          Left queryError -> do
            -- Check whether the error is on client-side,
            -- and in that case release the connection.
            case queryError of
              Session.QueryError _ _ (Session.ClientError _) ->
                releaseConnection connection
              _ ->
                putConnectionToPool connection
            return (Left (SessionUsageError queryError))
          Right result -> do
            putConnectionToPool connection
            return (Right result)

    -- Hand the connection back, stamped with the current time.
    -- If the pool got released while the connection was in use, the queue is
    -- no longer drained by anyone, so the connection is closed right here
    -- instead of being left open forever.
    putConnectionToPool connection =
      do
        ts <- getMillisecondsSinceEpoch
        join $ atomically $ do
          alive <- readTVar aliveVar
          if alive
            then do
              writeTQueue establishedQueue (ActiveConnection ts connection)
              return (return ())
            else
              return (Connection.release connection)

    -- Give the slot of the connection back and close the connection.
    releaseConnection connection =
      do
        atomically $ modifyTVar' slotsAvailVar succ
        Connection.release connection