{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE LambdaCase #-}

module Network.Monitoring.Riemann.BatchClient where

import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import qualified Control.Concurrent.Chan.Unagi as Unagi
import Control.Concurrent.KazuraQueue
  ( Queue
  , lengthQueue
  , newQueue
  , readQueue
  , tryReadQueue
  , writeQueue
  )
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence.Extra as Seq
import Network.Monitoring.Riemann.Client (Client, close, sendEvent)
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import qualified Network.Monitoring.Riemann.TCP as TCP
import Network.Socket (HostName)
import System.IO (hPutStrLn, stderr)

newtype BatchClient =
  BatchClient (Unagi.InChan LogCommand)

newtype BatchClientNoBuffer =
  BatchClientNoBuffer (Queue LogCommand)

data LogCommand
  = Event PE.Event
  | Stop (MVar ())

{-|
    A new BatchClient

    The BatchClient is a 'Client' that will do the following

    * Batch events into a specified batch size
    * If events are produced more quickly than Riemann can cope with, they will be passed to the overflow function

    Batching events is important for throughput, see <https://aphyr.com/posts/269-reaching-200k-events-sec>

    It is important to deal with back pressure, if the buffer of events to be sent to Riemann fills up, they will be
    passed to the overflow function until the buffer has space again. This overflow function can be as simple as 'print'

    Current time and host name will be set if not provided.

    '''Note''': We never use IPv6 address resolved for given hostname.
-}
batchClient ::
     HostName -> TCP.Port -> Int -> Int -> (PE.Event -> IO ()) -> IO BatchClient
batchClient :: HostName
-> Port -> Port -> Port -> (Event -> IO ()) -> IO BatchClient
batchClient HostName
hostname Port
port Port
bufferSize Port
batchSize Event -> IO ()
overflow
  | Port
batchSize Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
<= Port
0 = HostName -> IO BatchClient
forall a. HasCallStack => HostName -> a
error HostName
"Batch Size must be positive"
  | Bool
otherwise = do
    TCPConnection
connection <- HostName -> Port -> IO TCPConnection
TCP.tcpConnection HostName
hostname Port
port
    (InChan LogCommand
inChan, OutChan LogCommand
outChan) <- IO (InChan LogCommand, OutChan LogCommand)
forall a. IO (InChan a, OutChan a)
Unagi.newChan
    Queue LogCommand
queue <- IO (Queue LogCommand)
forall a. IO (Queue a)
newQueue
    ThreadId
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ OutChan LogCommand
-> Queue LogCommand -> Port -> (Event -> IO ()) -> IO ()
overflowConsumer OutChan LogCommand
outChan Queue LogCommand
queue Port
bufferSize Event -> IO ()
overflow
    ThreadId
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Port -> Queue LogCommand -> TCPConnection -> IO ()
riemannConsumer Port
batchSize Queue LogCommand
queue TCPConnection
connection
    BatchClient -> IO BatchClient
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BatchClient -> IO BatchClient) -> BatchClient -> IO BatchClient
forall a b. (a -> b) -> a -> b
$ InChan LogCommand -> BatchClient
BatchClient InChan LogCommand
inChan

bufferlessBatchClient :: HostName -> TCP.Port -> Int -> IO BatchClientNoBuffer
bufferlessBatchClient :: HostName -> Port -> Port -> IO BatchClientNoBuffer
bufferlessBatchClient HostName
hostname Port
port Port
batchSize
  | Port
batchSize Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
<= Port
0 = HostName -> IO BatchClientNoBuffer
forall a. HasCallStack => HostName -> a
error HostName
"Batch Size must be positive"
  | Bool
otherwise = do
    TCPConnection
connection <- HostName -> Port -> IO TCPConnection
TCP.tcpConnection HostName
hostname Port
port
    Queue LogCommand
queue <- IO (Queue LogCommand)
forall a. IO (Queue a)
newQueue
    ThreadId
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Port -> Queue LogCommand -> TCPConnection -> IO ()
riemannConsumer Port
batchSize Queue LogCommand
queue TCPConnection
connection
    BatchClientNoBuffer -> IO BatchClientNoBuffer
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BatchClientNoBuffer -> IO BatchClientNoBuffer)
-> BatchClientNoBuffer -> IO BatchClientNoBuffer
forall a b. (a -> b) -> a -> b
$ Queue LogCommand -> BatchClientNoBuffer
BatchClientNoBuffer Queue LogCommand
queue

overflowConsumer ::
     Unagi.OutChan LogCommand
  -> Queue LogCommand
  -> Int
  -> (PE.Event -> IO ())
  -> IO ()
overflowConsumer :: OutChan LogCommand
-> Queue LogCommand -> Port -> (Event -> IO ()) -> IO ()
overflowConsumer OutChan LogCommand
outChan Queue LogCommand
queue Port
bufferSize Event -> IO ()
f = IO ()
loop
  where
    loop :: IO ()
loop = do
      LogCommand
cmd <- OutChan LogCommand -> IO LogCommand
forall a. OutChan a -> IO a
Unagi.readChan OutChan LogCommand
outChan
      case LogCommand
cmd of
        Event Event
event -> do
          Port
qSize <- Queue LogCommand -> IO Port
forall a. Queue a -> IO Port
lengthQueue Queue LogCommand
queue
          if Port
qSize Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
>= Port
bufferSize
            then do
              Event -> IO ()
f Event
event
              IO ()
