module Polysemy.Hasql.Queue.Input where
import Conc (
ClockSkewConfig,
Monitor,
Restart,
RestartingMonitor,
interpretAtomic,
interpretMonitorRestart,
monitor,
monitorClockSkew,
restart,
)
import Control.Concurrent (threadWaitRead)
import qualified Control.Concurrent.Async as Concurrent
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Control.Exception (IOException)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT), runMaybeT)
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.UUID as UUID
import Data.UUID (UUID)
import qualified Database.PostgreSQL.LibPQ as LibPQ
import Exon (exon)
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Polysemy.Db.Data.DbConnectionError as DbConnectionError
import qualified Polysemy.Db.Data.DbError as DbError
import Polysemy.Db.Data.DbError (DbError)
import qualified Polysemy.Db.Effect.Store as Store
import Polysemy.Db.Effect.Store (Store)
import Polysemy.Final (withWeavingToFinal)
import Polysemy.Input (Input (Input))
import qualified Log
import qualified Time as Time
import Prelude hiding (Queue, listen)
import Sqel.Data.Sql (sql)
import qualified Sqel.Data.Uid as Uid
import Sqel.Data.Uid (Uuid)
import Sqel.SOP.Constraint (symbolText)
import Torsor (Torsor)
import Polysemy.Hasql.Data.ConnectionTag (ConnectionTag (NamedTag))
import Polysemy.Hasql.Data.InitDb (InitDb (InitDb))
import qualified Polysemy.Hasql.Database as Database (retryingSqlDef)
import qualified Polysemy.Hasql.Effect.Database as Database
import Polysemy.Hasql.Effect.Database (Database, Databases, withDatabaseUnique)
import Polysemy.Hasql.Queue.Data.Queue (Queue, QueueName (QueueName))
import Polysemy.Hasql.Queue.Data.Queued (Queued)
import qualified Polysemy.Hasql.Queue.Data.Queued as Queued (Queued (..))
tryDequeue ::
Members [Monitor Restart, Reader QueueName, Log, Embed IO] r =>
LibPQ.Connection ->
Sem r (Either Text (Maybe UUID))
tryDequeue :: forall (r :: EffectRow).
Members '[Monitor Restart, Reader QueueName, Log, Embed IO] r =>
Connection -> Sem r (Either Text (Maybe UUID))
tryDequeue Connection
connection = do
QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
let status :: Text -> Sem r ()
status Text
msg = forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.trace [exon|#{msg} on connection for '#{name}'|]
Text -> Sem r ()
status Text
"Trying dequeue"
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Connection -> IO (Maybe Notify)
LibPQ.notifies Connection
connection) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just (LibPQ.Notify ByteString
_ CPid
_ ByteString
payload) -> do
Text -> Sem r ()
status Text
"Received notify"
case ByteString -> Maybe UUID
UUID.fromASCIIBytes ByteString
payload of
Just UUID
d ->
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. b -> Either a b
Right (forall a. a -> Maybe a
Just UUID
d))
Maybe UUID
Nothing ->
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left [exon|invalid UUID payload: #{decodeUtf8 payload}|])
Maybe Notify
Nothing -> do
Text -> Sem r ()
status Text
"No notify"
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Connection -> IO (Maybe Fd)
LibPQ.socket Connection
connection) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just Fd
fd -> do
Text -> Sem r ()
status Text
"Waiting for activity"
forall action (r :: EffectRow) a.
Member (Monitor action) r =>
Sem r a -> Sem r a
monitor (forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Fd -> IO ()
threadWaitRead Fd
fd))
Text -> Sem r ()
status Text
"Activity received"
forall a b. b -> Either a b
Right forall a. Maybe a
Nothing forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Connection -> IO Bool
LibPQ.consumeInput Connection
connection)
Maybe Fd
Nothing ->
forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a b. a -> Either a b
Left Text
"couldn't connect with LibPQ.socket")
listen ::
Members [Database, Reader QueueName, Log, Embed IO] r =>
Sem r ()
listen :: forall (r :: EffectRow).
Members '[Database, Reader QueueName, Log, Embed IO] r =>
Sem r ()
listen = do
QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.debug [exon|executing `listen` for queue ##{name}|]
forall (r :: EffectRow). Member Database r => Sql -> Sem r ()
Database.retryingSqlDef [sql|listen "##{name}"|]
unlisten ::
∀ e r .
Members [Database !! e, Reader QueueName, Log] r =>
Sem r ()
unlisten :: forall e (r :: EffectRow).
Members '[Database !! e, Reader QueueName, Log] r =>
Sem r ()
unlisten = do
QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.debug [exon|executing `unlisten` for queue `##{name}`|]
forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
Member (Resumable err eff) r =>
Sem (eff : r) () -> Sem r ()
resume_ (forall (r :: EffectRow). Member Database r => Sql -> Sem r ()
Database.retryingSqlDef [sql|unlisten "##{name}"|])
processMessages ::
Ord t =>
NonEmpty (Uuid (Queued t d)) ->
NonEmpty d
processMessages :: forall t d. Ord t => NonEmpty (Uuid (Queued t d)) -> NonEmpty d
processMessages =
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall t a. Queued t a -> a
Queued.queue_payload forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall i a. Uid i a -> a
Uid.payload) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall o a. Ord o => (a -> o) -> NonEmpty a -> NonEmpty a
NonEmpty.sortWith (forall t a. Queued t a -> t
Queued.queue_created forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall i a. Uid i a -> a
Uid.payload)
initQueue ::
∀ e d t r .
Ord t =>
Members [Store UUID (Queued t d) !! e, Reader QueueName, Database, Log, Embed IO] r =>
(d -> Sem r ()) ->
Sem r ()
initQueue :: forall e d t (r :: EffectRow).
(Ord t,
Members
'[Store UUID (Queued t d) !! e, Reader QueueName, Database, Log,
Embed IO]
r) =>
(d -> Sem r ()) -> Sem r ()
initQueue d -> Sem r ()
write = do
QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.trace [exon|Initializing queue '#{name}'|]
Maybe (NonEmpty (Uuid (Queued t d)))
waiting <- forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow) a.
Member (Resumable err eff) r =>
a -> Sem (eff : r) a -> Sem r a
resumeAs forall a. Maybe a
Nothing (forall a. [a] -> Maybe (NonEmpty a)
nonEmpty forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (f :: * -> *) i d (r :: EffectRow).
Member (QStore f i d) r =>
Sem r [d]
Store.deleteAll)
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ d -> Sem r ()
write forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall t d. Ord t => NonEmpty (Uuid (Queued t d)) -> NonEmpty d
processMessages) Maybe (NonEmpty (Uuid (Queued t d)))
waiting
forall (r :: EffectRow).
Members '[Database, Reader QueueName, Log, Embed IO] r =>
Sem r ()
listen
withPqConn ::
Member (Final IO) r =>
Connection ->
(LibPQ.Connection -> Sem r a) ->
Sem r (Either Text a)
withPqConn :: forall (r :: EffectRow) a.
Member (Final IO) r =>
Connection -> (Connection -> Sem r a) -> Sem r (Either Text a)
withPqConn Connection
connection Connection -> Sem r a
use =
forall e (r :: EffectRow) a.
Member (Final IO) r =>
Sem (Error e : r) a -> Sem r (Either e a)
errorToIOFinal forall a b. (a -> b) -> a -> b
$ forall exc err (r :: EffectRow) a.
(Exception exc, Member (Error err) r, Member (Final IO) r) =>
(exc -> err) -> Sem r a -> Sem r a
fromExceptionSemVia @IOException forall b a. (Show a, IsString b) => a -> b
show forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (r :: EffectRow) a.
Member (Final m) r =>
ThroughWeavingToFinal m (Sem r) a -> Sem r a
withWeavingToFinal \ f ()
s forall x. f (Sem (Error Text : r) x) -> IO (f x)
lower forall x. f x -> Maybe x
_ -> do
forall a. Connection -> (Connection -> IO a) -> IO a
withLibPQConnection Connection
connection \ Connection
c -> forall x. f (Sem (Error Text : r) x) -> IO (f x)
lower (forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise (Connection -> Sem r a
use Connection
c) forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ f ()
s)
dequeueAndProcess ::
∀ d t dt r .
Ord t =>
Members [Monitor Restart, Reader QueueName, Final IO] r =>
Members [Store UUID (Queued t d) !! DbError, Database !! DbError, Time t dt, Stop DbError, Log, Embed IO] r =>
TBMQueue d ->
Connection ->
Sem r ()
dequeueAndProcess :: forall d t dt (r :: EffectRow).
(Ord t, Members '[Monitor Restart, Reader QueueName, Final IO] r,
Members
'[Store UUID (Queued t d) !! DbError, Database !! DbError,
Time t dt, Stop DbError, Log, Embed IO]
r) =>
TBMQueue d -> Connection -> Sem r ()
dequeueAndProcess TBMQueue d
queue Connection
connection = do
Either Text (Maybe UUID)
result <- forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (r :: EffectRow) a.
Member (Final IO) r =>
Connection -> (Connection -> Sem r a) -> Sem r (Either Text a)
withPqConn Connection
connection forall (r :: EffectRow).
Members '[Monitor Restart, Reader QueueName, Log, Embed IO] r =>
Connection -> Sem r (Either Text (Maybe UUID))
tryDequeue
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT do
UUID
id' <- forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (forall err err' (r :: EffectRow) a.
Member (Stop err') r =>
(err -> err') -> Either err a -> Sem r a
stopEitherWith (DbConnectionError -> DbError
DbError.Connection forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> DbConnectionError
DbConnectionError.Acquire) Either Text (Maybe UUID)
result)
Uuid (Queued t d)
messages <- forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
Members '[Resumable err eff, Stop err] r =>
InterpreterFor eff r
restop (forall (f :: * -> *) i d (r :: EffectRow).
Member (QStore f i d) r =>
i -> Sem r (f d)
Store.delete UUID
id'))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue d
queue) (forall t d. Ord t => NonEmpty (Uuid (Queued t d)) -> NonEmpty d
processMessages (forall (f :: * -> *) a. Applicative f => a -> f a
pure Uuid (Queued t d)
messages)))
dequeue ::
∀ d t dt r .
Ord t =>
Members [Monitor Restart, Reader QueueName, Final IO] r =>
Members [Store UUID (Queued t d) !! DbError, Database !! DbError, Stop DbError, Time t dt, Log, Embed IO] r =>
TBMQueue d ->
Sem r ()
dequeue :: forall d t dt (r :: EffectRow).
(Ord t, Members '[Monitor Restart, Reader QueueName, Final IO] r,
Members
'[Store UUID (Queued t d) !! DbError, Database !! DbError,
Stop DbError, Time t dt, Log, Embed IO]
r) =>
TBMQueue d -> Sem r ()
dequeue TBMQueue d
queue = do
QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
let
initDb :: InitDb (Sem (Database : r))
initDb = forall (m :: * -> *).
ClientTag -> Bool -> (Connection -> m ()) -> InitDb m
InitDb [exon|dequeue-##{name}|] Bool
False \ Connection
_ -> forall e d t (r :: EffectRow).
(Ord t,
Members
'[Store UUID (Queued t d) !! e, Reader QueueName, Database, Log,
Embed IO]
r) =>
(d -> Sem r ()) -> Sem r ()
initQueue (forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue d
queue)
forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
Members '[Resumable err eff, Stop err] r =>
InterpreterFor eff r
restop @_ @Database do
forall (r :: EffectRow) a.
Member Database r =>
InitDb (Sem r) -> Sem r a -> Sem r a
Database.withInit InitDb (Sem (Database : r))
initDb (forall (r :: EffectRow) a.
Member Database r =>
(Connection -> Sem r a) -> Sem r a
Database.use (forall d t dt (r :: EffectRow).
(Ord t, Members '[Monitor Restart, Reader QueueName, Final IO] r,
Members
'[Store UUID (Queued t d) !! DbError, Database !! DbError,
Time t dt, Stop DbError, Log, Embed IO]
r) =>
TBMQueue d -> Connection -> Sem r ()
dequeueAndProcess TBMQueue d
queue))
dequeueLoop ::
∀ d t dt u r .
Ord t =>
TimeUnit u =>
Members [Monitor Restart, Reader QueueName] r =>
Members [Store UUID (Queued t d) !! DbError, Database !! DbError, Time t dt, Log, Resource, Embed IO, Final IO] r =>
u ->
(DbError -> Sem r Bool) ->
TBMQueue d ->
Sem r ()
dequeueLoop :: forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u, Members '[Monitor Restart, Reader QueueName] r,
Members
'[Store UUID (Queued t d) !! DbError, Database !! DbError,
Time t dt, Log, Resource, Embed IO, Final IO]
r) =>
u -> (DbError -> Sem r Bool) -> TBMQueue d -> Sem r ()
dequeueLoop u
errorDelay DbError -> Sem r Bool
errorHandler TBMQueue d
queue = do
QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
let
spin :: Sem r ()
spin =
forall err (r :: EffectRow) a.
Sem (Stop err : r) a -> Sem r (Either err a)
runStop (forall d t dt (r :: EffectRow).
(Ord t, Members '[Monitor Restart, Reader QueueName, Final IO] r,
Members
'[Store UUID (Queued t d) !! DbError, Database !! DbError,
Stop DbError, Time t dt, Log, Embed IO]
r) =>
TBMQueue d -> Sem r ()
dequeue TBMQueue d
queue) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Right () -> Sem r ()
spin
Left DbError
err -> Bool -> Sem r ()
result forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< DbError -> Sem r Bool
errorHandler DbError
err
result :: Bool -> Sem r ()
result = \case
Bool
True ->
Sem r ()
disconnect forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall t d u (r :: EffectRow).
(TimeUnit u, Member (Time t d) r) =>
u -> Sem r ()
Time.sleep u
errorDelay forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Sem r ()
spin
Bool
False -> do
forall (r :: EffectRow).
(HasCallStack, Member Log r) =>
Text -> Sem r ()
Log.warn [exon|Exiting dequeue loop for '##{name}' after error|]
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall a. STM a -> IO a
atomically (forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue d
queue))
disconnect :: Sem r ()
disconnect =
forall e (r :: EffectRow).
Members '[Database !! e, Reader QueueName, Log] r =>
Sem r ()
unlisten forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall err (eff :: (* -> *) -> * -> *) (r :: EffectRow).
Member (Resumable err eff) r =>
Sem (eff : r) () -> Sem r ()
resume_ forall (r :: EffectRow). Member Database r => Sem r ()
Database.release
Sem r ()
spin
startDequeueLoop ::
∀ d t dt u r .
Ord t =>
TimeUnit u =>
Members [RestartingMonitor, Reader QueueName] r =>
Members [Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log, Resource, Embed IO, Final IO] r =>
u ->
(DbError -> Sem r Bool) ->
TBMQueue d ->
Sem r ()
startDequeueLoop :: forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u,
Members '[RestartingMonitor, Reader QueueName] r,
Members
'[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
Resource, Embed IO, Final IO]
r) =>
u -> (DbError -> Sem r Bool) -> TBMQueue d -> Sem r ()
startDequeueLoop u
errorDelay DbError -> Sem r Bool
errorHandler TBMQueue d
queue = do
QueueName Text
name <- forall i (r :: EffectRow). Member (Reader i) r => Sem r i
ask
forall (r :: EffectRow).
Member Databases r =>
Maybe ConnectionTag -> InterpreterFor (Database !! DbError) r
withDatabaseUnique (forall a. a -> Maybe a
Just (Text -> ConnectionTag
NamedTag [exon|dequeue-#{name}|])) do
forall (r :: EffectRow) a b.
Member Resource r =>
Sem r a -> Sem r b -> Sem r a
finally (forall (r :: EffectRow).
Member RestartingMonitor r =>
InterpreterFor (Monitor Restart) r
restart (forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u, Members '[Monitor Restart, Reader QueueName] r,
Members
'[Store UUID (Queued t d) !! DbError, Database !! DbError,
Time t dt, Log, Resource, Embed IO, Final IO]
r) =>
u -> (DbError -> Sem r Bool) -> TBMQueue d -> Sem r ()
dequeueLoop u
errorDelay (forall (index :: Nat) (inserted :: EffectRow) (head :: EffectRow)
(oldTail :: EffectRow) (tail :: EffectRow) (old :: EffectRow)
(full :: EffectRow) a.
(ListOfLength index head, WhenStuck index InsertAtUnprovidedIndex,
old ~ Append head oldTail, tail ~ Append inserted oldTail,
full ~ Append head tail,
InsertAtIndex index head tail oldTail full inserted) =>
Sem old a -> Sem full a
insertAt @0 forall b c a. (b -> c) -> (a -> b) -> a -> c
. DbError -> Sem r Bool
errorHandler) TBMQueue d
queue)) forall e (r :: EffectRow).
Members '[Database !! e, Reader QueueName, Log] r =>
Sem r ()
unlisten
interpretInputQueue ::
∀ d r .
Member (Embed IO) r =>
TBMQueue d ->
InterpreterFor (Input (Maybe d)) r
interpretInputQueue :: forall d (r :: EffectRow).
Member (Embed IO) r =>
TBMQueue d -> InterpreterFor (Input (Maybe d)) r
interpretInputQueue TBMQueue d
queue =
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret \case
Input (Maybe d) (Sem rInitial) x
Input ->
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall a. STM a -> IO a
atomically (forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue d
queue))
dequeueThread ::
∀ d t dt u r .
Ord t =>
TimeUnit u =>
Members [RestartingMonitor, Reader QueueName, Resource] r =>
Members [Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log, Async, Embed IO, Final IO] r =>
u ->
(DbError -> Sem r Bool) ->
Sem r (Concurrent.Async (Maybe ()), TBMQueue d)
dequeueThread :: forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u,
Members '[RestartingMonitor, Reader QueueName, Resource] r,
Members
'[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
Async, Embed IO, Final IO]
r) =>
u
-> (DbError -> Sem r Bool) -> Sem r (Async (Maybe ()), TBMQueue d)
dequeueThread u
errorDelay DbError -> Sem r Bool
errorHandler = do
TBMQueue d
queue <- forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
64)
Async (Maybe ())
handle <- forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
async (forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u,
Members '[RestartingMonitor, Reader QueueName] r,
Members
'[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
Resource, Embed IO, Final IO]
r) =>
u -> (DbError -> Sem r Bool) -> TBMQueue d -> Sem r ()
startDequeueLoop u
errorDelay DbError -> Sem r Bool
errorHandler TBMQueue d
queue)
pure (Async (Maybe ())
handle, TBMQueue d
queue)
interpretInputDbQueueListen ::
∀ (name :: Symbol) d t dt u r .
Ord t =>
TimeUnit u =>
KnownSymbol name =>
Members [RestartingMonitor, Final IO] r =>
Members [Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log, Resource, Async, Embed IO] r =>
u ->
(DbError -> Sem r Bool) ->
InterpreterFor (Input (Maybe d)) r
interpretInputDbQueueListen :: forall (name :: Symbol) d t dt u (r :: EffectRow).
(Ord t, TimeUnit u, KnownSymbol name,
Members '[RestartingMonitor, Final IO] r,
Members
'[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
Resource, Async, Embed IO]
r) =>
u -> (DbError -> Sem r Bool) -> InterpreterFor (Input (Maybe d)) r
interpretInputDbQueueListen u
errorDelay DbError -> Sem r Bool
errorHandler Sem (Input (Maybe d) : r) a
sem =
forall i (r :: EffectRow) a. i -> Sem (Reader i : r) a -> Sem r a
runReader (Text -> QueueName
QueueName (forall (name :: Symbol). KnownSymbol name => Text
symbolText @name)) forall a b. (a -> b) -> a -> b
$
forall (r :: EffectRow) a c b.
Member Resource r =>
Sem r a -> (a -> Sem r c) -> (a -> Sem r b) -> Sem r b
bracket Sem (Reader QueueName : r) (Async (Maybe ()), TBMQueue d)
acquire forall {r :: EffectRow} {a} {b}.
Member Async r =>
(Async a, b) -> Sem r ()
release \ (Async (Maybe ())
_, TBMQueue d
queue) -> do
forall d (r :: EffectRow).
Member (Embed IO) r =>
TBMQueue d -> InterpreterFor (Input (Maybe d)) r
interpretInputQueue TBMQueue d
queue (forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
(r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder Sem (Input (Maybe d) : r) a
sem)
where
acquire :: Sem (Reader QueueName : r) (Async (Maybe ()), TBMQueue d)
acquire = forall d t dt u (r :: EffectRow).
(Ord t, TimeUnit u,
Members '[RestartingMonitor, Reader QueueName, Resource] r,
Members
'[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
Async, Embed IO, Final IO]
r) =>
u
-> (DbError -> Sem r Bool) -> Sem r (Async (Maybe ()), TBMQueue d)
dequeueThread u
errorDelay (forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise forall b c a. (b -> c) -> (a -> b) -> a -> c
. DbError -> Sem r Bool
errorHandler)
release :: (Async a, b) -> Sem r ()
release (Async a
handle, b
_) = forall (r :: EffectRow) a. Member Async r => Async a -> Sem r ()
cancel Async a
handle
interpretInputQueueDb ::
∀ qname u t dt d diff r .
TimeUnit u =>
TimeUnit diff =>
Torsor t diff =>
Queue qname t =>
Members [Store UUID (Queued t d) !! DbError, Databases] r =>
Members [Time t dt, Log, Resource, Async, Race, Embed IO, Final IO] r =>
u ->
ClockSkewConfig ->
(DbError -> Sem r Bool) ->
InterpreterFor (Input (Maybe d)) r
interpretInputQueueDb :: forall (qname :: Symbol) u t dt d diff (r :: EffectRow).
(TimeUnit u, TimeUnit diff, Torsor t diff, Queue qname t,
Members '[Store UUID (Queued t d) !! DbError, Databases] r,
Members
'[Time t dt, Log, Resource, Async, Race, Embed IO, Final IO] r) =>
u
-> ClockSkewConfig
-> (DbError -> Sem r Bool)
-> InterpreterFor (Input (Maybe d)) r
interpretInputQueueDb u
errorDelay ClockSkewConfig
csConfig DbError -> Sem r Bool
errorHandler =
forall a (r :: EffectRow).
Member (Embed IO) r =>
a -> InterpreterFor (AtomicState a) r
interpretAtomic forall a. Maybe a
Nothing forall b c a. (b -> c) -> (a -> b) -> a -> c
.
forall t d (r :: EffectRow).
Members '[Time t d, Resource, Async, Race, Final IO] r =>
MonitorCheck r -> InterpreterFor RestartingMonitor r
interpretMonitorRestart (forall t d diff (r :: EffectRow).
(Torsor t diff, TimeUnit diff,
Members '[AtomicState (Maybe t), Time t d, Embed IO] r) =>
ClockSkewConfig -> MonitorCheck r
monitorClockSkew ClockSkewConfig
csConfig) forall b c a. (b -> c) -> (a -> b) -> a -> c
.
forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
(r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder forall b c a. (b -> c) -> (a -> b) -> a -> c
.
forall (name :: Symbol) d t dt u (r :: EffectRow).
(Ord t, TimeUnit u, KnownSymbol name,
Members '[RestartingMonitor, Final IO] r,
Members
'[Store UUID (Queued t d) !! DbError, Databases, Time t dt, Log,
Resource, Async, Embed IO]
r) =>
u -> (DbError -> Sem r Bool) -> InterpreterFor (Input (Maybe d)) r
interpretInputDbQueueListen @qname u
errorDelay (forall (index :: Nat) (inserted :: EffectRow) (head :: EffectRow)
(oldTail :: EffectRow) (tail :: EffectRow) (old :: EffectRow)
(full :: EffectRow) a.
(ListOfLength index head, WhenStuck index InsertAtUnprovidedIndex,
old ~ Append head oldTail, tail ~ Append inserted oldTail,
full ~ Append head tail,
InsertAtIndex index head tail oldTail full inserted) =>
Sem old a -> Sem full a
insertAt @0 forall b c a. (b -> c) -> (a -> b) -> a -> c
. DbError -> Sem r Bool
errorHandler) forall b c a. (b -> c) -> (a -> b) -> a -> c
.
forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
(r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder