module Database.InfluxDB.Query
(
Query
, query
, queryChunked
, QueryParams
, queryParams
, server
, database
, precision
, manager
, QueryResults(..)
, parseResultsWith
, withQueryResponse
) where
import Control.Exception
import Control.Monad
import Data.Char
import Data.List
import Control.Lens
import Data.Aeson
import Data.Optional (Optional(..), optional)
import Data.Vector (Vector)
import Data.Void
import qualified Control.Foldl as L
import qualified Data.Aeson.Parser as A
import qualified Data.Aeson.Types as A
import qualified Data.Attoparsec.ByteString as AB
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text.Encoding as TE
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified Network.HTTP.Client as HC
import qualified Network.HTTP.Types as HT
import Database.InfluxDB.JSON
import Database.InfluxDB.Types as Types
import qualified Database.InfluxDB.Format as F
-- | Types that can be produced from the JSON 'Value' of a query response.
class QueryResults a where
  -- | Parse a decoded JSON 'Value' into a 'Vector' of results, given the
  -- 'Precision' that was used for the query request.
  parseResults :: Precision 'QueryRequest -> Value -> A.Parser (Vector a)
-- | Never yields any element: the response object is only inspected for an
-- optional @"error"@ key. When the key is present its text becomes the parser
-- failure message; otherwise an empty 'Vector' is returned.
instance QueryResults Void where
  parseResults _ = A.withObject "error" $ \obj -> do
    reported <- obj .:? "error"
    case reported of
      Nothing -> pure V.empty
      Just err -> withText "error" (fail . T.unpack) err
-- | Builds a pair from the first two elements of the fields vector handed
-- over by 'parseResultsWith'. Fails with an @invalid fields@ message when the
-- vector holds fewer than two elements; extra elements are ignored.
instance (a ~ Value, b ~ Value) => QueryResults (a, b) where
  parseResults _ = parseResultsWith $ \_ _ _ fields ->
    case V.toList fields of
      a : b : _ -> return (a, b)
      _ -> fail $ "invalid fields: " ++ show fields
-- | Builds a triple from the first three elements of the fields vector handed
-- over by 'parseResultsWith'. Fails with an @invalid fields@ message when the
-- vector holds fewer than three elements; extra elements are ignored.
instance (a ~ Value, b ~ Value, c ~ Value)
  => QueryResults (a, b, c) where
  parseResults _ = parseResultsWith $ \_ _ _ fields ->
    case V.toList fields of
      a : b : c : _ -> return (a, b, c)
      _ -> fail $ "invalid fields: " ++ show fields
-- | Builds a 4-tuple from the first four elements of the fields vector handed
-- over by 'parseResultsWith'. Fails with an @invalid fields@ message when the
-- vector holds fewer than four elements; extra elements are ignored.
instance (a ~ Value, b ~ Value, c ~ Value, d ~ Value)
  => QueryResults (a, b, c, d) where
  parseResults _ = parseResultsWith $ \_ _ _ fields ->
    case V.toList fields of
      a : b : c : d : _ -> return (a, b, c, d)
      _ -> fail $ "invalid fields: " ++ show fields
-- | Builds a 5-tuple from the first five elements of the fields vector handed
-- over by 'parseResultsWith'. Fails with an @invalid fields@ message when the
-- vector holds fewer than five elements; extra elements are ignored.
instance (a ~ Value, b ~ Value, c ~ Value, d ~ Value, e ~ Value)
  => QueryResults (a, b, c, d, e) where
  parseResults _ = parseResultsWith $ \_ _ _ fields ->
    case V.toList fields of
      a : b : c : d : e : _ -> return (a, b, c, d, e)
      _ -> fail $ "invalid fields: " ++ show fields
-- | Builds a 6-tuple from the first six elements of the fields vector handed
-- over by 'parseResultsWith'. Fails with an @invalid fields@ message when the
-- vector holds fewer than six elements; extra elements are ignored.
instance (a ~ Value, b ~ Value, c ~ Value, d ~ Value, e ~ Value, f ~ Value)
  => QueryResults (a, b, c, d, e, f) where
  parseResults _ = parseResultsWith $ \_ _ _ fields ->
    case V.toList fields of
      a : b : c : d : e : f : _ -> return (a, b, c, d, e, f)
      _ -> fail $ "invalid fields: " ++ show fields
-- | Builds a 7-tuple from the first seven elements of the fields vector
-- handed over by 'parseResultsWith'. Fails with an @invalid fields@ message
-- when the vector holds fewer than seven elements; extra elements are ignored.
instance
  ( a ~ Value, b ~ Value, c ~ Value, d ~ Value, e ~ Value, f ~ Value
  , g ~ Value )
  => QueryResults (a, b, c, d, e, f, g) where
  parseResults _ = parseResultsWith $ \_ _ _ fields ->
    case V.toList fields of
      a : b : c : d : e : f : g : _ -> return (a, b, c, d, e, f, g)
      _ -> fail $ "invalid fields: " ++ show fields
