module Polysemy.Hasql.Interpreter.DbConnectionPool where

import Conc (interpretAtomic)
import Control.Concurrent (ThreadId, myThreadId, throwTo)
import qualified Data.Map.Strict as Map
import Data.Map.Strict ((!?))
import qualified Data.Sequence as Seq
import Data.Sequence (Seq ((:<|)), (<|))
import Exon (exon)
import qualified Hasql.Connection as Connection
import Hasql.Connection (Connection)
import Lens.Micro.Extras (view)
import qualified Log
import Polysemy.Db.Data.DbConfig (DbConfig (DbConfig))
import qualified Polysemy.Db.Data.DbConnectionError as DbConnectionError
import Polysemy.Db.Data.DbConnectionError (DbConnectionError)
import Polysemy.Db.Data.DbHost (DbHost (DbHost))
import Polysemy.Db.Data.DbName (DbName (DbName))
import Polysemy.Db.Data.DbPassword (DbPassword (DbPassword))
import Polysemy.Db.Data.DbUser (DbUser (DbUser))
import qualified Text.Show as Show

import Polysemy.Hasql.Data.ConnectionTag (ConnectionTag)
import Polysemy.Hasql.Effect.DbConnectionPool (DbConnectionPool (Acquire, Config, Free, Kill, Release, UnsafeGet, Use))

data KillCommand =
  KillCommand
  deriving stock (Int -> KillCommand -> ShowS
[KillCommand] -> ShowS
KillCommand -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [KillCommand] -> ShowS
$cshowList :: [KillCommand] -> ShowS
show :: KillCommand -> String
$cshow :: KillCommand -> String
showsPrec :: Int -> KillCommand -> ShowS
$cshowsPrec :: Int -> KillCommand -> ShowS
Show)
  deriving anyclass (Show KillCommand
Typeable KillCommand
SomeException -> Maybe KillCommand
KillCommand -> String
KillCommand -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: KillCommand -> String
$cdisplayException :: KillCommand -> String
fromException :: SomeException -> Maybe KillCommand
$cfromException :: SomeException -> Maybe KillCommand
toException :: KillCommand -> SomeException
$ctoException :: KillCommand -> SomeException
Exception)

newtype PoolConn =
  PoolConn { PoolConn -> Connection
unPoolConn :: Connection }
  deriving stock (forall x. Rep PoolConn x -> PoolConn
forall x. PoolConn -> Rep PoolConn x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep PoolConn x -> PoolConn
$cfrom :: forall x. PoolConn -> Rep PoolConn x
Generic)

instance Show PoolConn where
  show :: PoolConn -> String
show PoolConn
_ = String
"PoolConn"

data ConnectionClients =
  ConnectionClients {
    ConnectionClients -> PoolConn
connection :: PoolConn,
    ConnectionClients -> Map ThreadId Int
clients :: Map ThreadId Int
  }
  deriving stock (Int -> ConnectionClients -> ShowS
[ConnectionClients] -> ShowS
ConnectionClients -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnectionClients] -> ShowS
$cshowList :: [ConnectionClients] -> ShowS
show :: ConnectionClients -> String
$cshow :: ConnectionClients -> String
showsPrec :: Int -> ConnectionClients -> ShowS
$cshowsPrec :: Int -> ConnectionClients -> ShowS
Show, forall x. Rep ConnectionClients x -> ConnectionClients
forall x. ConnectionClients -> Rep ConnectionClients x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ConnectionClients x -> ConnectionClients
$cfrom :: forall x. ConnectionClients -> Rep ConnectionClients x
Generic)

data Pools =
  Pools {
    Pools -> Maybe Int
maxActive :: Maybe Int,
    Pools -> Maybe Int
maxAvailable :: Maybe Int,
    Pools -> Map ConnectionTag ConnectionClients
active :: Map ConnectionTag ConnectionClients,
    Pools -> Seq PoolConn
available :: Seq PoolConn
  }
  deriving stock (Int -> Pools -> ShowS
[Pools] -> ShowS
Pools -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Pools] -> ShowS
$cshowList :: [Pools] -> ShowS
show :: Pools -> String
$cshow :: Pools -> String
showsPrec :: Int -> Pools -> ShowS
$cshowsPrec :: Int -> Pools -> ShowS
Show, forall x. Rep Pools x -> Pools
forall x. Pools -> Rep Pools x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Pools x -> Pools
$cfrom :: forall x. Pools -> Rep Pools x
Generic)

connectionSettings ::
  DbConfig ->
  Connection.Settings
connectionSettings :: DbConfig -> ByteString
connectionSettings (DbConfig (DbHost Text
host) DbPort
port (DbName Text
dbName) (DbUser Text
user) (DbPassword Text
password)) =
  ByteString
-> Word16 -> ByteString -> ByteString -> ByteString -> ByteString
Connection.settings (forall a b. ConvertUtf8 a b => a -> b
encodeUtf8 Text
host) (forall a b. (Integral a, Num b) => a -> b
fromIntegral DbPort
port) (forall a b. ConvertUtf8 a b => a -> b
encodeUtf8 Text
user) (forall a b. ConvertUtf8 a b => a -> b
encodeUtf8 Text
password) (forall a b. ConvertUtf8 a b => a -> b
encodeUtf8 Text
dbName)

dbError :: Maybe ByteString -> DbConnectionError
dbError :: Maybe ByteString -> DbConnectionError
dbError Maybe ByteString
err =
  Text -> DbConnectionError
DbConnectionError.Acquire (forall b a. b -> (a -> b) -> Maybe a -> b
maybe Text
"unspecified error" forall a b. ConvertUtf8 a b => b -> a
decodeUtf8 Maybe ByteString
err)

withActive ::
  Member (AtomicState Pools) r =>
  (Int -> Map ConnectionTag ConnectionClients -> Sem r a) ->
  Sem r (Maybe a)
withActive :: forall (r :: EffectRow) a.
Member (AtomicState Pools) r =>
(Int -> Map ConnectionTag ConnectionClients -> Sem r a)
-> Sem r (Maybe a)
withActive Int -> Map ConnectionTag ConnectionClients -> Sem r a
f =
  forall s (r :: EffectRow). Member (AtomicState s) r => Sem r s
atomicGet forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Pools {$sel:maxActive:Pools :: Pools -> Maybe Int
maxActive = Just Int
ma, Maybe Int
Map ConnectionTag ConnectionClients
Seq PoolConn
available :: Seq PoolConn
active :: Map ConnectionTag ConnectionClients
maxAvailable :: Maybe Int
$sel:available:Pools :: Pools -> Seq PoolConn
$sel:active:Pools :: Pools -> Map ConnectionTag ConnectionClients
$sel:maxAvailable:Pools :: Pools -> Maybe Int
..} ->
      forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> Map ConnectionTag ConnectionClients -> Sem r a
f Int
ma Map ConnectionTag ConnectionClients
active
    Pools
_ ->
      forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing

acquireNative ::
  Members [Stop DbConnectionError, Embed IO] r =>
  DbConfig ->
  Sem r Connection
acquireNative :: forall (r :: EffectRow).
Members '[Stop DbConnectionError, Embed IO] r =>
DbConfig -> Sem r Connection
acquireNative DbConfig
dbConfig = do
  Either (Maybe ByteString) Connection
conn <- forall err (r :: EffectRow) a.
Members '[Stop err, Embed IO] r =>
(Text -> err) -> IO a -> Sem r a
stopTryIOError Text -> DbConnectionError
DbConnectionError.Acquire (ByteString -> IO (Either (Maybe ByteString) Connection)
Connection.acquire (DbConfig -> ByteString
connectionSettings DbConfig
dbConfig))
  forall err (r :: EffectRow) a.
Member (Stop err) r =>
Either err a -> Sem r a
stopEither (forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first Maybe ByteString -> DbConnectionError
dbError Either (Maybe ByteString) Connection
conn)

acquire ::
  Members [AtomicState Pools, Stop DbConnectionError, Embed IO] r =>
  DbConfig ->
  ConnectionTag ->
  Sem r Connection
acquire :: forall (r :: EffectRow).
Members '[AtomicState Pools, Stop DbConnectionError, Embed IO] r =>
DbConfig -> ConnectionTag -> Sem r Connection
acquire DbConfig
dbConfig ConnectionTag
ctag = do
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (r :: EffectRow) a.
Member (AtomicState Pools) r =>
(Int -> Map ConnectionTag ConnectionClients -> Sem r a)
-> Sem r (Maybe a)
withActive \ Int
m Map ConnectionTag ConnectionClients
act ->
    forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall k a. Map k a -> Int
