module Hasql.Pool
(   Pool
,   Settings(..)
,   UsageError(..)
,   ConnectionGetter
,   Stats(..)
,   stats
,   getPoolUsageStat
,   acquire
,   acquireWith
,   release
,   use
,   useWithObserver
,   withResourceOnEither
)
where

import qualified Data.Pool as ResourcePool
import qualified Data.Pool.Internal as Unstable
import           System.Clock (Clock(Monotonic), diffTimeSpec, getTime, toNanoSecs)

import           Hasql.Pool.Prelude
import qualified Hasql.Connection
import qualified Hasql.Session
import           Hasql.Pool.Observer (Observed(..), ObserverAction)


-- |
-- A pool of open DB connections.
newtype Pool =
    Pool (ResourcePool.Pool (Either Hasql.Connection.ConnectionError Hasql.Connection.Connection))



type PoolSize         = Int
type PoolStripes      = Int
type ResidenceTimeout = NominalDiffTime

-- |
-- Connection getter action that allows for obtaining Postgres connection settings
-- via external resources such as AWS tokens etc.
type ConnectionGetter = IO (Either Hasql.Connection.ConnectionError Hasql.Connection.Connection)

-- |
-- Settings of the connection pool. Consist of:
--
-- * Pool-size.
--
-- * Timeout.
-- An amount of time for which an unused resource is kept open.
-- The smallest acceptable value is 0.5 seconds.
--
-- * Connection settings.
--
type Settings =
  (PoolSize, ResidenceTimeout, Hasql.Connection.Settings)

-- |
-- Given the pool-size, timeout and connection settings
-- create a connection-pool.
acquire :: Settings -> IO Pool
acquire :: Settings -> IO Pool
acquire settings :: Settings
settings@(Int
_size, ResidenceTimeout
_timeout, Settings
connectionSettings) =
    ConnectionGetter -> Settings -> IO Pool
acquireWith (Settings -> ConnectionGetter
Hasql.Connection.acquire Settings
connectionSettings) Settings
settings


-- |
-- Similar to 'acquire', allows for finer configuration.
acquireWith :: ConnectionGetter
            -> Settings
            -> IO Pool
acquireWith :: ConnectionGetter -> Settings -> IO Pool
acquireWith ConnectionGetter
connGetter (Int
maxSize, ResidenceTimeout
timeout, Settings
connectionSettings) =
    forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Pool (Either ConnectionError Connection) -> Pool
Pool forall a b. (a -> b) -> a -> b
$ forall a.
IO a -> (a -> IO ()) -> ResidenceTimeout -> Int -> IO (Pool a)
createPool ConnectionGetter
connGetter forall {a}. Either a Connection -> IO ()
release ResidenceTimeout
timeout Int
maxSize
    where
        release :: Either a Connection -> IO ()
release = forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall a b. a -> b -> a
const (forall (f :: * -> *) a. Applicative f => a -> f a
pure ())) Connection -> IO ()
Hasql.Connection.release


createPool :: IO a
           -> (a -> IO ())
           -> NominalDiffTime
           -> Int
           -> IO (ResourcePool.Pool a)
createPool :: forall a.
IO a -> (a -> IO ()) -> ResidenceTimeout -> Int -> IO (Pool a)
createPool IO a
create a -> IO ()
free ResidenceTimeout
idleTime Int
maxResources = forall a. PoolConfig a -> IO (Pool a)
ResourcePool.newPool PoolConfig a
cfg where
    -- defaultPoolConfig create free cacheTTL maxResources = PoolConfig
    cfg :: PoolConfig a
cfg = forall a. IO a -> (a -> IO ()) -> Double -> Int -> PoolConfig a
ResourcePool.defaultPoolConfig IO a
create a -> IO ()
free (forall a b. (Real a, Fractional b) => a -> b
realToFrac ResidenceTimeout
idleTime) Int
maxResources


-- |
-- Release the connection-pool by closing and removing all connections.
release :: Pool -> IO ()
release :: Pool -> IO ()
release (Pool Pool (Either ConnectionError Connection)
pool) =
    forall a. Pool a -> IO ()
ResourcePool.destroyAllResources Pool (Either ConnectionError Connection)
pool


