-- |Description: Internal
module Polysemy.Log.Conc where

import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Polysemy (interceptH, runT, subsume)
import Polysemy.Async (Async, async)
import Polysemy.Internal.Tactics (liftT)
import Polysemy.Resource (Resource, bracket)

import qualified Polysemy.Log.Data.DataLog as DataLog
import Polysemy.Log.Data.DataLog (DataLog(DataLog, Local))

-- |Intercept 'DataLog' for concurrent processing.
-- This does not send any action to the ultimate interpreter but writes all log messages to the provided queue.
-- 'Local' has to be handled here, otherwise this will not be called for actions in higher-order thunks.
--
-- The function argument is the context transformation accumulated so far; it is applied to every message before the
-- message is enqueued.
-- Each 'Local' scope composes its own function on top of it for the actions it wraps.
interceptDataLogConcWithLocal ::
  forall msg r a .
  Members [DataLog msg, Embed IO] r =>
  -- |Transformation applied to each message before it is written to the queue.
  (msg -> msg) ->
  -- |Queue that receives the transformed messages.
  TBMQueue msg ->
  Sem r a ->
  Sem r a
interceptDataLogConcWithLocal context queue =
  interceptH \case
    DataLog msg ->
      -- Enqueue the transformed message instead of forwarding the action to the interpreter.
      liftT (atomically (writeTBMQueue queue (context msg)))
    Local f ma ->
      -- Run the wrapped action, fold its 'DataLog' back into @r@ and intercept it again, with @f@ applied after the
      -- transformation accumulated so far.
      raise . interceptDataLogConcWithLocal (f . context) queue . subsume =<< runT ma
{-# INLINE interceptDataLogConcWithLocal #-}

-- |Intercept 'DataLog' for concurrent processing.
-- Variant of 'interceptDataLogConcWithLocal' that starts with 'id' as the context transformation, so messages logged
-- outside of any 'Local' scope are written to the queue unchanged.
interceptDataLogConcWith ::
  forall msg r a .
  Members [DataLog msg, Embed IO] r =>
  -- |Queue that receives the log messages.
  TBMQueue msg ->
  Sem r a ->
  Sem r a
interceptDataLogConcWith =
  interceptDataLogConcWithLocal id
{-# INLINE interceptDataLogConcWith #-}

-- |Part of 'interceptDataLogConc'.
-- Loop as long as the provided queue is open and relay all dequeued messages to the ultimate interpreter, thereby
-- forcing the logging implementation to work in this thread.
-- The loop ends as soon as reading from the queue yields 'Nothing'.
loggerThread ::
  forall msg r .
  Members [DataLog msg, Embed IO] r =>
  -- |Queue to read the messages from.
  TBMQueue msg ->
  Sem r ()
loggerThread queue =
  spin
  where
    spin :: Sem r ()
    spin =
      atomically (readTBMQueue queue) >>= \case
        -- No more messages will arrive; stop the loop.
        Nothing -> pure ()
        Just msg -> do
          -- Hand the message to the 'DataLog' interpreter, then wait for the next one.
          DataLog.dataLog @msg msg
          spin

-- |Part of 'interceptDataLogConc'.
-- Create a queue and start a thread that reads messages from it, calling the logging implementation.
-- The handle of the started thread is discarded; only the queue is returned.
acquireQueue ::
  forall msg r .
  Members [DataLog msg, Async, Embed IO] r =>
  -- |Capacity passed to 'newTBMQueueIO'.
  Int ->
  Sem r (TBMQueue msg)
acquireQueue maxQueued = do
  queue <- embed (newTBMQueueIO maxQueued)
  -- The worker runs 'loggerThread' on the new queue; its 'Async' handle is forced and then dropped.
  !_ <- async (loggerThread queue)
  pure queue

-- |Intercept 'DataLog' for concurrent processing.
-- Creates a queue and starts a worker thread.
-- All log messages received by the interceptor in 'interceptDataLogConcWithLocal' are written to the queue and sent to
-- the next 'DataLog' interpreter when the thread reads from the queue.
-- The queue is closed when the wrapped program finishes, since it is managed by 'bracket'.
--
-- Since this is an interceptor, it will not remove the effect from the stack, but relay it to another interpreter:
--
-- @
-- interpretDataLogAtomic (interceptDataLogConc 64 (DataLog.dataLog "message"))
-- @
interceptDataLogConc ::
  forall msg r a .
  Members [DataLog msg, Resource, Async, Embed IO] r =>
  -- |Queue size. When the queue fills up, the interceptor will block.
  Int ->
  Sem r a ->
  Sem r a
interceptDataLogConc maxQueued sem =
  bracket (acquireQueue maxQueued) (atomically . closeTBMQueue) \ queue ->
    interceptDataLogConcWith @msg queue sem
{-# INLINE interceptDataLogConc #-}