Map.size Map ConnectionTag ConnectionClients
act forall a. Ord a => a -> a -> Bool
>= Int
m) (forall e (r :: EffectRow) a. Member (Stop e) r => e -> Sem r a
stop (Text -> DbConnectionError
DbConnectionError.Limit [exon|Too many active connections: #{show m}|]))
  Connection
conn <- forall (r :: EffectRow).
Members '[Stop DbConnectionError, Embed IO] r =>
DbConfig -> Sem r Connection
acquireNative DbConfig
dbConfig
  Connection
conn forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
atomicModify' (forall a. IsLabel "active" a => a
#active forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall m. At m => Index m -> Lens' m (Maybe (IxValue m))
at ConnectionTag
ctag forall s t a b. ASetter s t a (Maybe b) -> b -> s -> t
?~ PoolConn -> Map ThreadId Int -> ConnectionClients
ConnectionClients (Connection -> PoolConn
PoolConn Connection
conn) forall a. Monoid a => a
mempty)

reuseOrAcquire ::
  Members [AtomicState Pools, Stop DbConnectionError, Log, Embed IO] r =>
  DbConfig ->
  ConnectionTag ->
  Sem r Connection
reuseOrAcquire :: forall (r :: EffectRow).
Members
  '[AtomicState Pools, Stop DbConnectionError, Log, Embed IO] r =>
DbConfig -> ConnectionTag -> Sem r Connection
reuseOrAcquire DbConfig
dbConfig ConnectionTag
ctag = do
  Maybe Connection
reuse <- forall s a (r :: EffectRow).
Member (AtomicState s) r =>
(s -> (s, a)) -> Sem r a
atomicState' \ pools :: Pools
pools@Pools {Maybe Int
Map ConnectionTag ConnectionClients
Seq PoolConn
available :: Seq PoolConn
active :: Map ConnectionTag ConnectionClients
maxAvailable :: Maybe Int
maxActive :: Maybe Int
$sel:available:Pools :: Pools -> Seq PoolConn
$sel:active:Pools :: Pools -> Map ConnectionTag ConnectionClients
$sel:maxAvailable:Pools :: Pools -> Maybe Int
$sel:maxActive:Pools :: Pools -> Maybe Int
..} ->
    case Map ConnectionTag ConnectionClients
active forall k a. Ord k => Map k a -> k -> Maybe a
!? ConnectionTag
ctag of
      Just (ConnectionClients (PoolConn Connection
conn) Map ThreadId Int
_) ->
        (Pools
pools, forall a. a -> Maybe a
Just Connection
conn)
      Maybe ConnectionClients
Nothing ->
        case Seq PoolConn
available of
          PoolConn Connection
conn :<| Seq PoolConn
rest ->
            (Pools
pools {$sel:available:Pools :: Seq PoolConn
available = Seq PoolConn
rest}, forall a. a -> Maybe a
Just Connection
conn)
          Seq PoolConn
_ ->
            (Pools
pools, forall a. Maybe a
Nothing)
  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isJust Maybe Connection
reuse) do
    forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.trace [exon|Reusing connection for '##{ctag}'|]
  forall (m :: * -> *) a. Applicative m => m a -> Maybe a -> m a
fromMaybeA (forall (r :: EffectRow).
Members '[AtomicState Pools, Stop DbConnectionError, Embed IO] r =>
DbConfig -> ConnectionTag -> Sem r Connection
acquire DbConfig
dbConfig ConnectionTag
ctag) Maybe Connection
reuse

releaseNative ::
  Members [Stop DbConnectionError, Embed IO] r =>
  Connection ->
  Sem r ()
releaseNative :: forall (r :: EffectRow).
Members '[Stop DbConnectionError, Embed IO] r =>
Connection -> Sem r ()
releaseNative Connection
connection =
  forall err (r :: EffectRow) a.
Members '[Stop err, Embed IO] r =>
(Text -> err) -> IO a -> Sem r a
stopTryIOError Text -> DbConnectionError
DbConnectionError.Release (Connection -> IO ()
Connection.release Connection
connection)

release ::
  Members [AtomicState Pools, Stop DbConnectionError, Embed IO] r =>
  ConnectionTag ->
  Sem r ()
release :: forall (r :: EffectRow).
Members '[AtomicState Pools, Stop DbConnectionError, Embed IO] r =>
ConnectionTag -> Sem r ()
release ConnectionTag
ctag = do
  Maybe ConnectionClients
conn <- forall s a (r :: EffectRow).
Member (AtomicState s) r =>
(s -> (s, a)) -> Sem r a
atomicState' \ Pools {Maybe Int
Map ConnectionTag ConnectionClients
Seq PoolConn
available :: Seq PoolConn
active :: Map ConnectionTag ConnectionClients
maxAvailable :: Maybe Int
maxActive :: Maybe Int
$sel:available:Pools :: Pools -> Seq PoolConn
$sel:active:Pools :: Pools -> Map ConnectionTag ConnectionClients
$sel:maxAvailable:Pools :: Pools -> Maybe Int
$sel:maxActive:Pools :: Pools -> Maybe Int
..} -> (Pools {$sel:active:Pools :: Map ConnectionTag ConnectionClients
active = forall k a. Ord k => k -> Map k a -> Map k a
Map.delete ConnectionTag
ctag Map ConnectionTag ConnectionClients
active, Maybe Int
Seq PoolConn
available :: Seq PoolConn
maxAvailable :: Maybe Int
maxActive :: Maybe Int
$sel:available:Pools :: Seq PoolConn
$sel:maxAvailable:Pools :: Maybe Int
$sel:maxActive:Pools :: Maybe Int
..}, Map ConnectionTag ConnectionClients
active forall k a. Ord k => Map k a -> k -> Maybe a
!? ConnectionTag
ctag)
  forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (forall (r :: EffectRow).
Members '[Stop DbConnectionError, Embed IO] r =>
Connection -> Sem r ()
releaseNative forall b c a. (b -> c) -> (a -> b) -> a -> c
. coerce :: forall a b. Coercible a b => a -> b
coerce forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionClients -> PoolConn
connection) Maybe ConnectionClients
conn

