module Polysemy.Hasql.Queue.Input where

import Conc (
  ClockSkewConfig,
  Monitor,
  Restart,
  RestartingMonitor,
  interpretAtomic,
  interpretMonitorRestart,
  monitor,
  monitorClockSkew,
  restart,
  )
import Control.Concurrent (threadWaitRead)
import qualified Control.Concurrent.Async as Concurrent
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Control.Exception (IOException)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT), runMaybeT)
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.UUID as UUID
import Data.UUID (UUID)
import qualified Database.PostgreSQL.LibPQ as LibPQ
import Exon (exon)
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Polysemy.Db.Data.DbConnectionError as DbConnectionError
import qualified Polysemy.Db.Data.DbError as DbError
import Polysemy.Db.Data.DbError (DbError)
import qualified Polysemy.Db.Effect.Store as Store
import Polysemy.Db.Effect.Store (Store)
import Polysemy.Final (withWeavingToFinal)
import Polysemy.Input (Input (Input))
import qualified Log
import qualified Time as Time
import Prelude hiding (Queue, listen)
import Sqel.Data.Sql (sql)
import qualified Sqel.Data.Uid as Uid
import Sqel.Data.Uid (Uuid)
import Sqel.SOP.Constraint (symbolText)
import Torsor (Torsor)

import Polysemy.Hasql.Data.ConnectionTag (ConnectionTag (NamedTag))
import Polysemy.Hasql.Data.InitDb (InitDb (InitDb))
import qualified Polysemy.Hasql.Database as Database (retryingSqlDef)
import qualified Polysemy.Hasql.Effect.Database as Database
import Polysemy.Hasql.Effect.Database (Database, Databases, withDatabaseUnique)
import Polysemy.Hasql.Queue.Data.Queue (Queue, QueueName (QueueName))
import Polysemy.Hasql.Queue.Data.Queued (Queued)
import qualified Polysemy.Hasql.Queue.Data.Queued as Queued (Queued (..))

-- | Try to fetch a notification, and if there is none, wait on the connection's file descriptor until some data is
-- received.
-- This connection will be fully blocked when waiting, so it must not be shared with other parts of the application.
--
-- TODO Could it be possible to share a connection among all queues, only for waiting?
tryDequeue ::
  Members [Monitor Restart, Reader QueueName, Log, Embed IO] r =>
  LibPQ.Connection ->
  Sem r (Either Text (Maybe UUID))
tryDequeue :: forall (r :: EffectRow).
Members '[Monitor Restart, Reader QueueName, Log, Embed IO] r =>
Connection -> Sem r (Either Text (Maybe UUID))
tryDequeue Connection
connection = do
  QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
  let status :: Text -> Sem r ()
status Text
msg = forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.trace [exon|#{msg} on connection for '#{name}'|]
  Text -> Sem r ()
status Text
"Trying dequeue"
  forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Connection -> IO (Maybe Notify)
LibPQ.notifies Connection
connection) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Just (LibPQ.Notify ByteString
_ CPid
_ ByteString
payload) -> do
      Text -> Sem r ()
status Text
"Received notify"
      case ByteString -> Maybe UUID
UUID.fromASCIIBytes ByteString
payload of
        Just UUID
d ->
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (forall a. a -> Maybe a
Just UUID
d))
        Maybe UUID
Nothing ->
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left [exon|invalid UUID payload: #{decodeUtf8 payload}|])
    Maybe Notify
Nothing -> do
      Text -> Sem r ()
status Text
"No notify"
      forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Connection -> IO (Maybe Fd)
LibPQ.socket Connection
connection) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Just Fd
fd -> do
          Text -> Sem r ()
status Text
"Waiting for activity"
          forall action (r :: EffectRow) a.
Member (Monitor action) r =>
Sem r a -> Sem r a
monitor (forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Fd -> IO ()
threadWaitRead Fd
fd))
          Text -> Sem r ()
status Text
"Activity received"
          forall a b. b -> Either a b
Right forall a. Maybe a
Nothing forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Connection -> IO Bool
LibPQ.consumeInput Connection
connection)
        Maybe Fd
