module Hasql.Pool
( Pool,
Settings (..),
acquire,
release,
UsageError (..),
use,
)
where
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Prelude
import Hasql.Session (Session)
import qualified Hasql.Session as Session
-- | A pool of database connections.
--
-- The constructor is positional; its four components are, in order:
data Pool
  = Pool
      -- Settings used whenever a new connection has to be established.
      Connection.Settings
      -- Queue of established connections that are currently idle.
      (TQueue ActiveConnection)
      -- Number of connections that may still be established.
      (TVar Int)
      -- Flag telling whether the pool is alive (cleared by 'release').
      (TVar Bool)
-- | An established connection sitting in the pool's queue, together with
-- the moment it was put there.
data ActiveConnection = ActiveConnection
  { -- | Timestamp of the last use, as obtained from
    -- @getMillisecondsSinceEpoch@ when the connection was returned.
    activeConnectionLastUseTimestamp :: Int,
    -- | The underlying database connection.
    activeConnectionConnection :: Connection
  }
-- | Background loop closing connections that have been idle for too long.
--
-- Arguments, in order: the idle timeout (added to the timestamps produced by
-- @getMillisecondsSinceEpoch@), the queue of idle connections, the counter of
-- free slots and the aliveness flag of the pool.
--
-- While the pool is alive the loop repeatedly takes outdated connections off
-- the front of the queue, gives their slots back and closes them, then sleeps
-- until the next entry is due. Once the flag is cleared it closes every
-- queued connection and terminates.
loopCollectingGarbage :: Int -> TQueue ActiveConnection -> TVar Int -> TVar Bool -> IO ()
loopCollectingGarbage timeout establishedQueue slotsAvailVar aliveVar =
  decide
  where
    -- One iteration: a single transaction decides what has to happen and
    -- returns the IO action to run once it has committed.
    decide :: IO ()
    decide = do
      ts <- getMillisecondsSinceEpoch
      join $
        atomically $ do
          alive <- readTVar aliveVar
          if alive then collectOutdated ts else shutDown

    -- The pool has been released: drain the queue and close everything.
    -- The loop is not re-entered afterwards.
    shutDown = do
      entries <- flushTQueue establishedQueue
      return (releaseAll (fmap activeConnectionConnection entries))

    -- Inspect the entry at the front of the queue.
    collectOutdated ts = do
      mbEntry <- tryReadTQueue establishedQueue
      case mbEntry of
        -- Nothing to collect: block until the queue or the aliveness flag
        -- changes.
        Nothing -> retry
        Just entry@(ActiveConnection lastUseTs connection) ->
          let outdatingTs = lastUseTs + timeout
           in if outdatingTs < ts
                then do
                  slotsAvail <- readTVar slotsAvailVar
                  -- The connection is about to be closed, so its slot is
                  -- returned right away; otherwise the pool would lose one
                  -- slot per collection round.
                  collectMore ts (succ slotsAvail) [connection] outdatingTs
                else do
                  unGetTQueue establishedQueue entry
                  return (sleep outdatingTs *> decide)

    -- Keep taking outdated entries off the queue, accumulating the freed
    -- slots and the connections to close. Stops at the first entry that is
    -- still fresh (which is put back) or when the queue runs dry.
    collectMore ts slotsAvail outdatedList lastOutdatingTs = do
      mbEntry <- tryReadTQueue establishedQueue
      case mbEntry of
        Nothing ->
          finalizeAndRelease slotsAvail outdatedList lastOutdatingTs
        Just entry@(ActiveConnection lastUseTs connection) ->
          let outdatingTs = lastUseTs + timeout
           in if outdatingTs < ts
                then
                  -- Outdated as well: collect it and free its slot. The
                  -- counter is forced to avoid building a chain of thunks.
                  (collectMore ts $! succ slotsAvail)
                    (connection : outdatedList)
                    outdatingTs
                else do
                  -- Still fresh: keep it in the pool and wake up again when
                  -- it becomes outdated.
                  unGetTQueue establishedQueue entry
                  finalizeAndRelease slotsAvail outdatedList outdatingTs

    -- Publish the updated slot counter, then (outside of the transaction)
    -- close the collected connections, sleep and start the next iteration.
    finalizeAndRelease slotsAvail outdatedList wakeUpTs = do
      writeTVar slotsAvailVar $! slotsAvail
      return (releaseAll outdatedList *> sleep wakeUpTs *> decide)

    -- Block the thread until the given millisecond timestamp; returns
    -- immediately when that moment has already passed.
    sleep :: Int -> IO ()
    sleep untilTs = do
      now <- getMillisecondsSinceEpoch
      let diff = untilTs - now
      if diff > 0
        then threadDelay (diff * 1000) -- threadDelay takes microseconds
        else return ()

    -- Close every connection of the list.
    releaseAll :: [Connection] -> IO ()
    releaseAll =
      traverse_ Connection.release