-- | Remove the connection used by @ctag@ from the active pool if it exists.
-- Store it for reuse if @maxAvailable@ is @Nothing@ or larger than the currently stored number, otherwise return the
-- connection for release.
removeActive :: ConnectionTag -> Pools -> (Pools, Maybe Connection)
removeActive :: ConnectionTag -> Pools -> (Pools, Maybe Connection)
removeActive ConnectionTag
ctag Pools {Maybe Int
Map ConnectionTag ConnectionClients
Seq PoolConn
available :: Seq PoolConn
active :: Map ConnectionTag ConnectionClients
maxAvailable :: Maybe Int
maxActive :: Maybe Int
$sel:available:Pools :: Pools -> Seq PoolConn
$sel:active:Pools :: Pools -> Map ConnectionTag ConnectionClients
$sel:maxAvailable:Pools :: Pools -> Maybe Int
$sel:maxActive:Pools :: Pools -> Maybe Int
..} =
  (Pools {$sel:active:Pools :: Map ConnectionTag ConnectionClients
active = Map ConnectionTag ConnectionClients
newActive, $sel:available:Pools :: Seq PoolConn
available = Seq PoolConn
newAvailable, Maybe Int
maxAvailable :: Maybe Int
maxActive :: Maybe Int
$sel:maxAvailable:Pools :: Maybe Int
$sel:maxActive:Pools :: Maybe Int
..}, coerce :: forall a b. Coercible a b => a -> b
coerce Maybe PoolConn
toRelease)
  where
    -- Chooses the functor @(Maybe Connection, -)@ for @alterF@, thereby returning the potentially existing element.
    -- The @Nothing@ causes the element to be deleted if it exists.
    (Maybe ConnectionClients
conn, Map ConnectionTag ConnectionClients
newActive) = forall (f :: * -> *) k a.
(Functor f, Ord k) =>
(Maybe a -> f (Maybe a)) -> k -> Map k a -> f (Map k a)
Map.alterF (,forall a. Maybe a
Nothing) ConnectionTag
ctag Map ConnectionTag ConnectionClients
active

    (Maybe PoolConn
toRelease, Seq PoolConn
newAvailable) = case Maybe ConnectionClients
conn of
      Just (ConnectionClients PoolConn
c Map ThreadId Int
_)
        | Bool
