module Hasql.Pool
(
Pool,
acquire,
acquireDynamically,
use,
release,
UsageError (..),
)
where
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Prelude
import qualified Hasql.Session as Session
data Conn = Conn
{ Conn -> Connection
connConnection :: Connection,
Conn -> Word64
connCreationTimeNSec :: Word64
}
isAlive :: Word64 -> Word64 -> Conn -> Bool
isAlive :: Word64 -> Word64 -> Conn -> Bool
isAlive Word64
maxLifetime Word64
now Conn
conn =
Word64
now forall a. Ord a => a -> a -> Bool
<= Conn -> Word64
connCreationTimeNSec Conn
conn forall a. Num a => a -> a -> a
+ Word64
maxLifetime
data Pool = Pool
{
Pool -> Int
poolSize :: Int,
Pool -> IO Settings
poolFetchConnectionSettings :: IO Connection.Settings,
Pool -> Int
poolAcquisitionTimeout :: Int,
Pool -> Word64
poolMaxLifetime :: Word64,
Pool -> TQueue Conn
poolConnectionQueue :: TQueue Conn,
Pool -> TVar Int
poolCapacity :: TVar Int,
Pool -> TVar (TVar Bool)
poolReuseVar :: TVar (TVar Bool),
Pool -> IORef ()
poolReaperRef :: IORef ()
}
acquire ::
Int ->
DiffTime ->
DiffTime ->
Connection.Settings ->
IO Pool
acquire :: Int -> DiffTime -> DiffTime -> Settings -> IO Pool
acquire Int
poolSize DiffTime
acqTimeout DiffTime
maxLifetime Settings
connectionSettings =
Int -> DiffTime -> DiffTime -> IO Settings -> IO Pool
acquireDynamically Int
poolSize DiffTime
acqTimeout DiffTime
maxLifetime (forall (f :: * -> *) a. Applicative f => a -> f a
pure Settings
connectionSettings)
acquireDynamically ::
Int ->
DiffTime ->
DiffTime ->
IO Connection.Settings ->
IO Pool
acquireDynamically :: Int -> DiffTime -> DiffTime -> IO Settings -> IO Pool
acquireDynamically Int
poolSize DiffTime
acqTimeout DiffTime
maxLifetime IO Settings
fetchConnectionSettings = do
TQueue Conn
connectionQueue <- forall a. IO (TQueue a)
newTQueueIO
TVar Int
capVar <- forall a. a -> IO (TVar a)
newTVarIO Int
poolSize
TVar (TVar Bool)
reuseVar <- forall a. a -> IO (TVar a)
newTVarIO forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. a -> IO (TVar a)
newTVarIO Bool
True
IORef ()
reaperRef <- forall a. a -> IO (IORef a)
newIORef ()
ThreadId
managerTid <- ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask -> forall a. IO a -> IO a
unmask forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
Int -> IO ()
threadDelay Int
1000000
Word64
now <- IO Word64
getMonotonicTimeNSec
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
[Conn]
conns <- forall a. TQueue a -> STM [a]
flushTQueue TQueue Conn
connectionQueue
let ([Conn]
keep, [Conn]
close) = forall a. (a -> Bool) -> [a] -> ([a], [a])
partition (Word64 -> Word64 -> Conn -> Bool
isAlive Word64
maxLifetimeNanos Word64
now) [Conn]
conns
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (forall a. TQueue a -> a -> STM ()
writeTQueue TQueue Conn
connectionQueue) [Conn]
keep
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Conn]
close forall a b. (a -> b) -> a -> b
$ \Conn
conn -> do
Connection -> IO ()
Connection.release (Conn -> Connection
connConnection Conn
conn)
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
capVar forall a. Enum a => a -> a
succ
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
reaperRef forall a b. (a -> b) -> a -> b
$ do
ThreadId -> IO ()
killThread ThreadId
managerTid
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int
-> IO Settings
-> Int
-> Word64
-> TQueue Conn
-> TVar Int
-> TVar (TVar Bool)
-> IORef ()
-> Pool
Pool Int
poolSize IO Settings
fetchConnectionSettings Int
acqTimeoutMicros Word64
maxLifetimeNanos TQueue Conn
connectionQueue TVar Int
capVar TVar (TVar Bool)
reuseVar IORef ()
reaperRef
where
acqTimeoutMicros :: Int
acqTimeoutMicros =
forall a. Integral a => a -> a -> a
div (forall a b. (Integral a, Num b) => a -> b
fromIntegral (DiffTime -> Integer
diffTimeToPicoseconds DiffTime
acqTimeout)) Int
1_000_000
maxLifetimeNanos :: Word64
maxLifetimeNanos =
forall a. Integral a => a -> a -> a
div (forall a b. (Integral a, Num b) => a -> b
fromIntegral (DiffTime -> Integer
diffTimeToPicoseconds DiffTime
maxLifetime)) Word64
1_000
release :: Pool -> IO ()
release :: Pool -> IO ()
release Pool {Int
IO Settings
Word64
TVar Int
TVar (TVar Bool)
IORef ()
TQueue Conn
poolReaperRef :: IORef ()
poolReuseVar :: TVar (TVar Bool)
poolCapacity :: TVar Int
poolConnectionQueue :: TQueue Conn
poolMaxLifetime :: Word64
poolAcquisitionTimeout :: Int
poolFetchConnectionSettings :: IO Settings
poolSize :: Int
poolReaperRef :: Pool -> IORef ()
poolReuseVar :: Pool -> TVar (TVar Bool)
poolCapacity :: Pool -> TVar Int
poolConnectionQueue :: Pool -> TQueue Conn
poolMaxLifetime :: Pool -> Word64
poolAcquisitionTimeout :: Pool -> Int
poolFetchConnectionSettings :: Pool -> IO Settings
poolSize :: Pool -> Int
..} =
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TVar Bool
prevReuse <- forall a. TVar a -> STM a
readTVar TVar (TVar Bool)
poolReuseVar
forall a. TVar a -> a -> STM ()
writeTVar TVar Bool
prevReuse Bool
False
TVar Bool
newReuse <- forall a. a -> STM (TVar a)
newTVar Bool
True
forall a. TVar a -> a -> STM ()
writeTVar TVar (TVar Bool)
poolReuseVar TVar Bool
newReuse
[Conn]
conns <- forall a. TQueue a -> STM [a]
flushTQueue TQueue Conn
poolConnectionQueue
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Conn]
conns forall a b. (a -> b) -> a -> b
$ \Conn
conn -> do
Connection -> IO ()
Connection.release (Conn -> Connection
connConnection Conn
conn)
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity forall a. Enum a => a -> a
succ
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use :: forall a. Pool -> Session a -> IO (Either UsageError a)
use Pool {Int
IO Settings
Word64
TVar Int
TVar (TVar Bool)
IORef ()
TQueue Conn
poolReaperRef :: IORef ()
poolReuseVar :: TVar (TVar Bool)
poolCapacity :: TVar Int
poolConnectionQueue :: TQueue Conn
poolMaxLifetime :: Word64
poolAcquisitionTimeout :: Int
poolFetchConnectionSettings :: IO Settings
poolSize :: Int
poolReaperRef :: Pool -> IORef ()
poolReuseVar :: Pool -> TVar (TVar Bool)
poolCapacity :: Pool -> TVar Int
poolConnectionQueue :: Pool -> TQueue Conn
poolMaxLifetime :: Pool -> Word64
poolAcquisitionTimeout :: Pool -> Int
poolFetchConnectionSettings :: Pool -> IO Settings
poolSize :: Pool -> Int
..} Session a
sess = do
STM Bool
timeout <- do
TVar Bool
delay <- Int -> IO (TVar Bool)
registerDelay Int
poolAcquisitionTimeout
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> STM a
readTVar TVar Bool
delay
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
TVar Bool
reuseVar <- forall a. TVar a -> STM a
readTVar TVar (TVar Bool)
poolReuseVar
forall (t :: * -> *) (f :: * -> *) a.
(Foldable t, Alternative f) =>
t (f a) -> f a
asum
[ forall a. TQueue a -> STM a
readTQueue TQueue Conn
poolConnectionQueue forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> TVar Bool -> Conn -> IO (Either UsageError a)
onConn TVar Bool
reuseVar,
do
Int
capVal <- forall a. TVar a -> STM a
readTVar TVar Int
poolCapacity
if Int
capVal forall a. Ord a => a -> a -> Bool
> Int
0
then do
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
poolCapacity forall a b. (a -> b) -> a -> b
$! forall a. Enum a => a -> a
pred Int
capVal
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ TVar Bool -> IO (Either UsageError a)
onNewConn TVar Bool
reuseVar
else forall a. STM a
retry,
do
Bool
timedOut <- STM Bool
timeout
if Bool
timedOut
then forall (m :: * -> *) a. Monad m => a -> m a
return forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall (m :: * -> *) a. Monad m => a -> m a
return forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ UsageError
AcquisitionTimeoutUsageError
else forall a. STM a
retry
]
where
onNewConn :: TVar Bool -> IO (Either UsageError a)
onNewConn TVar Bool
reuseVar = do
Settings
settings <- IO Settings
poolFetchConnectionSettings
Word64
now <- IO Word64
getMonotonicTimeNSec
Either ConnectionError Connection
connRes <- Settings -> IO (Either ConnectionError Connection)
Connection.acquire Settings
settings
case Either ConnectionError Connection
connRes of
Left ConnectionError
connErr -> do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity forall a. Enum a => a -> a
succ
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ ConnectionError -> UsageError
ConnectionUsageError ConnectionError
connErr
Right Connection
conn -> TVar Bool -> Conn -> IO (Either UsageError a)
onLiveConn TVar Bool
reuseVar (Connection -> Word64 -> Conn
Conn Connection
conn Word64
now)
onConn :: TVar Bool -> Conn -> IO (Either UsageError a)
onConn TVar Bool
reuseVar Conn
conn = do
Word64
now <- IO Word64
getMonotonicTimeNSec
if Word64 -> Word64 -> Conn -> Bool
isAlive Word64
poolMaxLifetime Word64
now Conn
conn
then TVar Bool -> Conn -> IO (Either UsageError a)
onLiveConn TVar Bool
reuseVar Conn
conn
else do
Connection -> IO ()
Connection.release (Conn -> Connection
connConnection Conn
conn)
TVar Bool -> IO (Either UsageError a)
onNewConn TVar Bool
reuseVar
onLiveConn :: TVar Bool -> Conn -> IO (Either UsageError a)
onLiveConn TVar Bool
reuseVar Conn
conn = do
Either QueryError a
sessRes <-
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catch (forall a. Session a -> Connection -> IO (Either QueryError a)
Session.run Session a
sess (Conn -> Connection
connConnection Conn
conn)) forall a b. (a -> b) -> a -> b
$ \(SomeException
err :: SomeException) -> do
Connection -> IO ()
Connection.release (Conn -> Connection
connConnection Conn
conn)
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity forall a. Enum a => a -> a
succ
forall a e. Exception e => e -> a
throw SomeException
err
case Either QueryError a
sessRes of
Left QueryError
err -> case QueryError
err of
Session.QueryError Settings
_ [Text]
_ (Session.ClientError ConnectionError
_) -> do
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity forall a. Enum a => a -> a
succ
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ QueryError -> UsageError
SessionUsageError QueryError
err
QueryError
_ -> do
IO ()
returnConn
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ QueryError -> UsageError
SessionUsageError QueryError
err
Right a
res -> do
IO ()
returnConn
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. b -> Either a b
Right a
res
where
returnConn :: IO ()
returnConn =
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
Bool
reuse <- forall a. TVar a -> STM a
readTVar TVar Bool
reuseVar
if Bool
reuse
then forall a. TQueue a -> a -> STM ()
writeTQueue TQueue Conn
poolConnectionQueue Conn
conn forall (f :: * -> *) a b. Functor f => f a -> b -> f b
$> forall (m :: * -> *) a. Monad m => a -> m a
return ()
else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ do
Connection -> IO ()
Connection.release (Conn -> Connection
connConnection Conn
conn)
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
poolCapacity forall a. Enum a => a -> a
succ
data UsageError
=
ConnectionUsageError Connection.ConnectionError
|
SessionUsageError Session.QueryError
|
AcquisitionTimeoutUsageError
deriving (Int -> UsageError -> ShowS
[UsageError] -> ShowS
UsageError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UsageError] -> ShowS
$cshowList :: [UsageError] -> ShowS
show :: UsageError -> String
$cshow :: UsageError -> String
showsPrec :: Int -> UsageError -> ShowS
$cshowsPrec :: Int -> UsageError -> ShowS
Show, UsageError -> UsageError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UsageError -> UsageError -> Bool
$c/= :: UsageError -> UsageError -> Bool
== :: UsageError -> UsageError -> Bool
$c== :: UsageError -> UsageError -> Bool
Eq)
instance Exception UsageError