-- | Settings of the connection pool, consisting of:
--
-- * Pool size: the initial number of connection slots. 'acquire' creates
--   the pool in the released state when this is not positive.
--
-- * Timeout: how long an unused connection is kept open. It is added to
--   timestamps obtained from @getMillisecondsSinceEpoch@.
--
-- * Connection settings, passed to @Connection.acquire@ whenever a new
--   connection is established.
type Settings = (Int, Int, Connection.Settings)
-- | Given the pool settings, create a connection pool.
--
-- No connection is established here; 'use' opens them on demand. A
-- background thread running 'loopCollectingGarbage' is forked to close
-- connections that stay idle for longer than the timeout.
acquire :: Settings -> IO Pool
acquire (size, timeout, connectionSettings) = do
  establishedQueue <- newTQueueIO
  slotsAvailVar <- newTVarIO size
  -- With a non-positive size the pool starts out in the released state.
  aliveVar <- newTVarIO (size > 0)
  _ <- forkIO (loopCollectingGarbage timeout establishedQueue slotsAvailVar aliveVar)
  return (Pool connectionSettings establishedQueue slotsAvailVar aliveVar)
-- | Mark the pool as released by clearing its aliveness flag.
--
-- The connections themselves are not closed here: the garbage-collection
-- loop closes the queued ones once it observes the cleared flag, and 'use'
-- answers with 'PoolIsReleasedUsageError' from then on.
release :: Pool -> IO ()
release (Pool _ _ _ aliveVar) =
  atomically (writeTVar aliveVar False)
-- | The ways in which running a session on the pool can fail.
data UsageError
  = -- | A new connection could not be established.
    ConnectionUsageError Connection.ConnectionError
  | -- | The session itself failed while running on a connection.
    SessionUsageError Session.QueryError
  | -- | The pool had already been released when 'use' was called.
    PoolIsReleasedUsageError
  deriving (Show, Eq)
-- | Run a session on a connection from the pool.
--
-- An idle connection is taken from the queue when there is one. Otherwise a
-- new connection is established if a slot is free, and the call blocks until
-- a connection or a slot becomes available if not. On a released pool the
-- result is @Left 'PoolIsReleasedUsageError'@.
--
-- After the session the connection goes back to the queue, unless the
-- session failed with a client error, in which case it is closed and its
-- slot is freed.
--
-- NOTE(review): an exception escaping @Session.run@ would leave the
-- connection neither queued nor closed — confirm whether that can happen.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use (Pool connectionSettings establishedQueue slotsAvailVar aliveVar) session =
  -- The transaction only decides what to do; the chosen IO action runs
  -- after it has committed.
  join $
    atomically $ do
      alive <- readTVar aliveVar
      if alive
        then do
          mbEntry <- tryReadTQueue establishedQueue
          case mbEntry of
            Just (ActiveConnection _ connection) ->
              return (useConnectionThenPutItToQueue connection)
            Nothing -> do
              slotsAvail <- readTVar slotsAvailVar
              if slotsAvail > 0
                then do
                  -- Reserve the slot before connecting.
                  writeTVar slotsAvailVar $! pred slotsAvail
                  return acquireConnectionThenUseThenPutItToQueue
                else retry
        else return (return (Left PoolIsReleasedUsageError))
  where
    -- Establish a fresh connection for the reserved slot and run the
    -- session on it; the slot is given back if connecting fails.
    acquireConnectionThenUseThenPutItToQueue = do
      res <- Connection.acquire connectionSettings
      case res of
        Left acquisitionError -> do
          atomically (modifyTVar' slotsAvailVar succ)
          return (Left (ConnectionUsageError acquisitionError))
        Right connection ->
          useConnectionThenPutItToQueue connection

    -- Run the session, then either recycle or close the connection.
    useConnectionThenPutItToQueue connection = do
      res <- Session.run session connection
      case res of
        Left queryError -> do
          case queryError of
            -- A client error: the connection is closed instead of reused.
            Session.QueryError _ _ (Session.ClientError _) ->
              releaseConnection connection
            _ ->
              putConnectionToPool connection
          return (Left (SessionUsageError queryError))
        Right result -> do
          putConnectionToPool connection
          return (Right result)

    -- Return the connection to the queue, stamped with the current time.
    -- If the pool has been released in the meantime the garbage-collection
    -- loop is no longer running, so nothing would ever close a queued
    -- connection; it is closed right here instead. The check and the
    -- enqueueing share one transaction, so the collector either sees the
    -- connection in the queue or this branch closes it.
    putConnectionToPool connection = do
      ts <- getMillisecondsSinceEpoch
      join $
        atomically $ do
          alive <- readTVar aliveVar
          if alive
            then do
              writeTQueue establishedQueue (ActiveConnection ts connection)
              return (return ())
            else do
              modifyTVar' slotsAvailVar succ
              return (Connection.release connection)

    -- Free the slot and close the connection.
    releaseConnection connection = do
      atomically (modifyTVar' slotsAvailVar succ)
      Connection.release connection