Nothing ->
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left Text
"couldn't connect with LibPQ.socket")

listen ::
  Members [Database, Reader QueueName, Log, Embed IO] r =>
  Sem r ()
listen :: forall (r :: EffectRow).
Members '[Database, Reader QueueName, Log, Embed IO] r =>
Sem r ()
listen = do
  QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
  forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.debug [exon|executing `listen` for queue ##{name}|]
  forall (r :: EffectRow). Member Database r => Sql -> Sem r ()
Database.retryingSqlDef [sql|listen "##{name}"|]

unlisten ::
   e r .
  Members [Database !! e, Reader QueueName, Log] r =>
  Sem r ()
unlisten :: forall e (r :: EffectRow).
Members '[Database !! e, Reader QueueName, Log] r =>
Sem r ()
unlisten = do
  QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
  forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.debug [exon|executing `unlisten` for queue `##{name}`|]
  forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
Member (Resumable err eff) r =>
Sem (eff : r) () -> Sem r ()
resume_ (forall (r :: EffectRow). Member Database r => Sql -> Sem r ()
Database.retryingSqlDef [sql|unlisten "##{name}"|])

processMessages ::
  Ord t =>
  NonEmpty (Uuid (Queued t d)) ->
  NonEmpty d
processMessages :: forall t d. Ord t => NonEmpty (Uuid (Queued t d)) -> NonEmpty d
processMessages =
  forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall t a. Queued t a -> a
Queued.queue_payload forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall i a. Uid i a -> a
Uid.payload) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall o a. Ord o => (a -> o) -> NonEmpty a -> NonEmpty a
NonEmpty.sortWith (forall t a. Queued t a -> t
Queued.queue_created forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall i a. Uid i a -> a
Uid.payload)

initQueue ::
   e d t r .
  Ord t =>
  Members [Store UUID (Queued t d) !! e, Reader QueueName, Database, Log, Embed IO] r =>
  (d -> Sem r ()) ->
  Sem r ()
initQueue :: forall e d t (r :: EffectRow).
(Ord t,
 Members
   '[Store UUID (Queued t d) !! e, Reader QueueName, Database, Log,
     Embed IO]
   r) =>
(d -> Sem r ()) -> Sem r ()
initQueue d -> Sem r ()
write = do
  QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
  forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.trace [exon|Initializing queue '#{name}'|]
  Maybe (NonEmpty (Uuid (Queued t d)))
waiting <- forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow) a.
Member (Resumable err eff) r =>
a -> Sem (eff : r) a -> Sem r a
resumeAs forall a. Maybe a
Nothing (forall a. [a] -> Maybe (NonEmpty a)
nonEmpty forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (f :: * -> *) i d (r :: EffectRow).
Member (QStore f i d) r =>
Sem r [d]
Store.deleteAll)
  forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ d -> Sem r ()
write forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall t d. Ord t => NonEmpty (Uuid (Queued t d)) -> NonEmpty d
processMessages) Maybe (NonEmpty (Uuid (Queued t d)))
waiting
  forall (r :: EffectRow).
Members '[Database, Reader QueueName, Log, Embed IO] r =>
Sem r ()
listen

withPqConn ::
  Member (Final IO) r =>
  Connection ->
  (LibPQ.Connection -> Sem r a) ->
  Sem r (Either Text a)
withPqConn :: forall (r :: EffectRow) a.
Member (Final IO) r =>
Connection -> (Connection -> Sem r a) -> Sem r (Either Text a)
withPqConn Connection
connection Connection -> Sem r a
use =
  forall e (r :: EffectRow) a.
Member (Final IO) r =>
Sem (Error e : r) a -> Sem r (Either e a)
errorToIOFinal forall a b. (a -> b) -> a -> b
$ forall exc err (r :: EffectRow) a.
(Exception exc, Member (Error err) r, Member (Final IO) r) =>
(exc -> err) -> Sem r a -> Sem r a
fromExceptionSemVia @IOException forall b a. (Show a, IsString b) => a -> b
show forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (r :: EffectRow) a.
Member (Final m) r =>
ThroughWeavingToFinal m (Sem r) a -> Sem r a
withWeavingToFinal \ f ()
s forall x. f (Sem (Error Text : r) x) -> IO (f x)
lower forall x. f x -> Maybe x
_ -> do
    forall a. Connection -> (Connection -> IO a) -> IO a
