{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE LambdaCase #-}
module Network.Monitoring.Riemann.BatchClient where
import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import qualified Control.Concurrent.Chan.Unagi as Unagi
import Control.Concurrent.KazuraQueue
( Queue
, lengthQueue
, newQueue
, readQueue
, tryReadQueue
, writeQueue
)
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.Sequence as Seq
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence.Extra as Seq
import Network.Monitoring.Riemann.Client (Client, close, sendEvent)
import qualified Network.Monitoring.Riemann.Proto.Event as PE
import qualified Network.Monitoring.Riemann.TCP as TCP
import Network.Socket (HostName)
import System.IO (hPutStrLn, stderr)
newtype BatchClient =
BatchClient (Unagi.InChan LogCommand)
newtype BatchClientNoBuffer =
BatchClientNoBuffer (Queue LogCommand)
data LogCommand
= Event PE.Event
| Stop (MVar ())
batchClient ::
HostName -> TCP.Port -> Int -> Int -> (PE.Event -> IO ()) -> IO BatchClient
batchClient :: HostName
-> Port -> Port -> Port -> (Event -> IO ()) -> IO BatchClient
batchClient HostName
hostname Port
port Port
bufferSize Port
batchSize Event -> IO ()
overflow
| Port
batchSize Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
<= Port
0 = HostName -> IO BatchClient
forall a. HasCallStack => HostName -> a
error HostName
"Batch Size must be positive"
| Bool
otherwise = do
TCPConnection
connection <- HostName -> Port -> IO TCPConnection
TCP.tcpConnection HostName
hostname Port
port
(InChan LogCommand
inChan, OutChan LogCommand
outChan) <- IO (InChan LogCommand, OutChan LogCommand)
forall a. IO (InChan a, OutChan a)
Unagi.newChan
Queue LogCommand
queue <- IO (Queue LogCommand)
forall a. IO (Queue a)
newQueue
ThreadId
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ OutChan LogCommand
-> Queue LogCommand -> Port -> (Event -> IO ()) -> IO ()
overflowConsumer OutChan LogCommand
outChan Queue LogCommand
queue Port
bufferSize Event -> IO ()
overflow
ThreadId
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Port -> Queue LogCommand -> TCPConnection -> IO ()
riemannConsumer Port
batchSize Queue LogCommand
queue TCPConnection
connection
BatchClient -> IO BatchClient
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BatchClient -> IO BatchClient) -> BatchClient -> IO BatchClient
forall a b. (a -> b) -> a -> b
$ InChan LogCommand -> BatchClient
BatchClient InChan LogCommand
inChan
bufferlessBatchClient :: HostName -> TCP.Port -> Int -> IO BatchClientNoBuffer
bufferlessBatchClient :: HostName -> Port -> Port -> IO BatchClientNoBuffer
bufferlessBatchClient HostName
hostname Port
port Port
batchSize
| Port
batchSize Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
<= Port
0 = HostName -> IO BatchClientNoBuffer
forall a. HasCallStack => HostName -> a
error HostName
"Batch Size must be positive"
| Bool
otherwise = do
TCPConnection
connection <- HostName -> Port -> IO TCPConnection
TCP.tcpConnection HostName
hostname Port
port
Queue LogCommand
queue <- IO (Queue LogCommand)
forall a. IO (Queue a)
newQueue
ThreadId
_ <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Port -> Queue LogCommand -> TCPConnection -> IO ()
riemannConsumer Port
batchSize Queue LogCommand
queue TCPConnection
connection
BatchClientNoBuffer -> IO BatchClientNoBuffer
forall (f :: * -> *) a. Applicative f => a -> f a
pure (BatchClientNoBuffer -> IO BatchClientNoBuffer)
-> BatchClientNoBuffer -> IO BatchClientNoBuffer
forall a b. (a -> b) -> a -> b
$ Queue LogCommand -> BatchClientNoBuffer
BatchClientNoBuffer Queue LogCommand
queue
overflowConsumer ::
Unagi.OutChan LogCommand
-> Queue LogCommand
-> Int
-> (PE.Event -> IO ())
-> IO ()
overflowConsumer :: OutChan LogCommand
-> Queue LogCommand -> Port -> (Event -> IO ()) -> IO ()
overflowConsumer OutChan LogCommand
outChan Queue LogCommand
queue Port
bufferSize Event -> IO ()
f = IO ()
loop
where
loop :: IO ()
loop = do
LogCommand
cmd <- OutChan LogCommand -> IO LogCommand
forall a. OutChan a -> IO a
Unagi.readChan OutChan LogCommand
outChan
case LogCommand
cmd of
Event Event
event -> do
Port
qSize <- Queue LogCommand -> IO Port
forall a. Queue a -> IO Port
lengthQueue Queue LogCommand
queue
if Port
qSize Port -> Port -> Bool
forall a. Ord a => a -> a -> Bool
>= Port
bufferSize
then do
Event -> IO ()
f Event
event
IO ()
loop
else do
Queue LogCommand -> LogCommand -> IO ()
forall a. Queue a -> a -> IO ()
writeQueue Queue LogCommand
queue LogCommand
cmd
IO ()
loop
Stop MVar ()
_ -> do
Handle -> HostName -> IO ()
hPutStrLn Handle
stderr HostName
"stopping log consumer"
Queue LogCommand -> LogCommand -> IO ()
forall a. Queue a -> a -> IO ()
writeQueue Queue LogCommand
queue LogCommand
cmd
drainAll :: Queue a -> Int -> IO (Seq a)
drainAll :: Queue a -> Port -> IO (Seq a)
drainAll Queue a
queue Port
limit = do
a
msg <- Queue a -> IO a
forall a. Queue a -> IO a
readQueue Queue a
queue
Seq a -> Port -> IO (Seq a)
forall t. (Eq t, Num t) => Seq a -> t -> IO (Seq a)
loop (a -> Seq a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
msg) (Port
limit Port -> Port -> Port
forall a. Num a => a -> a -> a
- Port
1)
where
loop :: Seq a -> t -> IO (Seq a)
loop Seq a
msgs t
0 = Seq a -> IO (Seq a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Seq a
msgs
loop Seq a
msgs t
n =
Queue a -> IO (Maybe a)
forall a. Queue a -> IO (Maybe a)
tryReadQueue Queue a
queue IO (Maybe a) -> (Maybe a -> IO (Seq a)) -> IO (Seq a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe a
Nothing -> Seq a -> IO (Seq a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Seq a
msgs
Just a
msg -> Seq a -> t -> IO (Seq a)
loop (Seq a
msgs Seq a -> a -> Seq a
forall a. Seq a -> a -> Seq a
|> a
msg) (t
n t -> t -> t
forall a. Num a => a -> a -> a
- t
1)
riemannConsumer :: Int -> Queue LogCommand -> TCP.TCPConnection -> IO ()
riemannConsumer :: Port -> Queue LogCommand -> TCPConnection -> IO ()
riemannConsumer Port
batchSize Queue LogCommand
queue TCPConnection
connection = IO ()
loop
where
loop :: IO ()
loop = do
Seq LogCommand
cmds <- Queue LogCommand -> Port -> IO (Seq LogCommand)
forall a. Queue a -> Port -> IO (Seq a)
drainAll Queue LogCommand
queue Port
batchSize
let (Seq Event
events, Seq (MVar ())
stops) =
(LogCommand -> Either Event (MVar ()))
-> Seq LogCommand -> (Seq Event, Seq (MVar ()))
forall a b c. (a -> Either b c) -> Seq a -> (Seq b, Seq c)
Seq.separate
(\case
Event Event
e -> Event -> Either Event (MVar ())
forall a b. a -> Either a b
Left Event
e
Stop MVar ()
s -> MVar () -> Either Event (MVar ())
forall a b. b -> Either a b
Right MVar ()
s)
Seq LogCommand
cmds
TCPConnection -> Seq Event -> IO ()
TCP.sendEvents TCPConnection
connection Seq Event
events
if Seq (MVar ()) -> Bool
forall a. Seq a -> Bool
Seq.null Seq (MVar ())
stops
then IO ()
loop
else let s :: MVar ()
s = Seq (MVar ()) -> Port -> MVar ()
forall a. Seq a -> Port -> a
Seq.index Seq (MVar ())
stops Port
0
in do Handle -> HostName -> IO ()
hPutStrLn Handle
stderr HostName
"stopping riemann consumer"
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
s ()
instance MonadIO m => Client m BatchClient where
sendEvent :: BatchClient -> Event -> m ()
sendEvent (BatchClient InChan LogCommand
inChan) Event
event =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (LogCommand -> IO ()) -> LogCommand -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. InChan LogCommand -> LogCommand -> IO ()
forall a. InChan a -> a -> IO ()
Unagi.writeChan InChan LogCommand
inChan (LogCommand -> m ()) -> LogCommand -> m ()
forall a b. (a -> b) -> a -> b
$ Event -> LogCommand
Event Event
event
close :: BatchClient -> m ()
close (BatchClient InChan LogCommand
inChan) =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
MVar ()
s <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
InChan LogCommand -> LogCommand -> IO ()
forall a. InChan a -> a -> IO ()
Unagi.writeChan InChan LogCommand
inChan (MVar () -> LogCommand
Stop MVar ()
s)
MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
s
instance MonadIO m => Client m BatchClientNoBuffer where
sendEvent :: BatchClientNoBuffer -> Event -> m ()
sendEvent (BatchClientNoBuffer Queue LogCommand
queue) Event
event =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (LogCommand -> IO ()) -> LogCommand -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Queue LogCommand -> LogCommand -> IO ()
forall a. Queue a -> a -> IO ()
writeQueue Queue LogCommand
queue (LogCommand -> m ()) -> LogCommand -> m ()
forall a b. (a -> b) -> a -> b
$ Event -> LogCommand
Event Event
event
close :: BatchClientNoBuffer -> m ()
close (BatchClientNoBuffer Queue LogCommand
queue) =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
MVar ()
s <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
Queue LogCommand -> LogCommand -> IO ()
forall a. Queue a -> a -> IO ()
writeQueue Queue LogCommand
queue (MVar () -> LogCommand
Stop MVar ()
s)
MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
s