{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ViewPatterns #-}
#if __GLASGOW_HASKELL__ >= 800
{-# OPTIONS_GHC -Wno-missing-signatures #-}
#else
{-# OPTIONS_GHC -fno-warn-missing-signatures #-}
#endif
module Database.InfluxDB.Write
  ( -- * Writers
    -- $intro
    write
  , writeBatch
  , writeByteString

  -- * Writer parameters
  , WriteParams
  , writeParams
  , Types.server
  , Types.database
  , retentionPolicy
  , Types.precision
  , Types.authentication
  , Types.manager
) where
import Control.Exception
import Control.Monad
import Data.Maybe

import Control.Lens
import qualified Data.Aeson as A
import qualified Data.Aeson.Types as A
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text.Encoding as TE
import qualified Network.HTTP.Client as HC
import qualified Network.HTTP.Types as HT

import Database.InfluxDB.Line
import Database.InfluxDB.Types as Types
import Database.InfluxDB.JSON

-- $setup
-- >>> :set -XOverloadedStrings -XNoOverloadedLists -XTypeApplications
-- >>> import qualified Data.Map as Map
-- >>> import Data.Time
-- >>> import Database.InfluxDB
-- >>> import qualified Network.HTTP.Client as HC
-- >>> Database.InfluxDB.manage (queryParams "test-db") "CREATE DATABASE \"test-db\""

{- $intro
The code snippets in this module assume the following imports.

@
import qualified Data.Map as Map
import Data.Time
@
-}

-- | The full set of parameters for the HTTP writer.
--
-- The following lenses are available to access its fields:
--
-- * 'server'
-- * 'database'
-- * 'retentionPolicy'
-- * 'precision'
-- * 'authentication'
-- * 'manager'
data WriteParams = WriteParams
  { writeServer :: !Server
  -- ^ Server the write request is sent to
  , writeDatabase :: !Database
  -- ^ Database to be written
  , writeRetentionPolicy :: !(Maybe Key)
  -- ^ 'Nothing' means the default retention policy for the database.
  , writePrecision :: !(Precision 'WriteRequest)
  -- ^ Timestamp precision
  --
  -- In the HTTP API, timestamps are scaled by the given precision.
  , writeAuthentication :: !(Maybe Credentials)
  -- ^ No authentication by default
  , writeManager :: !(Either HC.ManagerSettings HC.Manager)
  -- ^ HTTP connection manager
  }

-- | Smart constructor for 'WriteParams'
--
-- Only the target database has to be supplied; every other field starts
-- from the defaults below and can be adjusted through the lenses.
--
-- Default parameters:
--
--   ['server'] 'defaultServer'
--   ['retentionPolicy'] 'Nothing'
--   ['precision'] 'Nanosecond'
--   ['authentication'] 'Nothing'
--   ['manager'] @'Left' 'HC.defaultManagerSettings'@
writeParams :: Database -> WriteParams
writeParams db = WriteParams
  { writeServer = defaultServer
  , writeDatabase = db
  , writeRetentionPolicy = Nothing
  , writePrecision = Nanosecond
  , writeAuthentication = Nothing
  , writeManager = Left HC.defaultManagerSettings
  }

-- | Write a 'Line'.
--
-- The line is encoded with 'encodeLine', with its timestamp scaled to the
-- precision configured in the 'WriteParams', and the result is sent with
-- 'writeByteString'. Any exception raised by 'writeByteString' propagates
-- to the caller.
--
-- >>> let p = writeParams "test-db"
-- >>> write p $ Line @UTCTime "room_temp" Map.empty (Map.fromList [("temp", FieldFloat 25.0)]) Nothing
write
  :: Timestamp time
  => WriteParams
  -> Line time
  -> IO ()
write p@WriteParams {writePrecision} =
  writeByteString p . encodeLine (scaleTo writePrecision)

-- | Write multiple 'Line's in a batch.
--
-- This is more efficient than calling 'write' multiple times.
--
-- All lines are encoded together with 'encodeLines', with timestamps scaled
-- to the precision configured in the 'WriteParams', and sent in a single
-- call to 'writeByteString'. Any exception raised by 'writeByteString'
-- propagates to the caller.
--
-- >>> let p = writeParams "test-db"
-- >>> :{
-- writeBatch p
--   [ Line @UTCTime "temp" (Map.singleton "city" "tokyo") (Map.fromList [("temp", FieldFloat 25.0)]) Nothing
--   , Line @UTCTime "temp" (Map.singleton "city" "osaka") (Map.fromList [("temp", FieldFloat 25.2)]) Nothing
--   ]
-- :}
writeBatch
  :: (Timestamp time, Foldable f)
  => WriteParams
  -> f (Line time)
  -> IO ()
writeBatch p@WriteParams {writePrecision} =
  writeByteString p . encodeLines (scaleTo writePrecision)

-- | Write a raw 'BL.ByteString'
--
-- The payload is sent as the body of the request built by 'writeRequest'.
-- If the 'WriteParams' carry only manager settings ('Left'), a new
-- connection manager is created for this call.
--
-- Throws an 'InfluxException':
--
-- * 'HTTPException' wrapping any 'HC.HttpException' raised while performing
--   the request.
-- * 'ServerError' when the response status is a server error.
-- * 'ClientError' when the response status is a client error.
-- * 'UnexpectedResponse' when the response has a non-empty body that is not
--   valid JSON, or a non-empty JSON body with a status that is neither a
--   client nor a server error.
--
-- A response with an empty body and a non-error status is treated as
-- success.
writeByteString :: WriteParams -> BL.ByteString -> IO ()
writeByteString params payload = do
  manager' <- either HC.newManager return $ writeManager params
  response <- HC.httpLbs request manager' `catch` (throwIO . HTTPException)
  let body = HC.responseBody response
      status = HC.responseStatus response
  if BL.null body
    -- No body to inspect: the status line is the only error information.
    then throwOnErrorStatus status (B8.unpack $ HT.statusMessage status)
    else case A.eitherDecode' body of
      Left message ->
        throwIO $ UnexpectedResponse message request body
      Right val -> case A.parse parseErrorObject val of
        -- The error-object parser is expected to always fail, carrying the
        -- server's error message; a successful parse is reported as a bug.
        A.Success err ->
          fail $ "BUG: impossible code path in "
            ++ "Database.InfluxDB.Write.writeByteString: "
            ++ err
        A.Error message -> do
          throwOnErrorStatus status message
          -- A body was returned but the status is not an error status.
          throwIO $ UnexpectedResponse
            ("BUG: " ++ message
              ++ " in Database.InfluxDB.Write.writeByteString")
            request
            (A.encode val)
  where
    request = (writeRequest params)
      { HC.requestBody = HC.RequestBodyLBS payload
      }

    -- Throw the exception matching an error status; do nothing otherwise.
    throwOnErrorStatus :: HT.Status -> String -> IO ()
    throwOnErrorStatus status message = do
      when (HT.statusIsServerError status) $
        throwIO $ ServerError message
      when (HT.statusIsClientError status) $
        throwIO $ ClientError message request

-- | Build the HTTP request for the @/write@ endpoint.
--
-- Host, port and TLS flag come from the configured 'Server'; the method is
-- @POST@. The query string always carries @db@ and @precision@, plus @rp@
-- when a retention policy is set and @u@/@p@ when credentials are set.
-- The request body is left for the caller to fill in.
--
-- NOTE(review): credentials are sent as query-string parameters, so they may
-- end up in URL logs of intermediaries — confirm this is acceptable for the
-- deployment.
writeRequest :: WriteParams -> HC.Request
writeRequest WriteParams
  { writeServer = Server {_host, _port, _ssl}
  , writeDatabase
  , writeRetentionPolicy
  , writePrecision
  , writeAuthentication
  } =
  HC.setQueryString qs HC.defaultRequest
    { HC.host = TE.encodeUtf8 _host
    , HC.port = _port
    , HC.secure = _ssl
    , HC.method = "POST"
    , HC.path = "/write"
    }
  where
    qs :: HT.Query
    qs = concat
      [ [ ("db", Just $ TE.encodeUtf8 $ databaseName writeDatabase)
        , ("precision", Just $ TE.encodeUtf8 $ precisionName writePrecision)
        ]
      -- Optional parameters contribute nothing when unset.
      , concatMap retentionPolicyParam (maybeToList writeRetentionPolicy)
      , concatMap credentialParams (maybeToList writeAuthentication)
      ]

    -- Query parameter selecting the target retention policy.
    retentionPolicyParam :: Key -> HT.Query
    retentionPolicyParam (Key name) =
      [("rp", Just (TE.encodeUtf8 name))]

    -- Query parameters carrying the user name and password.
    credentialParams :: Credentials -> HT.Query
    credentialParams Credentials {_user = u, _password = p} =
      [ ("u", Just (TE.encodeUtf8 u))
      , ("p", Just (TE.encodeUtf8 p))
      ]

-- Generate lenses for the fields of 'WriteParams'.
--
-- Signature generation is switched off, so a lens that is used directly
-- ('retentionPolicy') gets its signature and documentation written by hand
-- further down. The underscore-prefixed lenses serve as the implementations
-- of the class methods in the instances that follow this splice.
makeLensesWith
  ( lensRules
    & generateSignatures .~ False
    -- Map each record field name to the name of the lens generated for it.
    & lensField .~ lookingupNamer
      [ ("writeServer", "_server")
      , ("writeDatabase", "_database")
      , ("writeRetentionPolicy", "retentionPolicy")
      , ("writePrecision", "_precision")
      , ("writeManager", "_manager")
      , ("writeAuthentication", "_authentication")
      ]
    )
  ''WriteParams

-- | Server the write request is sent to.
--
-- >>> let p = writeParams "foo"
-- >>> p ^. server.host
-- "localhost"
instance HasServer WriteParams where
  server = _server

-- | Database to be written.
--
-- >>> let p = writeParams "foo"
-- >>> p ^. database
-- "foo"
instance HasDatabase WriteParams where
  database = _database

-- | Target retention policy for the write.
--
-- InfluxDB writes to the @default@ retention policy if this parameter is set
-- to 'Nothing'. When it is set, the policy name is sent as the @rp@ query
-- parameter of the write request.
--
-- >>> let p = writeParams "foo" & retentionPolicy .~ Just "two_hours"
-- >>> p ^. retentionPolicy
-- Just "two_hours"
retentionPolicy :: Lens' WriteParams (Maybe Key)

-- | Timestamp precision used when encoding lines for the write.
--
-- >>> let p = writeParams "foo"
-- >>> p ^. precision
-- Nanosecond
instance HasPrecision 'WriteRequest WriteParams where
  precision = _precision

-- | HTTP connection manager, or the settings to create one from.
--
-- >>> let p = writeParams "foo" & manager .~ Left HC.defaultManagerSettings
instance HasManager WriteParams where
  manager = _manager

-- | Authentication info for the write
--
-- >>> let p = writeParams "foo"
-- >>> p ^. authentication
-- Nothing
-- >>> let p' = p & authentication ?~ credentials "john" "passw0rd"
-- >>> p' ^. authentication . traverse . user
-- "john"
instance HasCredentials WriteParams where
  authentication = _authentication