withLibPQConnection Connection
connection \ Connection
c -> forall x. f (Sem (Error Text : r) x) -> IO (f x)
lower (forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise (Connection -> Sem r a
use Connection
c) forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ f ()
s)

-- TODO check that DbConnectionError is right here
dequeueAndProcess ::
   d t dt r .
  Ord t =>
  Members [Monitor Restart, Reader QueueName, Final IO] r =>
  Members [Store UUID (Queued t d) !! DbError, Database !! DbError, Time t dt, Stop DbError, Log, Embed IO] r =>
  TBMQueue d ->
  Connection ->
  Sem r ()
dequeueAndProcess :: forall d t dt (r :: EffectRow).
(Ord t, Members '[Monitor Restart, Reader QueueName, Final IO] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Database !! DbError,
     Time t dt, Stop DbError, Log, Embed IO]
   r) =>
TBMQueue d -> Connection -> Sem r ()
dequeueAndProcess TBMQueue d
queue Connection
connection = do
  Either Text (Maybe UUID)
result <- forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (r :: EffectRow) a.
Member (Final IO) r =>
Connection -> (Connection -> Sem r a) -> Sem r (Either Text a)
withPqConn Connection
connection forall (r :: EffectRow).
Members '[Monitor Restart, Reader QueueName, Log, Embed IO] r =>
Connection -> Sem r (Either Text (Maybe UUID))
tryDequeue
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT do
    UUID
id' <- forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (forall err err' (r :: EffectRow) a.
Member (Stop err') r =>
(err -> err') -> Either err a -> Sem r a
stopEitherWith (DbConnectionError -> DbError
DbError.Connection forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> DbConnectionError
DbConnectionError.Acquire) Either Text (Maybe UUID)
result)
    Uuid (Queued t d)
messages <- forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
Members '[Resumable err eff, Stop err] r =>
InterpreterFor eff r
restop (forall (f :: * -> *) i d (r :: EffectRow).
Member (QStore f i d) r =>
i -> Sem r (f d)
Store.delete UUID
id'))
    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue d
queue) (forall t d. Ord t => NonEmpty (Uuid (Queued t d)) -> NonEmpty d
processMessages (forall (f :: * -> *) a. Applicative f => a -> f a
pure Uuid (Queued t d)
messages)))

dequeue ::
   d t dt r .
  Ord t =>
  Members [Monitor Restart, Reader QueueName, Final IO] r =>
  Members [Store UUID (Queued t d) !! DbError, Database !! DbError, Stop DbError, Time t dt, Log, Embed IO] r =>
  TBMQueue d ->
  Sem r ()
dequeue :: forall d t dt (r :: EffectRow).
(Ord t, Members '[Monitor Restart, Reader QueueName, Final IO] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Database !! DbError,
     Stop DbError, Time t dt, Log, Embed IO]
   r) =>
TBMQueue d -> Sem r ()
dequeue TBMQueue d
queue = do
  QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
  let
    initDb :: InitDb (Sem (Database : r))
initDb = forall (m :: * -> *).
ClientTag -> Bool -> (Connection -> m ()) -> InitDb m
InitDb [exon|dequeue-##{name}|] Bool
False \ Connection
_ -> forall e d t (r :: EffectRow).
(Ord t,
 Members
   '[Store UUID (Queued t d) !! e, Reader QueueName, Database, Log,
     Embed IO]
   r) =>
(d -> Sem r ()) -> Sem r ()
initQueue (forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue d
queue)
  forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
Members '[Resumable err eff, Stop err] r =>
InterpreterFor eff r
restop @_ @Database do
    forall (r :: EffectRow) a.