keep -> (forall a. Maybe a
Nothing, PoolConn
c forall a. a -> Seq a -> Seq a
<| Seq PoolConn
available)
        | Bool
otherwise -> (forall a. a -> Maybe a
Just PoolConn
c, Seq PoolConn
available)
      Maybe ConnectionClients
Nothing -> (forall a. Maybe a
Nothing, Seq PoolConn
available)

    keep :: Bool
keep = case Maybe Int
maxAvailable of
      Maybe Int
Nothing -> Bool
True
      Just Int
m -> forall a. Seq a -> Int
Seq.length Seq PoolConn
available forall a. Ord a => a -> a -> Bool
< Int
m

catchingKill ::
  Members [Stop DbConnectionError, Final IO] r =>
  Sem r a ->
  Sem r a
catchingKill :: forall (r :: EffectRow) a.
Members '[Stop DbConnectionError, Final IO] r =>
Sem r a -> Sem r a
catchingKill =
  forall err (r :: EffectRow) a.
Member (Stop err) r =>
Sem (Error err : r) a -> Sem r a
stopOnError forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e1 e2 (r :: EffectRow) a.
Member (Error e2) r =>
(e1 -> e2) -> Sem (Error e1 : r) a -> Sem r a
mapError KillCommand -> DbConnectionError
exception forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (r :: EffectRow) a.
(Exception e, Member (Error e) r, Member (Final IO) r) =>
Sem r a -> Sem r a
fromExceptionSem forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
       (r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise
  where
    exception :: KillCommand -> DbConnectionError
exception KillCommand
KillCommand =
      Text -> DbConnectionError
DbConnectionError.Query Text
"command was interrupted by DbConnectionPool.Kill"

-- Incrementing the tid must not be masked or it might not be cleaned up.
withRegisteredClient ::
  Members [AtomicState Pools, Stop DbConnectionError, Resource, Embed IO, Final IO] r =>
  ConnectionTag ->
  Sem r a ->
  Sem r a
withRegisteredClient :: forall (r :: EffectRow) a.
Members
  '[AtomicState Pools, Stop DbConnectionError, Resource, Embed IO,
    Final IO]
  r =>
ConnectionTag -> Sem r a -> Sem r a
withRegisteredClient ConnectionTag
ctag Sem r a
main = do
  ThreadId
tid <- forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed IO ThreadId
myThreadId
  forall (r :: EffectRow) a b.
Member Resource r =>
Sem r a -> Sem r b -> Sem r a
finally
    do
      forall (r :: EffectRow) a.
Members '[Stop DbConnectionError, Final IO] r =>
Sem r a -> Sem r a
catchingKill do
        ThreadId -> (Maybe Int -> Maybe Int) -> Sem r ()
change ThreadId
tid Maybe Int -> Maybe Int
increment
        Sem r a
main
    do
      ThreadId -> (Maybe Int -> Maybe Int) -> Sem r ()
change ThreadId
tid Maybe Int -> Maybe Int
decrement
  where
    change :: ThreadId -> (Maybe Int -> Maybe Int) -> Sem r ()
change ThreadId
tid Maybe Int -> Maybe Int
f =
      forall s (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s) -> Sem r ()
atomicModify' (forall a. IsLabel "active" a => a
#active forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall m. At m => Index m -> Lens' m (Maybe (IxValue m))
at ConnectionTag
ctag forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
%~ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a. IsLabel "clients" a => a
#clients forall s t a b. ASetter s t a b -> (a -> b) -> s -> t
%~ forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter Maybe Int -> Maybe Int
f ThreadId
tid))
    increment :: Maybe Int -> Maybe Int
increment = \case
      Just Int
n -> forall a. a -> Maybe a
Just (Int
n forall a. Num a => a -> a -> a
+ Int
1)
      Maybe Int
Nothing -> forall a. a -> Maybe a
Just Int
1
    decrement :: Maybe Int -> Maybe Int
decrement = \case
      Just Int
1 -> forall a. Maybe a
Nothing
      Just Int
