{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ViewPatterns #-}
#if __GLASGOW_HASKELL__ >= 800
{-# OPTIONS_GHC -Wno-missing-signatures #-}
#else
{-# OPTIONS_GHC -fno-warn-missing-signatures #-}
#endif
module Database.InfluxDB.Write
(
write
, writeBatch
, writeByteString
, WriteParams
, writeParams
, Types.server
, Types.database
, retentionPolicy
, Types.precision
, Types.manager
) where
import Control.Exception
import Control.Monad
import Data.Maybe
import Control.Lens
import qualified Data.Aeson as A
import qualified Data.Aeson.Types as A
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text.Encoding as TE
import qualified Network.HTTP.Client as HC
import qualified Network.HTTP.Types as HT
import Database.InfluxDB.Line
import Database.InfluxDB.Types as Types
import Database.InfluxDB.JSON
-- | The full set of parameters for the HTTP writer.
--
-- The constructor is not exported; build a value with 'writeParams' and
-- adjust it through the lenses exported by this module.
data WriteParams = WriteParams
  { writeServer :: !Server
    -- ^ Server the write request is sent to (host, port and TLS flag).
  , writeDatabase :: !Database
    -- ^ Database to be written; sent as the @db@ query parameter.
  , writeRetentionPolicy :: !(Maybe Key)
    -- ^ Optional retention policy; sent as @rp@ only when present.
  , writePrecision :: !(Precision 'WriteRequest)
    -- ^ Precision used to scale timestamps; also sent as @precision@.
  , writeAuthentication :: !(Maybe Credentials)
    -- ^ Optional credentials; sent as @u@ and @p@ only when present.
  , writeManager :: !(Either HC.ManagerSettings HC.Manager)
    -- ^ Either settings from which a manager is created for each write,
    -- or an existing manager to use as is.
  }
-- | Smart constructor for 'WriteParams' targeting the given database.
--
-- Defaults:
--
-- * server: 'defaultServer'
-- * precision: 'Nanosecond'
-- * retention policy: 'Nothing'
-- * credentials: 'Nothing'
-- * manager: @'Left' 'HC.defaultManagerSettings'@
writeParams :: Database -> WriteParams
writeParams db = WriteParams
  { writeServer = defaultServer
  , writeDatabase = db
  , writeRetentionPolicy = Nothing
  , writePrecision = Nanosecond
  , writeAuthentication = Nothing
  , writeManager = Left HC.defaultManagerSettings
  }
-- | Write a single 'Line' to InfluxDB.
--
-- The line is serialised with 'encodeLine', its timestamp scaled by
-- 'scaleTo' to the precision stored in the parameters, and the resulting
-- bytes are handed to 'writeByteString'.
write
  :: Timestamp time
  => WriteParams
  -> Line time
  -> IO ()
write params entry = writeByteString params encoded
  where
    encoded = encodeLine (scaleTo (writePrecision params)) entry
-- | Write a 'Foldable' collection of 'Line's in a single request.
--
-- The lines are serialised together with 'encodeLines', their timestamps
-- scaled by 'scaleTo' to the precision stored in the parameters, and the
-- resulting bytes are handed to 'writeByteString'.
writeBatch
  :: (Timestamp time, Foldable f)
  => WriteParams
  -> f (Line time)
  -> IO ()
writeBatch params entries = writeByteString params encoded
  where
    encoded = encodeLines (scaleTo (writePrecision params)) entries
-- | Send an already encoded payload as the body of a write request.
--
-- The request is the one built by 'writeRequest' with the payload set as
-- its body. When 'writeManager' holds settings rather than a manager, a new
-- manager is created for this call.
--
-- Throws an 'InfluxException':
--
-- * 'HTTPException' wrapping any 'HC.HttpException' raised while performing
--   the request;
-- * 'ServerError' or 'ClientError' when the response status is a server or
--   client error. The message is the status message when the body is empty,
--   otherwise the failure message obtained by running 'parseErrorObject' on
--   the decoded body;
-- * 'UnexpectedResponse' when a non-empty body is not valid JSON, or when it
--   is valid JSON but the status is neither a server nor a client error.
--
-- An empty body with any other status is treated as success.
writeByteString :: WriteParams -> BL.ByteString -> IO ()
writeByteString params payload = do
  mgr <- case writeManager params of
    Left settings -> HC.newManager settings
    Right shared -> return shared
  resp <- HC.httpLbs request mgr `catch` \e -> throwIO (HTTPException e)
  let respBody = HC.responseBody resp
      respStatus = HC.responseStatus resp
  if BL.null respBody
    then
      -- Nothing to decode: the status line is the only error detail.
      rejectErrorStatus respStatus (B8.unpack (HT.statusMessage respStatus))
    else case A.eitherDecode' respBody of
      Left decodeError ->
        throwIO (UnexpectedResponse decodeError request respBody)
      Right val -> case A.parse parseErrorObject val of
        -- This branch is reported as a bug: the parser is not expected to
        -- succeed here.
        A.Success err ->
          fail $ "BUG: impossible code path in "
            ++ "Database.InfluxDB.Write.writeByteString: "
            ++ err
        A.Error reason -> do
          rejectErrorStatus respStatus reason
          -- Reached only when the status is neither 4xx nor 5xx yet the
          -- server still sent a JSON body.
          throwIO $ UnexpectedResponse
            ("BUG: " ++ reason
              ++ " in Database.InfluxDB.Write.writeByteString")
            request
            (A.encode val)
  where
    request = (writeRequest params)
      { HC.requestBody = HC.RequestBodyLBS payload
      }

    -- Throw the exception matching a server or client error status, checking
    -- for a server error first; any other status is a no-op.
    rejectErrorStatus status reason
      | HT.statusIsServerError status = throwIO (ServerError reason)
      | HT.statusIsClientError status = throwIO (ClientError reason request)
      | otherwise = return ()
-- | Build the @POST \/write@ request described by the parameters.
--
-- Host, port and the TLS flag are taken from 'writeServer'. The query string
-- always carries @db@ and @precision@, followed by @rp@ when a retention
-- policy is set and by @u@ and @p@ when credentials are set. The request
-- body is not set here.
writeRequest :: WriteParams -> HC.Request
writeRequest WriteParams
  { writeServer
  , writeDatabase
  , writeRetentionPolicy
  , writePrecision
  , writeAuthentication
  } = HC.setQueryString queryItems endpoint
  where
    Server {_host = hostName, _port = portNumber, _ssl = useTls} = writeServer

    endpoint = HC.defaultRequest
      { HC.host = TE.encodeUtf8 hostName
      , HC.port = fromIntegral portNumber
      , HC.secure = useTls
      , HC.method = "POST"
      , HC.path = "/write"
      }

    -- Order matters only for the rendered URL: required parameters first,
    -- then the optional ones.
    queryItems = requiredItems ++ retentionItems ++ credentialItems

    requiredItems =
      [ ("db", utf8Value (databaseName writeDatabase))
      , ("precision", utf8Value (precisionName writePrecision))
      ]

    retentionItems = case writeRetentionPolicy of
      Just (Key name) -> [("rp", utf8Value name)]
      Nothing -> []

    -- NOTE(review): credentials are sent as URL query parameters, which may
    -- be recorded in server or proxy logs; confirm this is acceptable.
    credentialItems = case writeAuthentication of
      Just Credentials {_user = u, _password = p} ->
        [ ("u", utf8Value u)
        , ("p", utf8Value p)
        ]
      Nothing -> []

    -- Every query value is present and UTF-8 encoded.
    utf8Value = Just . TE.encodeUtf8
-- Generate lenses for the fields of 'WriteParams'. Signatures are not
-- generated, so any lens that needs one must be given it by hand, and each
-- field is mapped to an explicit lens name through the table below.
makeLensesWith
  ( let lensNames =
          [ ("writeServer", "_server")
          , ("writeDatabase", "_database")
          , ("writeRetentionPolicy", "retentionPolicy")
          , ("writePrecision", "_precision")
          , ("writeManager", "_manager")
          , ("writeAuthentication", "_authentication")
          ]
    in lensRules
         & generateSignatures .~ False
         & lensField .~ lookingupNamer lensNames
  )
  ''WriteParams
-- | 'server' is the generated @_server@ lens onto the @writeServer@ field.
instance HasServer WriteParams where
  server = _server
-- | 'database' is the generated @_database@ lens onto the @writeDatabase@
-- field.
instance HasDatabase WriteParams where
  database = _database
-- | Lens onto the optional retention policy of the write parameters.
--
-- The lens itself is generated by the 'makeLensesWith' splice from the
-- @writeRetentionPolicy@ field; only its signature is written by hand,
-- because signature generation is switched off in that splice. While the
-- value is 'Nothing', 'writeRequest' adds no @rp@ query parameter.
retentionPolicy :: Lens' WriteParams (Maybe Key)
-- | 'precision' is the generated @_precision@ lens onto the
-- @writePrecision@ field.
instance HasPrecision 'WriteRequest WriteParams where
  precision = _precision
-- | 'manager' is the generated @_manager@ lens onto the @writeManager@
-- field.
instance HasManager WriteParams where
  manager = _manager
-- | 'authentication' is the generated @_authentication@ lens onto the
-- @writeAuthentication@ field.
instance HasCredentials WriteParams where
  authentication = _authentication