Member Database r =>
InitDb (Sem r) -> Sem r a -> Sem r a
Database.withInit InitDb (Sem (Database : r))
initDb (forall (r :: EffectRow) a.
Member Database r =>
(Connection -> Sem r a) -> Sem r a
Database.use (forall d t dt (r :: EffectRow).
(Ord t, Members '[Monitor Restart, Reader QueueName, Final IO] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Database !! DbError,
     Time t dt, Stop DbError, Log, Embed IO]
   r) =>
TBMQueue d -> Connection -> Sem r ()
dequeueAndProcess TBMQueue d
queue))

dequeueLoop ::
   d t dt u r .
  Ord t =>
  TimeUnit u =>
  Members [Monitor Restart, Reader QueueName] r =>
  Members [Store UUID (Queued t d) !! DbError, Database !! DbError, Time t dt, Log, Resource, Embed IO, Final IO] r =>
  u ->
  (DbError -> Sem r Bool) ->
  TBMQueue d ->
  Sem r ()
dequeueLoop :: forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u, Members '[Monitor Restart, Reader QueueName] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Database !! DbError,
     Time t dt, Log, Resource, Embed IO, Final IO]
   r) =>
u -> (DbError -> Sem r Bool) -> TBMQueue d -> Sem r ()
dequeueLoop u
errorDelay DbError -> Sem r Bool
errorHandler TBMQueue d
queue = do
  QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
  let
    spin :: Sem r ()
spin =
      forall err (r :: EffectRow) a.
Sem (Stop err : r) a -> Sem r (Either err a)
runStop (forall d t dt (r :: EffectRow).
(Ord t, Members '[Monitor Restart, Reader QueueName, Final IO] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Database !! DbError,
     Stop DbError, Time t dt, Log, Embed IO]
   r) =>
TBMQueue d -> Sem r ()
dequeue TBMQueue d
queue) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Right () -> Sem r ()
spin
        Left DbError
err -> Bool -> Sem r ()
result forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< DbError -> Sem r Bool
errorHandler DbError
err
    result :: Bool -> Sem r ()
result = \case
      Bool
True ->
        Sem r ()
disconnect forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall t d u (r :: EffectRow).
(TimeUnit u, Member (Time t d) r) =>
u -> Sem r ()
Time.sleep u
errorDelay forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Sem r ()
spin
      Bool
False -> do
        forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.warn [exon|Exiting dequeue loop for '##{name}' after error|]
        forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall a. STM a -> IO a
atomically (forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue d
queue))
    disconnect :: Sem r ()
disconnect =
      forall e (r :: EffectRow).
Members '[Database !! e, Reader QueueName, Log] r =>
Sem r ()
unlisten forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
Member (Resumable err eff) r =>
Sem (eff : r) () -> Sem r ()
resume_ forall (r :: EffectRow). Member Database r => Sem r ()
Database.release
  Sem r ()
spin

startDequeueLoop ::
   d t dt u r .
  Ord t =>
  TimeUnit u =>
  Members [RestartingMonitor, Reader QueueName] r =>
  Members [Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log, Resource, Embed IO, Final IO] r =>
  u ->
  (DbError -> Sem r Bool) ->
  TBMQueue d ->
  Sem r ()
startDequeueLoop :: forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u,
 Members '[RestartingMonitor, Reader QueueName] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
     Resource, Embed IO, Final IO]
   r) =>
u -> (DbError -> Sem r Bool) -> TBMQueue d -> Sem r ()
startDequeueLoop u
errorDelay DbError -> Sem r Bool
errorHandler TBMQueue d
queue = do
  QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
  forall (r :: EffectRow).