-- |
-- A union over the connection establishment error and the session error.
data UsageError
    =   ConnectionError Hasql.Connection.ConnectionError
    |   SessionError    Hasql.Session.QueryError
    deriving (Int -> UsageError -> ShowS
[UsageError] -> ShowS
UsageError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UsageError] -> ShowS
$cshowList :: [UsageError] -> ShowS
show :: UsageError -> String
$cshow :: UsageError -> String
showsPrec :: Int -> UsageError -> ShowS
$cshowsPrec :: Int -> UsageError -> ShowS
Show, UsageError -> UsageError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UsageError -> UsageError -> Bool
$c/= :: UsageError -> UsageError -> Bool
== :: UsageError -> UsageError -> Bool
$c== :: UsageError -> UsageError -> Bool
Eq)

-- |
-- Use a connection from the pool to run a session and
-- return the connection to the pool, when finished.
use :: Pool -> Hasql.Session.Session a -> IO (Either UsageError a)
use :: forall a. Pool -> Session a -> IO (Either UsageError a)
use = forall a.
Maybe ObserverAction
-> Pool -> Session a -> IO (Either UsageError a)
useWithObserver forall a. Maybe a
Nothing

-- |
-- Same as 'use' but allows for a custom observer action. You can use it for gathering latency metrics.
useWithObserver :: Maybe ObserverAction
                -> Pool
                -> Hasql.Session.Session a
                -> IO (Either UsageError a)
useWithObserver :: forall a.
Maybe ObserverAction
-> Pool -> Session a -> IO (Either UsageError a)
useWithObserver Maybe ObserverAction
observer (Pool Pool (Either ConnectionError Connection)
pool) Session a
session =
    forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall a b. a -> Either a b
Left forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. ConnectionError -> UsageError
ConnectionError) (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall a b. a -> Either a b
Left forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. QueryError -> UsageError
SessionError) forall a b. b -> Either a b
Right)) forall a b. (a -> b) -> a -> b
$
    forall resource failure success.
Pool resource
-> (resource -> IO (Either failure success))
-> IO (Either failure success)
withResourceOnEither Pool (Either ConnectionError Connection)
pool forall a b. (a -> b) -> a -> b
$
    forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse Connection -> IO (Either QueryError a)
runQuery
    where
        runQuery :: Connection -> IO (Either QueryError a)
runQuery Connection
dbConn = forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO (Either QueryError a)
action (forall {b} {a}. IO b -> (Observed -> IO a) -> IO b
runWithObserver IO (Either QueryError a)
action) Maybe ObserverAction
observer
            where
                action :: IO (Either QueryError a)
action = forall a. Session a -> Connection -> IO (Either QueryError a)
Hasql.Session.run Session a
session Connection
dbConn

        runWithObserver :: IO b -> (Observed -> IO a) -> IO b
runWithObserver IO b
action Observed -> IO a
doObserve = do
            let measure :: IO TimeSpec
measure = Clock -> IO TimeSpec
getTime Clock
Monotonic
            TimeSpec
start  <- IO TimeSpec
measure
            b
result <- IO b
action
            TimeSpec
end    <- IO TimeSpec
measure
            let nsRatio :: a
nsRatio  = a
1000000000
                observed :: Observed
observed = Observed {   latency :: Ratio Integer
latency = forall a. Real a => a -> Ratio Integer
toRational (TimeSpec -> Integer
toNanoSecs (TimeSpec
end TimeSpec -> TimeSpec -> TimeSpec
`diffTimeSpec` TimeSpec
start) forall a. Integral a => a -> a -> Ratio a
% forall {a}. Num a => a
nsRatio)
                                    }
            Observed -> IO a
doObserve Observed
observed forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Applicative f => a -> f a
pure b
result


withResourceOnEither :: ResourcePool.Pool resource
                     -> (resource -> IO (Either failure success))
                     -> IO (Either failure success)
withResourceOnEither :: forall resource failure success.
Pool resource
-> (resource -> IO (Either failure success))
-> IO (Either failure success)
withResourceOnEither Pool resource
pool resource -> IO (Either failure success)
act = forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
    (resource
resource, LocalPool resource
localPool) <- forall a. Pool a -> IO (a, LocalPool a)
ResourcePool.takeResource Pool resource
pool
    Either failure success
failureOrSuccess      <- resource -> IO (Either failure success)
act resource
resource forall a b. IO a -> IO b -> IO a
`onException` forall a. Pool a -> LocalPool a -> a -> IO ()
ResourcePool.destroyResource Pool resource
pool LocalPool resource
localPool resource
resource
    case Either failure success
failureOrSuccess of
        Right success
success -> do
            forall a. LocalPool a -> a -> IO ()
ResourcePool.putResource LocalPool resource
localPool resource
resource
            forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right success
