{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}

module Database.InfluxDB.Writer
    ( Config(..), Handle
    , createHandle, newHandle

    , Value(..), Tags, Fields
    , writePoint, writePoint'
    ) where


import           Data.Int
import           Data.Monoid
import           Data.Pool

import qualified Data.ByteString as BS

import           Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T

import           Data.Map (Map)
import qualified Data.Map as M

import           Control.Monad
import           Control.Exception
import           Control.Concurrent
import           Control.Concurrent.STM

import           System.Clock

import           Network.HTTP.Client
import           Network.HTTP.Client.TLS (tlsManagerSettings)

import           Prelude



data Config = Config
    { cURL :: !String
      -- ^ The database name is extracted from the path (the leading slash is
      -- dropped, the tail is the database name). The rest of the URL (hostname,
      -- port, auth info) describes how to access the InfluxDB server.
      --
      -- Example: @http://localhost:8086/testdb@
    }

data Handle = Handle
    { hPool :: Pool (TQueue (Maybe Point))
      -- ^ A pool of threads which consume messages from 'TQueue's and write
      -- them in batches to InfluxDB.
    }


createHandle :: Config -> IO (Either () Handle)
createHandle c = do
    manager <- newManager tlsManagerSettings
    newHandle c manager


newHandle :: Config -> Manager -> IO (Either () Handle)
newHandle c manager = do
    req  <- mkRequestTemplate <$> parseUrl (cURL c)
    pool <- createPool (allocateResource req) releaseResource 1 600 3

    return $ Right $ Handle pool

  where
    mkRequestTemplate req = req
        { method      = "POST"
        , path        = "/write"
        , queryString = "?db=" <> BS.tail (path req)
        }


    drainQueue :: TQueue (Maybe Point) -> Int -> Int -> IO [Point]
    drainQueue queue timeout count = do
        box <- newEmptyTMVarIO
        tmp <- newTVarIO []

        void $ forkFinally (go tmp count) $
            (\_ -> atomically $ readTVar tmp >>= putTMVar box . reverse)

        atomically $ readTMVar box

      where
        go _   0 = return ()
        go tmp n = mask $ \restore -> do
            mbPoint <- restore $ atomically $ do
                mbPoint <- readTQueue queue
                case mbPoint of
                    Nothing -> return Nothing
                    Just p  -> do
                        modifyTVar' tmp (\x -> p : x)
                        return mbPoint

            case mbPoint of
                Nothing -> return ()
                Just _  -> do
                    when (n == count) $ do
                        threadId <- myThreadId
                        void $ forkFinally (threadDelay timeout) $ \_ -> killThread threadId

                    restore $ go tmp (n - 1)


    flushQueue :: Request -> TQueue (Maybe Point) -> IO ()
    flushQueue req queue = do
        let batchSize = 50
        let timeout   = 10 * 1000 * 1000

        -- Dequeue the next batch of points. This operation will block until
        -- a batch of points is available.
        points <- drainQueue queue timeout batchSize

        -- If the batch is non-empty, send the points to InfluxDB.
        when (length points > 0) $
            flushPoints manager req points

        -- If the batch is not full, it means there wasn't enough data in the
        -- queue. So we can sleep a bit.
        when (length points < batchSize) $
            threadDelay $ 5 * 1000000

        -- Repeat.
        flushQueue req queue


    allocateResource :: Request -> IO (TQueue (Maybe Point))
    allocateResource req = do
        queue <- newTQueueIO

        -- Fork off a thread which periodically flushes the queue.
        --
        -- Async exceptions in the thread are completely masked, the only way
        -- to dispose of this thread is to write 'Nothing' into the queue.

        void $ mask_ $ forkIO $ flushQueue req queue


        return queue


    releaseResource queue =
        atomically $ writeTQueue queue Nothing




-- | A 'Value' is either an integer, a floating point number, a boolean or
-- string.
data Value = I !Int64 | F !Double | B !Bool | S !Text
    deriving (Show, Eq)

type Tags = Map Text Text
type Fields = Map Text Value

data Point = Point
    { pMeasurement :: !Text
    , pTags :: !Tags
    , pFields :: !Fields
    , pTimestamp :: !(Maybe Int64)
    } deriving (Show, Eq)


-- | Write a point to the database. Generates a timestamp from the local clock.
writePoint :: Handle -> Text -> Tags -> Fields -> IO ()
writePoint h measurement tags fields = do
    timestamp <- fromIntegral . timeSpecAsNanoSecs <$> getTime Realtime
    writePoint' h measurement tags fields timestamp

-- | Same as 'writePoint' but allows the caller to supply the timestamp.
writePoint' :: Handle -> Text -> Tags -> Fields -> Int64 -> IO ()
writePoint' h measurement tags fields timestamp =
    withResource (hPool h) $ \queue -> atomically $ writeTQueue queue $ Just $
        Point measurement tags fields (Just timestamp)



-- | Send a batch of points to the InfluxDB server. When the server is
-- unreachable, the batch is lost. Delivery is not retried.
flushPoints :: Manager -> Request -> [Point] -> IO ()
flushPoints manager requestTemplate points = do
    void send

  where
    send :: IO (Either SomeException ())
    send = try $ void $ httpLbs req manager

    renderValue :: Value -> Text
    renderValue (I x) = (T.pack $ show x) <> "i"
    renderValue (F x) = (T.pack $ show x)
    renderValue (B x) = (T.pack $ if x then "true" else "false")
    renderValue (S x) = "\"" <> x <> "\""

    tagsList :: Tags -> [Text]
    tagsList tags = map (\(k,v) -> k <> "=" <> v) $ M.toList tags
    fieldsList fields = map (\(k,v) -> k <> "=" <> renderValue v) $ M.toList fields

    line :: Point -> Text
    line Point{..} = mconcat $
        [ T.intercalate "," ([pMeasurement] ++ (tagsList pTags))
        , " "
        , T.intercalate "," (fieldsList pFields)
        ] ++ maybe [] (\x -> [" ", T.pack $ show x]) pTimestamp

    body = T.encodeUtf8 $ T.intercalate "\n" $ map line points
    req = requestTemplate { requestBody = RequestBodyBS body }