Member Databases r =>
Maybe ConnectionTag -> InterpreterFor (Database !! DbError) r
withDatabaseUnique (forall a. a -> Maybe a
Just (Text -> ConnectionTag
NamedTag [exon|dequeue-#{name}|])) do
    forall (r :: EffectRow) a b.
Member Resource r =>
Sem r a -> Sem r b -> Sem r a
finally (forall (r :: EffectRow).
Member RestartingMonitor r =>
InterpreterFor (Monitor Restart) r
restart (forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u, Members '[Monitor Restart, Reader QueueName] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Database !! DbError,
     Time t dt, Log, Resource, Embed IO, Final IO]
   r) =>
u -> (DbError -> Sem r Bool) -> TBMQueue d -> Sem r ()
dequeueLoop u
errorDelay (forall (index :: Nat) (inserted :: EffectRow) (head :: EffectRow)
       (oldTail :: EffectRow) (tail :: EffectRow) (old :: EffectRow)
       (full :: EffectRow) a.
(ListOfLength index head, WhenStuck index InsertAtUnprovidedIndex,
 old ~ Append head oldTail, tail ~ Append inserted oldTail,
 full ~ Append head tail,
 InsertAtIndex index head tail oldTail full inserted) =>
Sem old a -> Sem full a
insertAt @0 forall b c a. (b -> c) -> (a -> b) -> a -> c
. DbError -> Sem r Bool
errorHandler) TBMQueue d
queue)) forall e (r :: EffectRow).
Members '[Database !! e, Reader QueueName, Log] r =>
Sem r ()
unlisten

interpretInputQueue ::
   d r .
  Member (Embed IO) r =>
  TBMQueue d ->
  InterpreterFor (Input (Maybe d)) r
interpretInputQueue :: forall d (r :: EffectRow).
Member (Embed IO) r =>
TBMQueue d -> InterpreterFor (Input (Maybe d)) r
interpretInputQueue TBMQueue d
queue =
  forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret \case
    Input (Maybe d) (Sem rInitial) x
Input ->
      forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall a. STM a -> IO a
atomically (forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue d
queue))

dequeueThread ::
   d t dt u r .
  Ord t =>
  TimeUnit u =>
  Members [RestartingMonitor, Reader QueueName, Resource] r =>
  Members [Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log, Async, Embed IO, Final IO] r =>
  u ->
  (DbError -> Sem r Bool) ->
  Sem r (Concurrent.Async (Maybe ()), TBMQueue d)
dequeueThread :: forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u,
 Members '[RestartingMonitor, Reader QueueName, Resource] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
     Async, Embed IO, Final IO]
   r) =>
u
-> (DbError -> Sem r Bool) -> Sem r (Async (Maybe ()), TBMQueue d)
dequeueThread u
errorDelay DbError -> Sem r Bool
errorHandler = do
  TBMQueue d
queue <- forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
64)
  Async (Maybe ())
handle <- forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
async (forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u,
 Members '[RestartingMonitor, Reader QueueName] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
     Resource, Embed IO, Final IO]
   r) =>
u -> (DbError -> Sem r Bool) -> TBMQueue d -> Sem r ()
startDequeueLoop u
errorDelay DbError -> Sem r Bool
errorHandler TBMQueue d
queue)
  pure (Async (Maybe ())
handle, TBMQueue d
queue)

interpretInputDbQueueListen ::
   (name :: Symbol) d t dt u r .
  Ord t =>
  TimeUnit u =>
  KnownSymbol name =>
  Members [RestartingMonitor, Final IO] r =>
  Members [Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log, Resource, Async, Embed IO] r =>
  u ->
  (DbError -> Sem r Bool) ->
  InterpreterFor (Input (Maybe d)) r
interpretInputDbQueueListen :: forall (name :: Symbol) d t dt u (r :: EffectRow).
(Ord t, TimeUnit u, KnownSymbol name,
 Members '[RestartingMonitor, Final IO] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
     Resource, Async, Embed IO]
   r) =>
u -> (DbError -> Sem r Bool) -> InterpreterFor (Input (Maybe d)) r
interpretInputDbQueueListen u
errorDelay DbError -> Sem r Bool
errorHandler Sem (Input (Maybe d) : r) a
sem =
  forall i (r :: EffectRow) a. i -> Sem (Reader i : r) a -> Sem r a
runReader (Text -> QueueName
QueueName (forall (name :: Symbol). KnownSymbol name => Text
symbolText @name)) forall a b. (a -> b) -> a -> b
$
  forall (r :: EffectRow) a c b.
Member Resource r =>
Sem r a -> (a -> Sem r c) -> (a -> Sem r b) -> Sem r b
bracket Sem (Reader QueueName : r) (Async (Maybe ()), TBMQueue d)
acquire forall {r :: EffectRow} {a} {b}.
Member Async r =>
(Async a, b) -> Sem r ()
release \ (Async (Maybe ())
_, TBMQueue d
queue) -> do
    forall d (r :: EffectRow).