success
        Left failure
failure -> do
            forall a. Pool a -> LocalPool a -> a -> IO ()
ResourcePool.destroyResource Pool resource
pool LocalPool resource
localPool resource
resource
            forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left failure
failure


data Stats = Stats
    {   Stats -> Int
currentUsage  :: !Int
        -- ^ Current number of items.
    ,   Stats -> Int
available     :: !Int
        -- ^ Total items available for consumption.
    } deriving Int -> Stats -> ShowS
[Stats] -> ShowS
Stats -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Stats] -> ShowS
$cshowList :: [Stats] -> ShowS
show :: Stats -> String
$cshow :: Stats -> String
showsPrec :: Int -> Stats -> ShowS
$cshowsPrec :: Int -> Stats -> ShowS
Show


stats :: Pool -> IO Stats
stats :: Pool -> IO Stats
stats (Pool Pool (Either ConnectionError Connection)
pool) = IO (SmallArray Int)
currentlyAvailablePerStripe forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SmallArray Int -> IO Stats
collect where
    -- attributes extraction and counting
    collect :: SmallArray Int -> IO Stats
collect SmallArray Int
xs = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ Int -> Int -> Stats
Stats Int
inUse Int
avail where
        inUse :: Int
inUse = Int
maxResources forall a. Num a => a -> a -> a
- Int
avail
        avail :: Int
avail = forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum SmallArray Int
xs

    currentlyAvailablePerStripe :: IO (SmallArray Int)
currentlyAvailablePerStripe = forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse forall {k} (cat :: k -> k -> *) (a :: k). Category cat => cat a a
id SmallArray (IO Int)
peekAvailable
    peekAvailable :: SmallArray (IO Int)
peekAvailable               = (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Stripe (Either ConnectionError Connection)) -> Int
stripeAvailability) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SmallArray
  (IO (Maybe (Stripe (Either ConnectionError Connection))))
allStripes    -- array of IO Int
    stripeAvailability :: Maybe (Stripe (Either ConnectionError Connection)) -> Int
stripeAvailability Maybe (Stripe (Either ConnectionError Connection))
ms       = forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
quotaPerStripe forall a. Stripe a -> Int
Unstable.available Maybe (Stripe (Either ConnectionError Connection))
ms  -- if the stripe ref is uninitialised, count the default availability
    allStripes :: SmallArray
  (IO (Maybe (Stripe (Either ConnectionError Connection))))
allStripes                  = forall {a}. LocalPool a -> IO (Maybe (Stripe a))
peekStripe forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Pool a -> SmallArray (LocalPool a)
Unstable.localPools Pool (Either ConnectionError Connection)
pool     -- array of IO Maybe
    peekStripe :: LocalPool a -> IO (Maybe (Stripe a))
peekStripe                  = forall a. MVar a -> IO (Maybe a)
tryReadMVar forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. LocalPool a -> MVar (Stripe a)
Unstable.stripeVar

    -- data from the pool
    quotaPerStripe :: Int
quotaPerStripe              = Int
maxResources forall {a}. Integral a => a -> a -> a
`quotCeil` Int
numStripes
    numStripes :: Int
numStripes                  = forall (t :: * -> *) a. Foldable t => t a -> Int
length forall a b. (a -> b) -> a -> b
$ forall a. Pool a -> SmallArray (LocalPool a)
Unstable.localPools Pool (Either ConnectionError Connection)
pool  -- can be 'sizeofSmallArray' but requires 'primitive' as dependency
    maxResources :: Int
maxResources                = forall a. PoolConfig a -> Int
Unstable.poolMaxResources forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. Pool a -> PoolConfig a
Unstable.poolConfig forall a b. (a -> b) -> a -> b
$ Pool (Either ConnectionError Connection)
pool
    quotCeil :: a -> a -> a
quotCeil a
x a
y                = let (a
z, a
r) = a
x forall a. Integral a => a -> a -> (a, a)
`quotRem` a
y in if a
r forall a. Eq a => a -> a -> Bool
== a
0 then a
z else a
z forall a. Num a => a -> a -> a
+ a
1  -- copied from 'Data.Pool.Internal'


getPoolUsageStat :: Pool -> IO PoolSize
getPoolUsageStat :: Pool -> IO Int
getPoolUsageStat Pool
pool = Pool -> IO Stats
stats Pool
pool forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (forall (f :: * -> *) a. Applicative f => a -> f a
pure forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. Stats -> Int
currentUsage)