module Control.Eff.LogWriter.Async
( withAsyncLogWriter
, withAsyncLogging
)
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.DeepSeq
import Control.Eff as Eff
import Control.Eff.Log
import Control.Eff.LogWriter.Rich
import Control.Exception ( evaluate, finally )
import Control.Lens
import Control.Monad ( unless, when )
import Control.Monad.Trans.Control ( MonadBaseControl
                                   , liftBaseOp
                                   )
import Data.Foldable ( traverse_ )
import Data.Kind ( )
import Data.Text as T
-- | Handle the 'Logs' and 'LogWriterReader' effects of a computation with
-- a 'LogWriter' that is driven from a background thread.
--
-- A log channel is opened with 'withAsyncLogChannel'; its writer thread
-- deep-evaluates ('force') every message and then hands it to the given
-- 'LogWriter'. The computation itself is run by 'withRichLogging' using a
-- writer that merely enqueues messages into that channel (see
-- 'makeLogChannelWriter').
--
-- Arguments, in order:
--
-- * the 'LogWriter' that finally processes the messages,
-- * the capacity of the message queue,
-- * a 'Text', a 'Facility' and a 'LogPredicate' that are passed through to
--   'withRichLogging' unchanged,
-- * the computation to run.
withAsyncLogging
  :: (Lifted IO e, MonadBaseControl IO (Eff e), Integral len)
  => LogWriter
  -> len
  -> Text
  -> Facility
  -> LogPredicate
  -> Eff (Logs : LogWriterReader : e) a
  -> Eff e a
withAsyncLogging lw queueLength a f p e =
  liftBaseOp (withAsyncLogChannel queueLength writeStrictly) runLogged
 where
  -- Executed by the writer thread: fully evaluate, then write.
  writeStrictly msg = runLogWriter lw (force msg)

  -- The wrapped computation, logging into the freshly opened channel.
  runLogged channel = withRichLogging (makeLogChannelWriter channel) a f p e
-- | Route the log messages of a computation through a background thread.
--
-- The 'LogWriter' obtained from 'askLogWriter' becomes the back end of a
-- log channel opened with 'withAsyncLogChannel': the channel's writer thread
-- deep-evaluates ('force') each message and passes it to that writer. The
-- given computation is then run under 'setLogWriter' with a writer that only
-- enqueues messages into the channel (see 'makeLogChannelWriter').
--
-- The first argument is the capacity of the message queue.
withAsyncLogWriter
  :: (IoLogging e, MonadBaseControl IO (Eff e), Integral len)
  => len
  -> Eff e a
  -> Eff e a
withAsyncLogWriter queueLength e =
  askLogWriter >>= \backend ->
    liftBaseOp
      (withAsyncLogChannel queueLength (\msg -> runLogWriter backend (force msg)))
      (\channel -> setLogWriter (makeLogChannelWriter channel) e)
-- | Run an 'IO' action with a 'LogChannel' whose messages are written by a
-- background thread.
--
-- A bounded queue with room for @queueLen@ messages is created and a writer
-- thread is started. The writer repeatedly takes everything that is
-- currently queued and passes the messages, in queue order, to @ioWriter@.
--
-- When @action@ finishes - by returning or by throwing - the writer thread
-- is asked to stop and this function waits until it has written every
-- message that was still queued. Simply cancelling the writer at that point
-- would silently discard those messages, which are typically the last ones
-- logged before an exit or a crash.
--
-- Consequences worth knowing:
--
-- * Leaving the scope takes as long as @ioWriter@ needs for the remaining
--   messages; an @ioWriter@ that blocks forever blocks the exit as well.
-- * An exception that terminated the writer thread is not re-thrown here.
withAsyncLogChannel
  :: forall a len
   . (Integral len)
  => len
  -> (LogMessage -> IO ())
  -> (LogChannel -> IO a)
  -> IO a
withAsyncLogChannel queueLen ioWriter action = do
  msgQ     <- newTBQueueIO (fromIntegral queueLen)
  shutdown <- newTVarIO False
  withAsync (logLoop msgQ shutdown) $ \writer ->
    action (ConcurrentLogChannel msgQ writer)
      `finally` drainAndStop shutdown writer
 where
  -- Tell the writer to stop once the queue is empty, then wait for it.
  -- 'waitCatch' is used instead of 'wait' so that a writer that died from
  -- an exception does not replace the result or exception of @action@.
  drainAndStop shutdown writer = do
    atomically (writeTVar shutdown True)
    _ <- waitCatch writer
    return ()

  -- Write batches of messages until shutdown was requested and the queue
  -- has been emptied.
  logLoop tq shutdown = do
    next <- atomically $ do
      pending <- flushTBQueue tq
      case pending of
        _ : _ -> return (Just pending)
        []    -> do
          stopping <- readTVar shutdown
          -- Nothing to write: either finish, or block until a message
          -- arrives or shutdown is requested.
          if stopping then return Nothing else retry
    case next of
      Just ms -> traverse_ ioWriter ms >> logLoop tq shutdown
      Nothing -> return ()
-- | Build a 'LogWriter' that enqueues messages into a 'LogChannel' instead
-- of writing them itself.
--
-- Every message is fully evaluated ('force') in the calling thread before
-- it is enqueued, so evaluation errors surface at the call site rather than
-- in the writer thread.
--
-- Queueing policy:
--
-- * A message whose 'lmSeverity' is @<= 'warningSeverity'@ is always
--   enqueued; if the queue is full the caller blocks until there is room.
--   (Presumably these are warnings and anything more severe, assuming a
--   syslog-like numeric ordering - confirm against 'Severity'.)
-- * Any other message is dropped when the queue is full, after which the
--   calling thread sleeps for 1000 microseconds.
makeLogChannelWriter :: LogChannel -> LogWriter
makeLogChannelWriter lc = MkLogWriter enqueue
 where
  queue = fromLogChannel lc

  enqueue rawMsg = do
    msg <- evaluate (force rawMsg)
    wasDropped <- atomically $
      if msg ^. lmSeverity <= warningSeverity
        then writeTBQueue queue msg >> return False
        else do
          full <- isFullTBQueue queue
          unless full (writeTBQueue queue msg)
          return full
    -- Back off briefly after dropping a message from a full queue.
    when wasDropped (threadDelay 1_000)
-- | Handle to an asynchronous log channel, as created by
-- 'withAsyncLogChannel'.
data LogChannel = ConcurrentLogChannel
  { -- | The bounded queue that producers write their messages to.
    fromLogChannel :: TBQueue LogMessage
    -- | The background thread that takes messages from the queue and
    -- writes them.
  , _logChannelThread :: Async ()
  }