--------------------------------------------------------------------------------
-- Logstash backend for monad-logger                                          --
--------------------------------------------------------------------------------
-- This source code is licensed under the MIT license found in the LICENSE    --
-- file in the root directory of this source tree.                            --
--------------------------------------------------------------------------------

-- | This module implements `runLogstashLoggingT` which can be
-- used to write log messages that arise in a `LoggingT` computation to a
-- given `LogstashContext`. The following example demonstrates how to use the 
-- `runLogstashLoggingT` function with a TCP connection to Logstash, the
-- default retry policy from `Control.Retry`, a 1s timeout for each attempt,
-- and the @json_lines@ codec:
-- 
-- > main :: IO ()
-- > main = do 
-- >    let ctx = logstashTcp def
-- >    runLogstashLoggingT ctx retryPolicyDefault 1000000 (const stashJsonLine) $ 
-- >         logInfoN "Hello World"
--
-- Assuming a suitable Logstash server that can receive this message,
-- something like the following JSON document should be indexed (see the 
-- documentation for `Control.Monad.Logger` for information about how to include
-- more information in log messages):
--
-- > { 
-- >    "@version":"1",
-- >    "message":"Hello World",
-- >    "log.origin.file.line":0,
-- >    "log.origin.file.module":"<unknown>",
-- >    "log.origin.file.package":"<unknown>",
-- >    "log.origin.file.start.column":0,
-- >    "log.origin.file.start.line":0,
-- >    "log.origin.file.end.column":0,
-- >    "log.origin.file.end.line":0,
-- >    "log.origin.file.name":"<unknown>",
-- >    "log.logger":"",
-- >    "log.level":"info"
-- > }
--
-- If an error or a timeout occurs while writing to the Logstash connection,
-- the retry policy determines whether and when sending the message is
-- attempted again. If all attempts fail, the most recent exception is thrown
-- to the caller.
module Control.Monad.Logger.Logstash (
    runLogstashLoggingT,
    stashJsonLine,
    jsonLogLine,

    withLogstashLoggingT,
    runTBMQueueLoggingT,
    unTBMQueueLoggingT,

    -- * Re-exports
    LogstashContext(..)
) where 

--------------------------------------------------------------------------------

import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Exception (Handler)
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Trans.Reader
import Control.Retry

import Data.Aeson
import Data.Maybe
import Data.Text (Text)
import Data.Text.Encoding (decodeUtf8)

import UnliftIO (MonadIO(..), MonadUnliftIO)

import Logstash hiding (stashJsonLine)
import qualified Logstash as L (stashJsonLine)

--------------------------------------------------------------------------------

-- | `runLogstashLoggingT` @context retryPolicy time codec logger@ runs a 
-- `LoggingT` computation which writes all log entries to the Logstash 
-- @context@ using the given @codec@. The @retryPolicy@ determines whether 
-- and how the handler should deal with failures that arise. Each attempt
-- that is made by the @retryPolicy@ will have a timeout of @time@
-- microseconds applied to it. 
runLogstashLoggingT 
    :: LogstashContext ctx 
    => ctx 
    -> RetryPolicyM IO
    -> Integer
    -> ( RetryStatus -> 
         (Loc, LogSource, LogLevel, LogStr) -> 
         ReaderT LogstashConnection IO ()
       )
    -> LoggingT m a 
    -> m a
runLogstashLoggingT :: ctx
-> RetryPolicyM IO
-> Integer
-> (RetryStatus
    -> (Loc, LogSource, LogLevel, LogStr)
    -> ReaderT LogstashConnection IO ())