n -> forall a. a -> Maybe a
Just (Int
n forall a. Num a => a -> a -> a
- Int
1)
      Maybe Int
Nothing -> forall a. Maybe a
Nothing

releaseAll ::
  Members [AtomicState Pools, Log, Resource, Embed IO, Final IO] r =>
  Sem r ()
releaseAll :: forall (r :: EffectRow).
Members
  '[AtomicState Pools, Log, Resource, Embed IO, Final IO] r =>
Sem r ()
releaseAll =
  forall s (r :: EffectRow). Member (AtomicState s) r => Sem r s
atomicGet forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Pools {Map ConnectionTag ConnectionClients
active :: Map ConnectionTag ConnectionClients
$sel:active:Pools :: Pools -> Map ConnectionTag ConnectionClients
active, Seq PoolConn
available :: Seq PoolConn
$sel:available:Pools :: Pools -> Seq PoolConn
available} -> do
    forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (forall k a. Map k a -> [a]
Map.elems Map ConnectionTag ConnectionClients
active) \ (ConnectionClients PoolConn
conn Map ThreadId Int
_) ->
      forall {r :: EffectRow}.
(Member (Embed IO) r, Member Log r) =>
PoolConn -> Sem r ()
releaseOrLog PoolConn
conn
    forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ forall {r :: EffectRow}.
(Member (Embed IO) r, Member Log r) =>
PoolConn -> Sem r ()
releaseOrLog Seq PoolConn
available
  where
    releaseOrLog :: PoolConn -> Sem r ()
releaseOrLog (PoolConn Connection
conn) =
      forall err (r :: EffectRow) a.
Sem (Stop err : r) a -> Sem r (Either err a)
runStop (forall (r :: EffectRow).
Members '[Stop DbConnectionError, Embed IO] r =>
Connection -> Sem r ()
releaseNative Connection
conn) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *) a b.
Applicative m =>
(a -> m b) -> Either a b -> m b
leftA \ DbConnectionError
e ->
        forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.error [exon|Releasing connection failed: #{show e}|]

handleDbConnectionPool ::
  Members [AtomicState Pools, Stop DbConnectionError, Log, Resource, Embed IO, Final IO] r =>
  DbConfig ->
  DbConnectionPool m a ->
  Tactical e m r a
handleDbConnectionPool :: forall (r :: EffectRow) (m :: * -> *) a (e :: (* -> *) -> * -> *).
Members
  '[AtomicState Pools, Stop DbConnectionError, Log, Resource,
    Embed IO, Final IO]
  r =>
DbConfig -> DbConnectionPool m a -> Tactical e m r a
handleDbConnectionPool DbConfig
dbConfig = \case
  Acquire ConnectionTag