loop
            else do
              Queue LogCommand -> LogCommand -> IO ()
forall a. Queue a -> a -> IO ()
writeQueue Queue LogCommand
queue LogCommand
cmd
              IO ()
loop
        Stop MVar ()
_ -> do
          Handle -> HostName -> IO ()
hPutStrLn Handle
stderr HostName
"stopping log consumer"
          Queue LogCommand -> LogCommand -> IO ()
forall a. Queue a -> a -> IO ()
writeQueue Queue LogCommand
queue LogCommand
cmd

drainAll :: Queue a -> Int -> IO (Seq a)
drainAll :: Queue a -> Port -> IO (Seq a)
drainAll Queue a
queue Port
limit = do
  a
msg <- Queue a -> IO a
forall a. Queue a -> IO a
readQueue Queue a
queue
  Seq a -> Port -> IO (Seq a)
forall t. (Eq t, Num t) => Seq a -> t -> IO (Seq a)
loop (a -> Seq a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
msg) (Port
limit Port -> Port -> Port
forall a. Num a => a -> a -> a
- Port
1)
  where
    loop :: Seq a -> t -> IO (Seq a)
loop Seq a
msgs t
0 = Seq a -> IO (Seq a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Seq a
msgs
    loop Seq a
msgs t
n =
      Queue a -> IO (Maybe a)
forall a. Queue a -> IO (Maybe a)
tryReadQueue Queue a
queue IO (Maybe a) -> (Maybe a -> IO (Seq a)) -> IO (Seq a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Maybe a
Nothing -> Seq a -> IO (Seq a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Seq a
msgs
        Just a
msg -> Seq a -> t -> IO (Seq a)
loop (Seq a
msgs Seq a -> a -> Seq a
forall a. Seq a -> a -> Seq a
|> a
msg) (t
n t -> t -> t
forall a. Num a => a -> a -> a
- t
1)

riemannConsumer :: Int -> Queue LogCommand -> TCP.TCPConnection -> IO ()
riemannConsumer :: Port -> Queue LogCommand -> TCPConnection -> IO ()
riemannConsumer Port
batchSize Queue LogCommand
queue TCPConnection
connection = IO ()
loop
  where
    loop :: IO ()
loop = do
      Seq LogCommand
cmds <- Queue LogCommand -> Port -> IO (Seq LogCommand)
forall a. Queue a -> Port -> IO (Seq a)
drainAll Queue LogCommand
queue Port
batchSize
      let (Seq Event
events, Seq (MVar ())
stops) =
            (LogCommand -> Either Event (MVar ()))
-> Seq LogCommand -> (Seq Event, Seq (MVar ()))
forall a b c. (a -> Either b c) -> Seq a -> (Seq b, Seq c)
Seq.separate
              (\case
                 Event Event
e -> Event -> Either Event (MVar ())
forall a b. a -> Either a b
Left Event
e
                 Stop MVar ()
s -> MVar () -> Either Event (MVar ())
forall a b. b -> Either a b
Right MVar ()
s)
              Seq LogCommand
cmds
      TCPConnection -> Seq Event -> IO ()
TCP.sendEvents TCPConnection
connection Seq Event
events
      if Seq (MVar ()) -> Bool
forall a. Seq a -> Bool
Seq.null Seq (MVar ())
stops
        then IO ()
loop
        else let s :: MVar ()
s = Seq (MVar ()) -> Port -> MVar ()
forall a. Seq a -> Port -> a
Seq.index Seq (MVar ())
stops Port
0
              in do Handle -> HostName -> IO ()
hPutStrLn Handle
stderr HostName
"stopping riemann consumer"
                    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
s ()

instance MonadIO m => Client m BatchClient where
  sendEvent :: BatchClient -> Event -> m ()
sendEvent (BatchClient InChan LogCommand
inChan) Event
event =
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (LogCommand -> IO ()) -> LogCommand -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InChan LogCommand -> LogCommand -> IO ()
forall a. InChan a -> a -> IO ()
Unagi.writeChan InChan LogCommand
inChan (LogCommand -> m ()) -> LogCommand -> m ()
forall a b. (a -> b) -> a -> b
$ Event -> LogCommand
Event Event
event
  close :: BatchClient -> m ()
close (BatchClient InChan LogCommand
inChan) =
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      MVar ()
s <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
      InChan LogCommand -> LogCommand -> IO ()
forall a. InChan a -> a -> IO ()
Unagi.writeChan InChan LogCommand
inChan (MVar () -> LogCommand
Stop MVar ()
s)
      MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
s

instance MonadIO m => Client m BatchClientNoBuffer where
  sendEvent :: BatchClientNoBuffer -> Event -> m ()
sendEvent (BatchClientNoBuffer Queue LogCommand
queue) Event
event =
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (LogCommand -> IO ()) -> LogCommand -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Queue LogCommand -> LogCommand -> IO ()
forall a. Queue a -> a -> IO ()
writeQueue Queue LogCommand
queue (LogCommand -> m ()) -> LogCommand -> m ()
forall a b. (a -> b) -> a -> b
$ Event -> LogCommand
Event Event
event
  close :: BatchClientNoBuffer -> m ()
close (BatchClientNoBuffer Queue LogCommand
queue) =
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
      MVar ()
s <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
      Queue LogCommand -> LogCommand -> IO ()
forall a. Queue a -> a -> IO ()
writeQueue Queue LogCommand
queue (MVar () -> LogCommand
Stop MVar ()
s)
      MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
s