-> LoggingT m a
-> m a
runLogstashLoggingT ctx
ctx RetryPolicyM IO
policy Integer
time RetryStatus
-> (Loc, LogSource, LogLevel, LogStr)
-> ReaderT LogstashConnection IO ()
codec LoggingT m a
log = LoggingT m a
-> (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> m a
forall (m :: * -> *) a.
LoggingT m a
-> (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> m a
runLoggingT LoggingT m a
log ((Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> m a)
-> (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> m a
forall a b. (a -> b) -> a -> b
$ 
    \Loc
logLoc LogSource
logSource LogLevel
logLevel LogStr
logStr -> ctx
-> RetryPolicyM IO
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection IO ())
-> IO ()
forall ctx (m :: * -> *) a.
(LogstashContext ctx, MonadMask m, MonadUnliftIO m) =>
ctx
-> RetryPolicyM m
-> Integer
-> (RetryStatus -> ReaderT LogstashConnection m a)
-> m a
runLogstash ctx
ctx RetryPolicyM IO
policy Integer
time ((RetryStatus -> ReaderT LogstashConnection IO ()) -> IO ())
-> (RetryStatus -> ReaderT LogstashConnection IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ 
    \RetryStatus
s -> RetryStatus
-> (Loc, LogSource, LogLevel, LogStr)
-> ReaderT LogstashConnection IO ()
codec RetryStatus
s (Loc
logLoc, LogSource
logSource, LogLevel
logLevel, LogStr
logStr)

-- | `withLogstashLoggingT` @cfg codec exceptionHandlers logger@ is like
-- `withLogstashQueue` except for `LoggingT` computations so that log messages
-- are automatically added to the queue.
withLogstashLoggingT
    :: (LogstashContext ctx, MonadUnliftIO m)
    => LogstashQueueCfg ctx
    -> ( RetryStatus -> 
         (Loc, LogSource, LogLevel, LogStr) ->
         ReaderT LogstashConnection IO ()
       )
    -> [(Loc, LogSource, LogLevel, LogStr) -> Handler ()]
    -> LoggingT m a
    -> m a
withLogstashLoggingT :: LogstashQueueCfg ctx
-> (RetryStatus
    -> (Loc, LogSource, LogLevel, LogStr)
    -> ReaderT LogstashConnection IO ())
-> [(Loc, LogSource, LogLevel, LogStr) -> Handler ()]
-> LoggingT m a
-> m a
withLogstashLoggingT LogstashQueueCfg ctx
cfg RetryStatus
-> (Loc, LogSource, LogLevel, LogStr)
-> ReaderT LogstashConnection IO ()
dispatch [(Loc, LogSource, LogLevel, LogStr) -> Handler ()]
hs LoggingT m a
log = LogstashQueueCfg ctx
-> (RetryStatus
    -> (Loc, LogSource, LogLevel, LogStr)
    -> ReaderT LogstashConnection IO ())
-> [(Loc, LogSource, LogLevel, LogStr) -> Handler ()]
-> (TBMQueue (Loc, LogSource, LogLevel, LogStr) -> m a)
-> m a
forall ctx (m :: * -> *) item a.
(LogstashContext ctx, MonadUnliftIO m) =>
LogstashQueueCfg ctx
-> (RetryStatus -> item -> ReaderT LogstashConnection IO ())
-> [item -> Handler ()]
-> (TBMQueue item -> m a)
-> m a
withLogstashQueue LogstashQueueCfg ctx
cfg RetryStatus
-> (Loc, LogSource, LogLevel, LogStr)
-> ReaderT LogstashConnection IO ()
dispatch [(Loc, LogSource, LogLevel, LogStr) -> Handler ()]
hs ((TBMQueue (Loc, LogSource, LogLevel, LogStr) -> m a) -> m a)
-> (TBMQueue (Loc, LogSource, LogLevel, LogStr) -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ 
    \TBMQueue (Loc, LogSource, LogLevel, LogStr)
queue -> TBMQueue (Loc, LogSource, LogLevel, LogStr) -> LoggingT m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
TBMQueue (Loc, LogSource, LogLevel, LogStr) -> LoggingT m a -> m a
runTBMQueueLoggingT TBMQueue (Loc, LogSource, LogLevel, LogStr)
queue LoggingT m a
log

-- | `runTBMQueueLoggingT` @queue logger@ runs @logger@ so that log messages
-- are automatically added to @queue@. This can be used if the same queue and 
-- consumer should be shared among multiple producer threads. The queue should
-- be initialised by `withLogstashQueue`.
runTBMQueueLoggingT 
    :: MonadUnliftIO m 
    => TBMQueue (Loc, LogSource, LogLevel, LogStr)
    -> LoggingT m a
    -> m a
runTBMQueueLoggingT :: TBMQueue (Loc, LogSource, LogLevel, LogStr) -> LoggingT m a -> m a
runTBMQueueLoggingT TBMQueue (Loc, LogSource, LogLevel, LogStr)
queue LoggingT m a
log = LoggingT m a
-> (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> m a
forall (m :: * -> *) a.
LoggingT m a
-> (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> m a
runLoggingT LoggingT m a
log ((Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> m a)
-> (Loc -> LogSource -> LogLevel -> LogStr -> IO ()) -> m a
forall a b. (a -> b) -> a -> b
$
    \Loc
logLoc LogSource
logSource LogLevel
logLevel LogStr
logStr -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ 
        TBMQueue (Loc, LogSource, LogLevel, LogStr)
-> (Loc, LogSource, LogLevel, LogStr) -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue (Loc, LogSource, LogLevel, LogStr)
queue (Loc
logLoc, LogSource
logSource, LogLevel
logLevel, LogStr
logStr)

-- | `unTBMQueueLoggingT` @queue@ is like `unChanLoggingT` but for a 
-- `TBMQueue`. Since a `TBMQueue` can be closed, this function does not run
-- forever like `unChanLoggingT` and will return when @queue@ is closed.
unTBMQueueLoggingT 
    :: (MonadIO m, MonadLogger m)
    => TBMQueue (Loc, LogSource, LogLevel, LogStr)
    -> m () 
unTBMQueueLoggingT :: TBMQueue (Loc, LogSource, LogLevel, LogStr) -> m ()
unTBMQueueLoggingT TBMQueue (Loc, LogSource, LogLevel, LogStr)
queue = do
    Maybe (Loc, LogSource, LogLevel, LogStr)
mLine <- IO (Maybe (Loc, LogSource, LogLevel, LogStr))
-> m (Maybe (Loc, LogSource, LogLevel, LogStr))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (Loc, LogSource, LogLevel, LogStr))
 -> m (Maybe (Loc, LogSource, LogLevel, LogStr)))
-> IO (Maybe (Loc, LogSource, LogLevel, LogStr))
-> m (Maybe (Loc, LogSource, LogLevel, LogStr))
forall a b. (a -> b) -> a -> b
$ STM (Maybe (Loc, LogSource, LogLevel, LogStr))
-> IO (Maybe (Loc, LogSource, LogLevel, LogStr))
forall a. STM a -> IO a
atomically (STM (Maybe (Loc, LogSource, LogLevel, LogStr))
 -> IO (Maybe (Loc, LogSource, LogLevel, LogStr)))
-> STM (Maybe (Loc, LogSource, LogLevel, LogStr))
-> IO (Maybe (Loc, LogSource, LogLevel, LogStr))
forall a b. (a -> b) -> a -> b
$ TBMQueue (Loc, LogSource, LogLevel, LogStr)
-> STM (Maybe (Loc, LogSource, LogLevel, LogStr))
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue (Loc, LogSource, LogLevel, LogStr)
queue

    case Maybe (Loc, LogSource, LogLevel, LogStr)
mLine of 
        Maybe (Loc, LogSource, LogLevel, LogStr)
Nothing -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Just (Loc
loc,LogSource
src,LogLevel
lvl,LogStr
msg) -> do 
            Loc -> LogSource -> LogLevel -> LogStr -> m ()
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> LogSource -> LogLevel -> msg -> m ()
monadLoggerLog Loc
loc LogSource
src LogLevel
lvl LogStr
msg
            TBMQueue (Loc, LogSource, LogLevel, LogStr) -> m ()
forall (m :: * -> *).
(MonadIO m, MonadLogger m) =>
TBMQueue (Loc, LogSource, LogLevel, LogStr) -> m ()
unTBMQueueLoggingT TBMQueue (Loc, LogSource, LogLevel, LogStr)
queue

--------------------------------------------------------------------------------

-- | `stashJsonLine` @entry@ serialises @entry@ as JSON using reasonable
-- defaults for Elasticsearch based on 
-- https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html
-- and sends the result to Logstash using the @json_lines@ codec.
stashJsonLine :: (Loc, LogSource, LogLevel, LogStr) 
              -> ReaderT LogstashConnection IO ()
stashJsonLine :: (Loc, LogSource, LogLevel, LogStr)
-> ReaderT LogstashConnection IO ()
stashJsonLine = Value -> ReaderT LogstashConnection IO ()
forall (m :: * -> *) a.
(MonadIO m, ToJSON a) =>
a -> ReaderT LogstashConnection m ()
L.stashJsonLine (Value -> ReaderT LogstashConnection IO ())
-> ((Loc, LogSource, LogLevel, LogStr) -> Value)
-> (Loc, LogSource, LogLevel, LogStr)
-> ReaderT LogstashConnection IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Loc, LogSource, LogLevel, LogStr) -> Value
jsonLogLine 
    
-- | `jsonLogLine` @entry@ serialises @entry@ as JSON using reasonable
-- defaults for Elasticsearch based on 
-- https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html
jsonLogLine :: (Loc, LogSource, LogLevel, LogStr) -> Value
jsonLogLine :: (Loc, LogSource, LogLevel, LogStr) -> Value
jsonLogLine (Loc
loc, LogSource
src, LogLevel
lvl, LogStr
msg) = [Pair] -> Value
object 
    [ Key
"message" Key -> LogSource -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= ByteString -> LogSource
decodeUtf8 (LogStr -> ByteString
fromLogStr LogStr
msg)
    , Key
"log" Key -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= [Pair] -> Value
object 
        [ Key
"logger" Key -> LogSource -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= LogSource
src
        , Key
"level" Key -> LogSource -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= LogLevel -> LogSource
jsonLogLevel LogLevel
lvl
        , Key
"origin" Key -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= [Pair] -> Value
object 
            [ Key
"file" Key -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= [Pair] -> Value
object 
                [ Key
"name" Key -> String -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Loc -> String
loc_filename Loc
loc 
                , Key
"line" Key -> Int -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= (Int, Int) -> Int
forall a b. (a, b) -> a
fst (Loc -> (Int, Int)
loc_start Loc
loc)
                -- the following fields are not part of the ECS
                , Key
"package" Key -> String -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Loc -> String
loc_package Loc
loc
                , Key
"module" Key -> String -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Loc -> String
loc_module Loc
loc
                , Key
"start" Key -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= (Int, Int) -> Value
jsonCharPos (Loc -> (Int, Int)
loc_start Loc
loc)
                , Key
"end" Key -> Value -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= (Int, Int) -> Value
jsonCharPos (Loc -> (Int, Int)
loc_end Loc
loc)
                ] 
            ]
        ]
    ]
    where jsonLogLevel :: LogLevel -> Text
          jsonLogLevel :: LogLevel -> LogSource
jsonLogLevel LogLevel
LevelDebug = LogSource
"debug"
          jsonLogLevel LogLevel
LevelInfo = LogSource
"info"
          jsonLogLevel LogLevel
LevelWarn = LogSource
"warn"
          jsonLogLevel LogLevel
LevelError = LogSource
"error"
          jsonLogLevel (LevelOther LogSource
x) = LogSource
x

          jsonCharPos :: (Int, Int) -> Value
          jsonCharPos :: (Int, Int) -> Value
jsonCharPos (Int
line, Int
column) =
              [Pair] -> Value
object [ Key
"line" Key -> Int -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Int
line, Key
"column" Key -> Int -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Int
column ]

--------------------------------------------------------------------------------