-- | Builds an 8-tuple from the first eight elements of the fields vector
-- handed over by 'parseResultsWith'. Fails with an @invalid fields@ message
-- when the vector holds fewer than eight elements; extra elements are ignored.
instance
  ( a ~ Value, b ~ Value, c ~ Value, d ~ Value, e ~ Value, f ~ Value
  , g ~ Value, h ~ Value )
  => QueryResults (a, b, c, d, e, f, g, h) where
  parseResults _ = parseResultsWith $ \_ _ _ fields ->
    case V.toList fields of
      a : b : c : d : e : f : g : h : _ -> return (a, b, c, d, e, f, g, h)
      _ -> fail $ "invalid fields: " ++ show fields
-- | Settings used to issue a query. Build a value with 'queryParams' and
-- adjust it through the lenses generated for this record.
data QueryParams = QueryParams
  { queryServer :: !Server
  -- ^ Server to send the query to ('defaultServer' in 'queryParams').
  , queryDatabase :: !Database
  -- ^ Database whose name is sent as the @db@ query parameter.
  , queryPrecision :: !(Precision 'QueryRequest)
  -- ^ Timestamp precision ('RFC3339' in 'queryParams').
  , queryAuthentication :: !(Maybe Credentials)
  -- ^ Optional credentials ('Nothing' in 'queryParams').
  , queryManager :: !(Either HC.ManagerSettings HC.Manager)
  -- ^ Either settings from which a new HTTP manager is created for each
  -- request, or an existing manager that is used as is.
  }
-- | Smart constructor for 'QueryParams'.
--
-- Only the target database is required. The remaining fields start out as
-- 'defaultServer', 'RFC3339' precision, no credentials and
-- 'HC.defaultManagerSettings'.
queryParams :: Database -> QueryParams
queryParams db = QueryParams
  { queryServer = defaultServer
  , queryDatabase = db
  , queryPrecision = RFC3339
  , queryAuthentication = Nothing
  , queryManager = Left HC.defaultManagerSettings
  }
-- | Run a query and parse the whole response body at once.
--
-- The body is consumed completely and decoded as JSON. A body that is not
-- valid JSON is thrown as 'UnexpectedResponse'. A JSON value rejected by
-- 'parseResults' is reported through 'errorQuery'.
query :: QueryResults a => QueryParams -> Query -> IO (Vector a)
query params q = withQueryResponse params Nothing q $ \request response -> do
  body <- BL.fromChunks <$> HC.brConsume (HC.responseBody response)
  val <- either (\message -> throwIO $ UnexpectedResponse message body) return $
    eitherDecode' body
  case A.parse (parseResults (queryPrecision params)) val of
    A.Success vec -> return vec
    A.Error message -> errorQuery request response message
-- | Prepend the @epoch@ query parameter when 'precisionParam' yields a value
-- for the given precision; otherwise return the query string untouched.
setPrecision
  :: Precision 'QueryRequest
  -> [(B.ByteString, Maybe B.ByteString)]
  -> [(B.ByteString, Maybe B.ByteString)]
setPrecision prec qs = case precisionParam prec of
  Nothing -> qs
  Just p -> ("epoch", Just p) : qs
-- | Textual value of the @epoch@ query parameter for a precision.
--
-- 'RFC3339' has no corresponding value, so 'Nothing' is returned and
-- 'setPrecision' adds no parameter.
precisionParam :: Precision 'QueryRequest -> Maybe B.ByteString
precisionParam Nanosecond = Just "ns"
precisionParam Microsecond = Just "u"
precisionParam Millisecond = Just "ms"
precisionParam Second = Just "s"
precisionParam Minute = Just "m"
precisionParam Hour = Just "h"
precisionParam RFC3339 = Nothing
-- | Run a query with the @chunked@ parameter set and feed every decoded chunk
-- to a monadic fold.
--
-- The response body is read incrementally. Each complete JSON document found
-- in the stream is parsed with 'parseResults' and handed to the fold's step
-- function. The fold's final state is extracted once the body is exhausted.
--
-- The 'Optional' 'Int' is passed on to 'withQueryResponse', which turns it
-- into the value of the @chunked@ query parameter.
--
-- Throws 'UnexpectedResponse' when the stream contains malformed JSON. A JSON
-- document rejected by 'parseResults' is reported through 'errorQuery'.
queryChunked
  :: QueryResults a
  => QueryParams
  -> Optional Int
  -> Query
  -> L.FoldM IO (Vector a) r
  -> IO r
queryChunked params chunkSize q (L.FoldM step initialize extract) =
  withQueryResponse params (Just chunkSize) q go
  where
    go request response = do
      x0 <- initialize
      chunk0 <- HC.responseBody response
      x <- loop x0 k0 chunk0
      extract x
      where
        -- Fresh incremental parser for the next JSON document.
        k0 = AB.parse A.json
        -- An empty chunk read from the body marks the end of the response.
        loop x k chunk
          | B.null chunk = return x
          | otherwise = case k chunk of
            AB.Fail unconsumed _contexts message ->
              throwIO $ UnexpectedResponse message $ BL.fromStrict unconsumed
            AB.Partial k' -> do
              chunk' <- HC.responseBody response
              loop x k' chunk'
            AB.Done leftover val ->
              case A.parse (parseResults (queryPrecision params)) val of
                A.Success vec -> do
                  x' <- step x vec
                  -- An empty leftover only means this read ended exactly at
                  -- a document boundary, not that the body is exhausted.
                  -- Fetch the next chunk so the remaining documents are not
                  -- dropped; an empty read then ends the loop as usual.
                  next <-
                    if B.null leftover
                      then HC.responseBody response
                      else return leftover
                  loop x' k0 next
                A.Error message ->
                  errorQuery request response message
-- | Low-level query runner: perform the HTTP request and hand both the request
-- and the streaming response to the supplied continuation.
--
-- The query string carries @q@ and @db@. A @chunked@ parameter is added when
-- a chunk size is given: @true@ for the default, otherwise the requested size
-- clamped to at least 1. An @epoch@ parameter is added by 'setPrecision' when
-- applicable.
--
-- Exceptions caught around the HTTP exchange are rethrown wrapped in
-- 'HTTPException'.
withQueryResponse
  :: QueryParams
  -> Maybe (Optional Int)
  -> Query
  -> (HC.Request -> HC.Response HC.BodyReader -> IO r)
  -> IO r
withQueryResponse params chunkSize q f = do
  mgr <- case queryManager params of
    Left settings -> HC.newManager settings
    Right existing -> return existing
  -- Only the HTTP exchange itself is wrapped, not the manager creation.
  HC.withResponse request mgr (f request)
    `catch` (\e -> throwIO (HTTPException e))
  where
    request =
      HC.setQueryString
        (setPrecision (queryPrecision params) queryString)
        (queryRequest params)
    queryString = maybe id prependChunked chunkSize
      [ ("q", Just $ F.fromQuery q)
      , ("db", Just db)
      ]
      where
        !db = TE.encodeUtf8 $ databaseName $ queryDatabase params
    prependChunked size rest =
      let !chunked = optional "true" (renderChunkSize . max 1) size
      in ("chunked", Just chunked) : rest
      where
        renderChunkSize = BL.toStrict . BB.toLazyByteString . BB.intDec
-- | Base HTTP request for the @/query@ endpoint of the configured server.
--
-- Host, port and TLS flag come from 'queryServer'. When 'queryAuthentication'
-- holds credentials they are attached as HTTP basic authentication; before
-- this they were stored in 'QueryParams' but never sent.
queryRequest :: QueryParams -> HC.Request
queryRequest QueryParams {..} = applyCredentials $ HC.defaultRequest
  { HC.host = TE.encodeUtf8 _host
  , HC.port = fromIntegral _port
  , HC.secure = _ssl
  , HC.method = "GET"
  , HC.path = "/query"
  }
  where
    Server {..} = queryServer
    -- NOTE(review): assumes 'Credentials' (from "Database.InfluxDB.Types")
    -- has Text fields named @_user@ and @_password@ -- confirm against Types.
    applyCredentials = case queryAuthentication of
      Nothing -> id
      Just Credentials { _user = u, _password = p } ->
        HC.applyBasicAuth (TE.encodeUtf8 u) (TE.encodeUtf8 p)
-- | Turn a result-parsing failure into an exception chosen by the HTTP status
-- of the response.
--
-- A 5xx status throws 'ServerError' and a 4xx status throws 'ClientError'.
-- Any other status is reported via 'fail' with a @BUG:@ message that includes
-- the request.
errorQuery :: HC.Request -> HC.Response body -> String -> IO a
errorQuery request response message
  | HT.statusIsServerError status = throwIO $ ServerError message
  | HT.statusIsClientError status = throwIO $ ClientError message request
  | otherwise =
      fail $ "BUG: " ++ message ++ " in Database.InfluxDB.Query.query - "
        ++ show request
  where
    status = HC.responseStatus response
-- Generate lenses for 'QueryParams'. A field @queryFoo@ gets a lens named
-- @_foo@: the @query@ prefix is dropped, the next letter is lower-cased and an
-- underscore is prepended. Fields without that prefix (or with nothing after
-- it) get no lens.
makeLensesWith
  ( lensRules
      & lensField .~ mappingNamer
        (\name ->
          [ '_' : toLower c : cs
          | Just (c : cs) <- [stripPrefix "query" name]
          ])
  )
  ''QueryParams
-- | Lens onto the 'queryServer' field, via the generated @_server@ lens.
instance HasServer QueryParams where server = _server
-- | Lens onto the 'queryDatabase' field, via the generated @_database@ lens.
instance HasDatabase QueryParams where database = _database
-- | Lens onto the 'queryPrecision' field, via the generated @_precision@ lens.
instance HasPrecision 'QueryRequest QueryParams where precision = _precision
-- | Lens onto the 'queryManager' field, via the generated @_manager@ lens.
instance HasManager QueryParams where manager = _manager
-- | Lens onto the 'queryAuthentication' field, via the generated
-- @_authentication@ lens.
instance HasCredentials QueryParams where authentication = _authentication