Member (Embed IO) r =>
TBMQueue d -> InterpreterFor (Input (Maybe d)) r
interpretInputQueue TBMQueue d
queue (forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
       (r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder Sem (Input (Maybe d) : r) a
sem)
  where
    acquire :: Sem (Reader QueueName : r) (Async (Maybe ()), TBMQueue d)
acquire = forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u,
 Members '[RestartingMonitor, Reader QueueName, Resource] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
     Async, Embed IO, Final IO]
   r) =>
u
-> (DbError -> Sem r Bool) -> Sem r (Async (Maybe ()), TBMQueue d)
dequeueThread u
errorDelay (forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise forall b c a. (b -> c) -> (a -> b) -> a -> c
. DbError -> Sem r Bool
errorHandler)
    release :: (Async a, b) -> Sem r ()
release (Async a
handle, b
_) = forall (r :: EffectRow) a. Member Async r => Async a -> Sem r ()
cancel Async a
handle

interpretInputQueueDb ::
   qname u t dt d diff r .
  TimeUnit u =>
  TimeUnit diff =>
  Torsor t diff =>
  Queue qname t =>
  Members [Store UUID (Queued t d) !! DbError, Databases] r =>
  Members [Time t dt, Log, Resource, Async, Race, Embed IO, Final IO] r =>
  u ->
  ClockSkewConfig ->
  (DbError -> Sem r Bool) ->
  InterpreterFor (Input (Maybe d)) r
interpretInputQueueDb :: forall (qname :: Symbol) u t dt d diff (r :: EffectRow).
(TimeUnit u, TimeUnit diff, Torsor t diff, Queue qname t,
 Members '[Store UUID (Queued t d) !! DbError, Databases] r,
 Members
   '[Time t dt, Log, Resource, Async, Race, Embed IO, Final IO] r) =>
u
-> ClockSkewConfig
-> (DbError -> Sem r Bool)
-> InterpreterFor (Input (Maybe d)) r
interpretInputQueueDb u
errorDelay ClockSkewConfig
csConfig DbError -> Sem r Bool
errorHandler =
  forall a (r :: EffectRow).
Member (Embed IO) r =>
a -> InterpreterFor (AtomicState a) r
interpretAtomic forall a. Maybe a
Nothing forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall t d (r :: EffectRow).
Members '[Time t d, Resource, Async, Race, Final IO] r =>
MonitorCheck r -> InterpreterFor RestartingMonitor r
interpretMonitorRestart (forall t d diff (r :: EffectRow).
(Torsor t diff, TimeUnit diff,
 Members '[AtomicState (Maybe t), Time t d, Embed IO] r) =>
ClockSkewConfig -> MonitorCheck r
monitorClockSkew ClockSkewConfig
csConfig) forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
       (r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall (name :: Symbol) d t dt u (r :: EffectRow).
(Ord t, TimeUnit u, KnownSymbol name,
 Members '[RestartingMonitor, Final IO] r,
 Members
   '[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
     Resource, Async, Embed IO]
   r) =>
u -> (DbError -> Sem r Bool) -> InterpreterFor (Input (Maybe d)) r
interpretInputDbQueueListen @qname u
errorDelay (forall (index :: Nat) (inserted :: EffectRow) (head :: EffectRow)
       (oldTail :: EffectRow) (tail :: EffectRow) (old :: EffectRow)
       (full :: EffectRow) a.
(ListOfLength index head, WhenStuck index InsertAtUnprovidedIndex,
 old ~ Append head oldTail, tail ~ Append inserted oldTail,
 full ~ Append head tail,
 InsertAtIndex index head tail oldTail full inserted) =>
Sem old a -> Sem full a
insertAt @0 forall b c a. (b -> c) -> (a -> b) -> a -> c
. DbError -> Sem r Bool
errorHandler) forall b c a. (b -> c) -> (a -> b) -> a -> c
.
  forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
       (r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder