#if __GLASGOW_HASKELL__ >= 800
#else
#endif
-- | Writing data to InfluxDB through its HTTP @\/write@ endpoint.
--
-- The writers take a 'WriteParams' value, built with 'writeParams' and
-- adjusted through the lenses re-exported below.
module Database.InfluxDB.Write
(
-- * Writers
write
, writeBatch
, writeByteString
-- * Writer parameters
, WriteParams
, writeParams
, Types.server
, Types.database
, retentionPolicy
, Types.precision
, Types.manager
) where
import Control.Exception
import Control.Monad
import Data.Maybe
import Control.Lens
import qualified Data.Aeson as A
import qualified Data.Aeson.Types as A
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text.Encoding as TE
import qualified Network.HTTP.Client as HC
import qualified Network.HTTP.Types as HT
import Database.InfluxDB.Line
import Database.InfluxDB.Types as Types
import Database.InfluxDB.JSON
-- | Parameters shared by 'write', 'writeBatch' and 'writeByteString'.
--
-- Construct a value with 'writeParams' and customise it via the lenses
-- generated for this record.
data WriteParams = WriteParams
{ writeServer :: !Server
-- ^ Server whose host, port and SSL flag are used to build the request.
, writeDatabase :: !Database
-- ^ Target database; its name is sent as the @db@ query parameter.
, writeRetentionPolicy :: !(Maybe Key)
-- ^ Optional retention policy; sent as the @rp@ query parameter when present.
, writePrecision :: !(Precision 'WriteRequest)
-- ^ Precision that 'write' and 'writeBatch' scale timestamps to.
, writeAuthentication :: !(Maybe Credentials)
-- ^ Optional credentials; sent as the @u@ and @p@ query parameters when
-- present.
, writeManager :: !(Either HC.ManagerSettings HC.Manager)
-- ^ Either settings from which a fresh manager is created for each request
-- ('Left'), or an existing manager that is reused ('Right').
}
-- | Smart constructor for 'WriteParams'.
--
-- Only the target database is required. Every other field starts from a
-- default:
--
-- * server: 'defaultServer'
-- * precision: 'Nanosecond'
-- * retention policy: none
-- * credentials: none
-- * manager: 'Left' 'HC.defaultManagerSettings', i.e. a new manager is
--   created for each request
writeParams :: Database -> WriteParams
writeParams db = WriteParams
  { writeServer = defaultServer
  , writeDatabase = db
  , writeRetentionPolicy = Nothing
  , writePrecision = Nanosecond
  , writeAuthentication = Nothing
  , writeManager = Left HC.defaultManagerSettings
  }
-- | Write a single 'Line' to the database described by the parameters.
--
-- The line is encoded with 'encodeLine', scaling its timestamp to the
-- configured 'writePrecision', and the result is sent with
-- 'writeByteString'. Failures surface as the exceptions thrown by
-- 'writeByteString'.
write :: Timestamp time => WriteParams -> Line time -> IO ()
write params line = writeByteString params payload
  where
    -- Encoded request body for this one line.
    payload = encodeLine (scaleTo (writePrecision params)) line
-- | Write a whole collection of 'Line's in a single request.
--
-- The lines are encoded together with 'encodeLines', scaling each timestamp
-- to the configured 'writePrecision', and the result is sent with
-- 'writeByteString'. Failures surface as the exceptions thrown by
-- 'writeByteString'.
writeBatch
  :: (Timestamp time, Foldable f) => WriteParams -> f (Line time) -> IO ()
writeBatch params batch = writeByteString params payload
  where
    -- Encoded request body covering every line of the batch.
    payload = encodeLines (scaleTo (writePrecision params)) batch
-- | Send an already-encoded payload to the write endpoint.
--
-- The payload is used verbatim as the request body (presumably InfluxDB line
-- protocol, as produced by 'encodeLine' / 'encodeLines').
--
-- Outcomes:
--
-- * An exception raised by the HTTP client is rethrown wrapped in
--   'HTTPException'.
-- * Empty response body: 'ServerError' for a server-error status,
--   'ClientError' for a client-error status, otherwise success.
-- * Non-empty response body that is not valid JSON: 'UnexpectedResponse'.
-- * Non-empty JSON body: the message extracted by 'parseErrorObject' is
--   thrown as 'ServerError' / 'ClientError' according to the status. Any
--   other status, or a successful parse, is reported through 'fail' as an
--   internal bug.
writeByteString :: WriteParams -> BL.ByteString -> IO ()
writeByteString params payload = do
  -- Reuse the caller's manager if one was supplied, otherwise create one.
  mgr <- case writeManager params of
    Left settings -> HC.newManager settings
    Right existing -> return existing
  response <- HC.httpLbs request mgr `catch` (throwIO . HTTPException)
  let status = HC.responseStatus response
      body = HC.responseBody response
      -- Throw the exception matching the status class; no-op otherwise.
      raiseForStatus :: String -> IO ()
      raiseForStatus message
        | HT.statusIsServerError status = throwIO $ ServerError message
        | HT.statusIsClientError status = throwIO $ ClientError message request
        | otherwise = return ()
  if BL.null body
    then raiseForStatus $ B8.unpack $ HT.statusMessage status
    else do
      val <- either
        (\message -> throwIO $ UnexpectedResponse message body)
        return
        (A.eitherDecode' body)
      -- 'parseErrorObject' is expected to always fail, carrying the
      -- server-provided error message in the failure.
      case A.parse parseErrorObject val of
        A.Error message -> do
          raiseForStatus message
          fail $ "BUG: " ++ message
            ++ " in Database.InfluxDB.Write.writeByteString"
        A.Success _ ->
          fail $ "BUG: impossible code path in "
            ++ "Database.InfluxDB.Write.writeByteString"
  where
    -- Base write request with the payload attached as its body.
    request = (writeRequest params)
      { HC.requestBody = HC.RequestBodyLBS payload
      }
-- | Build the body-less HTTP request for the @/write@ endpoint.
--
-- Host, port and the SSL flag come from 'writeServer'. The query string
-- carries:
--
-- * @db@: the database name (always)
-- * @precision@: the name of 'writePrecision' (always)
-- * @rp@: the retention policy, when one is set
-- * @u@ and @p@: user name and password, when credentials are set
--
-- The caller is expected to attach the request body (see 'writeByteString').
writeRequest :: WriteParams -> HC.Request
writeRequest WriteParams {..} =
  HC.setQueryString qs HC.defaultRequest
    { HC.host = TE.encodeUtf8 _host
    , HC.port = fromIntegral _port
    , HC.secure = _ssl
    , HC.method = "POST"
    , HC.path = "/write"
    }
  where
    Server {..} = writeServer
    qs = concat
      [ [ ("db", Just $ TE.encodeUtf8 $ databaseName writeDatabase)
          -- 'write' and 'writeBatch' scale timestamps to 'writePrecision',
          -- so the server must be told the same unit. Without this
          -- parameter it interprets every timestamp as nanoseconds.
          -- 'precisionName' is taken from "Database.InfluxDB.Types".
        , ("precision", Just $ TE.encodeUtf8 $ precisionName writePrecision)
        ]
      , fromMaybe [] $ do
          Key name <- writeRetentionPolicy
          return [("rp", Just (TE.encodeUtf8 name))]
      , fromMaybe [] $ do
          Credentials { _user = u, _password = p } <- writeAuthentication
          return
            [ ("u", Just (TE.encodeUtf8 u))
            , ("p", Just (TE.encodeUtf8 p))
            ]
      ]
-- Generate lenses for the fields of 'WriteParams'.
--
-- Each record field is mapped to the lens name listed in the table below.
-- Signature generation is switched off, so a signature for a generated lens
-- has to be written by hand (as is done for 'retentionPolicy').
makeLensesWith
( lensRules
& generateSignatures .~ False
& lensField .~ lookingupNamer
[ ("writeServer", "_server")
, ("writeDatabase", "_database")
, ("writeRetentionPolicy", "retentionPolicy")
, ("writePrecision", "_precision")
, ("writeManager", "_manager")
, ("writeAuthentication", "_authentication")
]
)
''WriteParams
-- | Lens onto the server the write request is sent to.
instance HasServer WriteParams where
server = _server
-- | Lens onto the target database, sent as the @db@ query parameter.
instance HasDatabase WriteParams where
database = _database
-- | Lens onto the optional retention policy. When it is 'Nothing', no @rp@
-- query parameter is added to the request.
retentionPolicy :: Lens' WriteParams (Maybe Key)
-- | Lens onto the precision that 'write' and 'writeBatch' scale timestamps
-- to.
instance HasPrecision 'WriteRequest WriteParams where
precision = _precision
-- | Lens onto the HTTP manager choice: settings for creating a new manager,
-- or an existing manager to reuse.
instance HasManager WriteParams where
manager = _manager
-- | Lens onto the optional credentials. When present, user name and password
-- are sent as the @u@ and @p@ query parameters.
instance HasCredentials WriteParams where
authentication = _authentication