{-# LANGUAGE DeriveGeneric #-}
module Log.Backend.ElasticSearch.Internal
( ElasticSearchConfig(..)
, defaultElasticSearchConfig
, EsVersion(..)
, parseEsVersion
, esV5, esV7
, serverInfo
, indexExists
, createIndexWithMapping
, bulkIndex
, refreshIndex
, EsEnv(..)
, mkEsEnv
, dispatch
, decodeReply
, isSuccess
) where
import Control.Applicative
import Control.Exception
import Control.Monad
import Data.Aeson
import Data.Ix (inRange)
import Data.Maybe
import Data.Semigroup
import GHC.Generics (Generic)
import Network.HTTP.Client
import Network.HTTP.Types
import Network.HTTP.Client.TLS (tlsManagerSettings)
import Prelude
import qualified Data.ByteString.Builder as BSB
import qualified Data.ByteString.Lazy.Char8 as BSL
import qualified Data.HashMap.Strict as H
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
data ElasticSearchConfig = ElasticSearchConfig
{ esServer :: !T.Text
, esIndex :: !T.Text
, esShardCount :: !Int
, esReplicaCount :: !Int
, esMapping :: !T.Text
, esLogin :: Maybe (T.Text, T.Text)
, esLoginInsecure :: !Bool
} deriving (Eq, Show, Generic)
defaultElasticSearchConfig :: ElasticSearchConfig
defaultElasticSearchConfig = ElasticSearchConfig
{ esServer = "http://localhost:9200"
, esIndex = "logs"
, esShardCount = 4
, esReplicaCount = 1
, esMapping = "log"
, esLogin = Nothing
, esLoginInsecure = False
}
data EsVersion = EsVersion !Int !Int !Int
deriving (Eq, Ord)
parseEsVersion :: Value -> Maybe EsVersion
parseEsVersion js = do
Object props <- pure js
Object version <- "version" `H.lookup` props
String number <- "number" `H.lookup` version
[v1, v2, v3] <- mapM (maybeRead . T.unpack) $ T.splitOn "." number
pure $ EsVersion v1 v2 v3
where
maybeRead s = do
[(v, "")] <- pure $ reads s
pure v
esV5 :: EsVersion
esV5 = EsVersion 5 0 0
esV7 :: EsVersion
esV7 = EsVersion 7 0 0
serverInfo :: EsEnv -> IO (Either HttpException (Response Value))
serverInfo env = try $ dispatch env methodGet [] Nothing
indexExists :: EsEnv -> T.Text -> IO Bool
indexExists env index =
isSuccess <$> dispatch env methodHead [index] Nothing
createIndexWithMapping
:: EsVersion
-> EsEnv
-> ElasticSearchConfig
-> T.Text
-> IO (Response Value)
createIndexWithMapping version env ElasticSearchConfig{..} index = do
dispatch env methodPut [index] . Just . encode $ object
[ "settings" .= object
[ "number_of_shards" .= esShardCount
, "number_of_replicas" .= esReplicaCount
]
, "mappings" .= if version >= esV7
then logsMapping
else object [ esMapping .= logsMapping ]
]
where
logsMapping = object
[ "properties" .= object
[ "insertion_order" .= object
[ "type" .= ("integer"::T.Text)
]
, "insertion_time" .= object
[ "type" .= ("date"::T.Text)
, "format" .= ("date_time"::T.Text)
]
, "time" .= object
[ "type" .= ("date"::T.Text)
, "format" .= ("date_time"::T.Text)
]
, "domain" .= object
[ "type" .= textTy
]
, "level" .= object
[ "type" .= textTy
]
, "component" .= object
[ "type" .= textTy
]
, "message" .= object
[ "type" .= textTy
]
]
]
where
textTy :: T.Text
textTy = if version >= esV5
then "text"
else "string"
bulkIndex
:: EsVersion
-> EsEnv
-> ElasticSearchConfig
-> T.Text
-> V.Vector (H.HashMap T.Text Value)
-> IO (Response Value)
bulkIndex version env conf index objs = do
dispatch env methodPost route . Just . BSB.toLazyByteString $ foldMap ixOp objs
where
route = if version >= esV7
then [index, "_bulk"]
else [index, esMapping conf, "_bulk"]
ixOp obj = ixCmd
<> BSB.char8 '\n'
<> BSB.lazyByteString (encode $ Object obj)
<> BSB.char8 '\n'
where
ixCmd = BSB.lazyByteString . encode $ object
[ "index" .= object []
]
refreshIndex :: EsEnv -> T.Text -> IO ()
refreshIndex env index =
void $ dispatch env methodPost [index, "_refresh"] Nothing
data EsEnv = EsEnv
{ envServer :: !T.Text
, envManager :: !Manager
, envRequestHook :: !(Request -> Request)
}
mkEsEnv :: ElasticSearchConfig -> IO EsEnv
mkEsEnv ElasticSearchConfig{..} = do
envManager <- newManager tlsManagerSettings
let envServer = esServer
envRequestHook = maybe id mkAuthHook esLogin
pure EsEnv{..}
where
mkAuthHook (u, p) = applyBasicAuth (T.encodeUtf8 u) (T.encodeUtf8 p)
dispatch :: EsEnv
-> Method
-> [T.Text]
-> Maybe BSL.ByteString
-> IO (Response Value)
dispatch EsEnv{..} dMethod url body = do
initReq <- parseRequest $ T.unpack $ T.intercalate "/" $ envServer : url
let req = envRequestHook . setRequestIgnoreStatus $ initReq
{ method = dMethod
, requestBody = RequestBodyLBS $ fromMaybe BSL.empty body
, requestHeaders = ("Content-Type", "application/json") : requestHeaders initReq
}
fmap decodeReply <$> httpLbs req envManager
decodeReply :: BSL.ByteString -> Value
decodeReply bs = case eitherDecode' bs of
Right js -> js
Left err -> object ["decoding_error" .= err]
isSuccess :: Response a -> Bool
isSuccess = statusCheck (inRange (200, 299))
where
statusCheck :: (Int -> Bool) -> Response a -> Bool
statusCheck p = p . statusCode . responseStatus