ctag -> do
    forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.trace [exon|Acquiring connection '##{ctag}'|]
    forall (f :: * -> *) a (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
a -> Sem (WithTactics e f m r) (f a)
pureT forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (r :: EffectRow).
Members
  '[AtomicState Pools, Stop DbConnectionError, Log, Embed IO] r =>
DbConfig -> ConnectionTag -> Sem r Connection
reuseOrAcquire DbConfig
dbConfig ConnectionTag
ctag
  Free ConnectionTag
ctag -> do
    forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ forall (r :: EffectRow).
Members '[Stop DbConnectionError, Embed IO] r =>
Connection -> Sem r ()
releaseNative forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall s a (r :: EffectRow).
Member (AtomicState s) r =>
(s -> (s, a)) -> Sem r a
atomicState' (ConnectionTag -> Pools -> (Pools, Maybe Connection)
removeActive ConnectionTag
ctag)
    forall (f :: * -> *) (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
Sem (WithTactics e f m r) (f ())
unitT
  Release ConnectionTag
ctag -> do
    forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.trace [exon|Releasing connection '##{ctag}'|]
    forall (f :: * -> *) a (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
a -> Sem (WithTactics e f m r) (f a)
pureT forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (r :: EffectRow).
Members '[AtomicState Pools, Stop DbConnectionError, Embed IO] r =>
ConnectionTag -> Sem r ()
release ConnectionTag
ctag
  Use ConnectionTag
ctag m a
ma ->
    forall (r :: EffectRow) a.
Members
  '[AtomicState Pools, Stop DbConnectionError, Resource, Embed IO,
    Final IO]
  r =>
ConnectionTag -> Sem r a -> Sem r a
withRegisteredClient ConnectionTag
ctag (forall (m :: * -> *) a (e :: (* -> *) -> * -> *) (r :: EffectRow).
m a -> Tactical e m r a
runTSimple m a
ma)
  Kill ConnectionTag
ctag -> do
    ThreadId
cur <- forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed IO ThreadId
myThreadId
    forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
atomicGets (forall a s. Getting a s a -> s -> a
view (forall a. IsLabel "active" a => a
#active forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall m. At m => Index m -> Lens' m (Maybe (IxValue m))
at ConnectionTag
ctag)) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ \ (ConnectionClients PoolConn
_ Map ThreadId Int
clients) -> do
      forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (forall k a. Map k a -> [k]
Map.keys Map ThreadId Int
clients) \ ThreadId
c ->
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (ThreadId
cur forall a. Eq a => a -> a -> Bool
== ThreadId
c) (forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall e. Exception e => ThreadId -> e -> IO ()
throwTo ThreadId
c KillCommand
KillCommand))
    forall (f :: * -> *) a (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
a -> Sem (WithTactics e f m r) (f a)
pureT forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (r :: EffectRow).
Members '[AtomicState Pools, Stop DbConnectionError, Embed IO] r =>
ConnectionTag -> Sem r ()
release ConnectionTag
ctag
  UnsafeGet ConnectionTag
ctag ->
    forall (f :: * -> *) a (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
a -> Sem (WithTactics e f m r) (f a)
pureT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (coerce :: forall a b. Coercible a b => a -> b
coerce forall b c a. (b -> c) -> (a -> b) -> a -> c
. ConnectionClients -> PoolConn
connection) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall s s' (r :: EffectRow).
Member (AtomicState s) r =>
(s -> s') -> Sem r s'
atomicGets (forall a s. Getting a s a -> s -> a
view (forall a. IsLabel "active" a => a
#active forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall m. At m => Index m -> Lens' m (Maybe (IxValue m))
at ConnectionTag
ctag))
  DbConnectionPool m a
Config ->
    forall (f :: * -> *) a (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
a -> Sem (WithTactics e f m r) (f a)
pureT DbConfig
dbConfig

interpretDbConnectionPool ::
  Members [Log, Resource, Embed IO, Final IO] r =>
  DbConfig ->
  Maybe Int ->
  Maybe Int ->
  InterpreterFor (DbConnectionPool !! DbConnectionError) r
interpretDbConnectionPool :: forall (r :: EffectRow).
Members '[Log, Resource, Embed IO, Final IO] r =>
DbConfig
-> Maybe Int
-> Maybe Int
-> InterpreterFor (DbConnectionPool !! DbConnectionError) r
interpretDbConnectionPool DbConfig
dbConfig Maybe Int
maxActive Maybe Int
maxAvailable =
  forall a (r :: EffectRow).
Member (Embed IO) r =>
a -> InterpreterFor (AtomicState a) r
interpretAtomic (Maybe Int
-> Maybe Int
-> Map ConnectionTag ConnectionClients
-> Seq PoolConn
-> Pools
Pools Maybe Int
maxActive Maybe Int
maxAvailable forall a. Monoid a => a
mempty forall a. Monoid a => a
mempty) forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (r :: EffectRow) a b.
Member Resource r =>
Sem r a -> Sem r b -> Sem r a
finally forall (r :: EffectRow).
Members
  '[AtomicState Pools, Log, Resource, Embed IO, Final IO] r =>
Sem r ()
releaseAll forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
(forall x (r0 :: EffectRow).
 eff (Sem r0) x
 -> Tactical (Resumable err eff) (Sem r0) (Stop err : r) x)
-> InterpreterFor (Resumable err eff) r
interpretResumableH (forall (r :: EffectRow) (m :: * -> *) a (e :: (* -> *) -> * -> *).
Members
  '[AtomicState Pools, Stop DbConnectionError, Log, Resource,
    Embed IO, Final IO]
  r =>
DbConfig -> DbConnectionPool m a -> Tactical e m r a
handleDbConnectionPool DbConfig
dbConfig) forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
       (r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder

handleDbConnectionPoolSingle ::
  Members [AtomicState (Maybe Connection), Stop DbConnectionError, Embed IO] r =>
  DbConfig ->
  DbConnectionPool m a ->
  Tactical e m r a
handleDbConnectionPoolSingle :: forall (r :: EffectRow) (m :: * -> *) a (e :: (* -> *) -> * -> *).
Members
  '[AtomicState (Maybe Connection), Stop DbConnectionError, Embed IO]
  r =>
DbConfig -> DbConnectionPool m a -> Tactical e m r a
handleDbConnectionPoolSingle DbConfig
dbConfig = \case
  Acquire ConnectionTag
_ -> do
    let
      acquireSingle :: Sem (WithTactics e f m r) Connection
acquireSingle = do
        Connection
c <- forall (r :: EffectRow).
Members '[Stop DbConnectionError, Embed IO] r =>
DbConfig -> Sem r Connection
acquireNative DbConfig
dbConfig
        Connection
c forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall s (r :: EffectRow).
Member (AtomicState s) r =>
s -> Sem r ()
atomicPut (forall a. a -> Maybe a
Just Connection
c)
    forall (f :: * -> *) a (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
a -> Sem (WithTactics e f m r) (f a)
pureT forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (m :: * -> *) a. Applicative m => m a -> Maybe a -> m a
fromMaybeA Sem (WithTactics e f m r) Connection
acquireSingle forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall s (r :: EffectRow). Member (AtomicState s) r => Sem r s
atomicGet
  Free ConnectionTag
_ ->
    forall (f :: * -> *) (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
Sem (WithTactics e f m r) (f ())
unitT
  -- TODO this should be called from outside of the DbConnection scope interpreter only
  Release ConnectionTag
_ -> do
    forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ forall (r :: EffectRow).
Members '[Stop DbConnectionError, Embed IO] r =>
Connection -> Sem r ()
releaseNative forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall s (r :: EffectRow). Member (AtomicState s) r => Sem r s
atomicGet
    forall (f :: * -> *) (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
Sem (WithTactics e f m r) (f ())
unitT
  -- TODO maybe not very useful but possible
  Use ConnectionTag
_ m a
ma ->
    forall (m :: * -> *) a (e :: (* -> *) -> * -> *) (r :: EffectRow).
m a -> Tactical e m r a
runTSimple m a
ma
  Kill ConnectionTag
_ ->
    forall (f :: * -> *) (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
Sem (WithTactics e f m r) (f ())
unitT
  UnsafeGet ConnectionTag
_ ->
    forall (f :: * -> *) a (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
a -> Sem (WithTactics e f m r) (f a)
pureT forall a. Maybe a
Nothing
  DbConnectionPool m a
Config ->
    forall (f :: * -> *) a (e :: (* -> *) -> * -> *) (m :: * -> *)
       (r :: EffectRow).
Functor f =>
a -> Sem (WithTactics e f m r) (f a)
pureT DbConfig
dbConfig

interpretDbConnectionPoolSingle ::
  Member (Embed IO) r =>
  DbConfig ->
  InterpreterFor (DbConnectionPool !! DbConnectionError) r
interpretDbConnectionPoolSingle :: forall (r :: EffectRow).
Member (Embed IO) r =>
DbConfig
-> InterpreterFor (DbConnectionPool !! DbConnectionError) r
interpretDbConnectionPoolSingle DbConfig
dbConfig =
  forall a (r :: EffectRow).
Member (Embed IO) r =>
a -> InterpreterFor (AtomicState a) r
interpretAtomic forall a. Maybe a
Nothing forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
(forall x (r0 :: EffectRow).
 eff (Sem r0) x
 -> Tactical (Resumable err eff) (Sem r0) (Stop err : r) x)
-> InterpreterFor (Resumable err eff) r
interpretResumableH (forall (r :: EffectRow) (m :: * -> *) a (e :: (* -> *) -> * -> *).
Members
  '[AtomicState (Maybe Connection), Stop DbConnectionError, Embed IO]
  r =>
DbConfig -> DbConnectionPool m a -> Tactical e m r a
handleDbConnectionPoolSingle DbConfig
dbConfig) forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
       (r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder