{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
module Database.V5.Bloodhound.Client
(
withBH
, createIndex
, createIndexWith
, flushIndex
, deleteIndex
, updateIndexSettings
, getIndexSettings
, forceMergeIndex
, indexExists
, openIndex
, closeIndex
, listIndices
, waitForYellowIndex
, updateIndexAliases
, getIndexAliases
, deleteIndexAlias
, putTemplate
, templateExists
, deleteTemplate
, putMapping
, indexDocument
, updateDocument
, getDocument
, documentExists
, deleteDocument
, searchAll
, searchByIndex
, searchByIndices
, searchByType
, scanSearch
, getInitialScroll
, getInitialSortedScroll
, advanceScroll
, refreshIndex
, mkSearch
, mkAggregateSearch
, mkHighlightSearch
, bulk
, pageSearch
, mkShardCount
, mkReplicaCount
, getStatus
, getSnapshotRepos
, updateSnapshotRepo
, verifySnapshotRepo
, deleteSnapshotRepo
, createSnapshot
, getSnapshots
, deleteSnapshot
, restoreSnapshot
, getNodesInfo
, getNodesStats
, encodeBulkOperations
, encodeBulkOperation
, basicAuthHook
, isVersionConflict
, isSuccess
, isCreated
, parseEsResponse
)
where
import qualified Blaze.ByteString.Builder as BB
import Control.Applicative as A
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Aeson
import Data.ByteString.Lazy.Builder
import qualified Data.ByteString.Lazy.Char8 as L
import Data.Foldable (toList)
import qualified Data.HashMap.Strict as HM
import Data.Ix
import qualified Data.List as LS (filter, foldl')
import Data.List.NonEmpty (NonEmpty (..))
import Data.Maybe (catMaybes, fromMaybe, isJust)
import Data.Monoid
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time.Clock
import qualified Data.Vector as V
import Network.HTTP.Client
import qualified Network.HTTP.Types.Method as NHTM
import qualified Network.HTTP.Types.Status as NHTS
import qualified Network.HTTP.Types.URI as NHTU
import qualified Network.URI as URI
import Prelude hiding (filter, head)
import Database.V5.Bloodhound.Types
mkShardCount :: Int -> Maybe ShardCount
mkShardCount n
| n < 1 = Nothing
| n > 1000 = Nothing
| otherwise = Just (ShardCount n)
mkReplicaCount :: Int -> Maybe ReplicaCount
mkReplicaCount n
| n < 0 = Nothing
| n > 1000 = Nothing
| otherwise = Just (ReplicaCount n)
emptyBody :: L.ByteString
emptyBody = L.pack ""
dispatch :: MonadBH m
=> Method
-> Text
-> Maybe L.ByteString
-> m Reply
dispatch dMethod url body = do
initReq <- liftIO $ parseUrl' url
reqHook <- bhRequestHook A.<$> getBHEnv
let reqBody = RequestBodyLBS $ fromMaybe emptyBody body
req <- liftIO
$ reqHook
$ setRequestIgnoreStatus
$ initReq { method = dMethod
, requestHeaders =
("Content-Type", "application/json") : requestHeaders initReq
, requestBody = reqBody }
mgr <- bhManager <$> getBHEnv
liftIO $ httpLbs req mgr
joinPath' :: [Text] -> Text
joinPath' = T.intercalate "/"
joinPath :: MonadBH m => [Text] -> m Text
joinPath ps = do
Server s <- bhServer <$> getBHEnv
return $ joinPath' (s:ps)
appendSearchTypeParam :: Text -> SearchType -> Text
appendSearchTypeParam originalUrl st = addQuery params originalUrl
where stText = "search_type"
params
| st == SearchTypeDfsQueryThenFetch = [(stText, Just "dfs_query_then_fetch")]
| otherwise = []
addQuery :: [(Text, Maybe Text)] -> Text -> Text
addQuery q u = u <> rendered
where
rendered =
T.decodeUtf8 $ BB.toByteString $ NHTU.renderQueryText prependQuestionMark q
prependQuestionMark = True
bindM2 :: (Applicative m, Monad m) => (a -> b -> m c) -> m a -> m b -> m c
bindM2 f ma mb = join (f <$> ma <*> mb)
withBH :: ManagerSettings -> Server -> BH IO a -> IO a
withBH ms s f = do
mgr <- newManager ms
let env = mkBHEnv s mgr
runBH env f
delete :: MonadBH m => Text -> m Reply
delete = flip (dispatch NHTM.methodDelete) Nothing
get :: MonadBH m => Text -> m Reply
get = flip (dispatch NHTM.methodGet) Nothing
head :: MonadBH m => Text -> m Reply
head = flip (dispatch NHTM.methodHead) Nothing
put :: MonadBH m => Text -> Maybe L.ByteString -> m Reply
put = dispatch NHTM.methodPut
post :: MonadBH m => Text -> Maybe L.ByteString -> m Reply
post = dispatch NHTM.methodPost
getStatus :: MonadBH m => m (Maybe Status)
getStatus = do
response <- get =<< url
return $ decode (responseBody response)
where url = joinPath []
getSnapshotRepos
:: ( MonadBH m
, MonadThrow m
)
=> SnapshotRepoSelection
-> m (Either EsError [GenericSnapshotRepo])
getSnapshotRepos sel = fmap (fmap unGSRs) . parseEsResponse =<< get =<< url
where
url = joinPath ["_snapshot", selectorSeg]
selectorSeg = case sel of
AllSnapshotRepos -> "_all"
SnapshotRepoList (p :| ps) -> T.intercalate "," (renderPat <$> (p:ps))
renderPat (RepoPattern t) = t
renderPat (ExactRepo (SnapshotRepoName t)) = t
newtype GSRs = GSRs { unGSRs :: [GenericSnapshotRepo] }
instance FromJSON GSRs where
parseJSON = withObject "Collection of GenericSnapshotRepo" parse
where
parse = fmap GSRs . mapM (uncurry go) . HM.toList
go rawName = withObject "GenericSnapshotRepo" $ \o ->
GenericSnapshotRepo (SnapshotRepoName rawName) <$> o .: "type"
<*> o .: "settings"
updateSnapshotRepo
:: ( MonadBH m
, SnapshotRepo repo
)
=> SnapshotRepoUpdateSettings
-> repo
-> m Reply
updateSnapshotRepo SnapshotRepoUpdateSettings {..} repo =
bindM2 put url (return (Just body))
where
url = addQuery params <$> joinPath ["_snapshot", snapshotRepoName gSnapshotRepoName]
params
| repoUpdateVerify = []
| otherwise = [("verify", Just "false")]
body = encode $ object [ "type" .= gSnapshotRepoType
, "settings" .= gSnapshotRepoSettings
]
GenericSnapshotRepo {..} = toGSnapshotRepo repo
verifySnapshotRepo
:: ( MonadBH m
, MonadThrow m
)
=> SnapshotRepoName
-> m (Either EsError SnapshotVerification)
verifySnapshotRepo (SnapshotRepoName n) =
parseEsResponse =<< bindM2 post url (return Nothing)
where
url = joinPath ["_snapshot", n, "_verify"]
deleteSnapshotRepo :: MonadBH m => SnapshotRepoName -> m Reply
deleteSnapshotRepo (SnapshotRepoName n) = delete =<< url
where
url = joinPath ["_snapshot", n]
createSnapshot
:: (MonadBH m)
=> SnapshotRepoName
-> SnapshotName
-> SnapshotCreateSettings
-> m Reply
createSnapshot (SnapshotRepoName repoName)
(SnapshotName snapName)
SnapshotCreateSettings {..} =
bindM2 put url (return (Just body))
where
url = addQuery params <$> joinPath ["_snapshot", repoName, snapName]
params = [("wait_for_completion", Just (boolQP snapWaitForCompletion))]
body = encode $ object prs
prs = catMaybes [ ("indices" .=) . indexSelectionName <$> snapIndices
, Just ("ignore_unavailable" .= snapIgnoreUnavailable)
, Just ("ignore_global_state" .= snapIncludeGlobalState)
, Just ("partial" .= snapPartial)
]
indexSelectionName :: IndexSelection -> Text
indexSelectionName AllIndexes = "_all"
indexSelectionName (IndexList (i :| is)) = T.intercalate "," (renderIndex <$> (i:is))
where
renderIndex (IndexName n) = n
getSnapshots
:: ( MonadBH m
, MonadThrow m
)
=> SnapshotRepoName
-> SnapshotSelection
-> m (Either EsError [SnapshotInfo])
getSnapshots (SnapshotRepoName repoName) sel =
fmap (fmap unSIs) . parseEsResponse =<< get =<< url
where
url = joinPath ["_snapshot", repoName, snapPath]
snapPath = case sel of
AllSnapshots -> "_all"
SnapshotList (s :| ss) -> T.intercalate "," (renderPath <$> (s:ss))
renderPath (SnapPattern t) = t
renderPath (ExactSnap (SnapshotName t)) = t
newtype SIs = SIs { unSIs :: [SnapshotInfo] }
instance FromJSON SIs where
parseJSON = withObject "Collection of SnapshotInfo" parse
where
parse o = SIs <$> o .: "snapshots"
deleteSnapshot :: MonadBH m => SnapshotRepoName -> SnapshotName -> m Reply
deleteSnapshot (SnapshotRepoName repoName) (SnapshotName snapName) =
delete =<< url
where
url = joinPath ["_snapshot", repoName, snapName]
restoreSnapshot
:: MonadBH m
=> SnapshotRepoName
-> SnapshotName
-> SnapshotRestoreSettings
-> m Reply
restoreSnapshot (SnapshotRepoName repoName)
(SnapshotName snapName)
SnapshotRestoreSettings {..} = bindM2 post url (return (Just body))
where
url = addQuery params <$> joinPath ["_snapshot", repoName, snapName, "_restore"]
params = [("wait_for_completion", Just (boolQP snapRestoreWaitForCompletion))]
body = encode (object prs)
prs = catMaybes [ ("indices" .=) . indexSelectionName <$> snapRestoreIndices
, Just ("ignore_unavailable" .= snapRestoreIgnoreUnavailable)
, Just ("include_global_state" .= snapRestoreIncludeGlobalState)
, ("rename_pattern" .=) <$> snapRestoreRenamePattern
, ("rename_replacement" .=) . renderTokens <$> snapRestoreRenameReplacement
, Just ("include_aliases" .= snapRestoreIncludeAliases)
, ("index_settings" .= ) <$> snapRestoreIndexSettingsOverrides
, ("ignore_index_settings" .= ) <$> snapRestoreIgnoreIndexSettings
]
renderTokens (t :| ts) = mconcat (renderToken <$> (t:ts))
renderToken (RRTLit t) = t
renderToken RRSubWholeMatch = "$0"
renderToken (RRSubGroup g) = T.pack (show (rrGroupRefNum g))
getNodesInfo
:: ( MonadBH m
, MonadThrow m
)
=> NodeSelection
-> m (Either EsError NodesInfo)
getNodesInfo sel = parseEsResponse =<< get =<< url
where
url = joinPath ["_nodes", selectionSeg]
selectionSeg = case sel of
LocalNode -> "_local"
NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls))
AllNodes -> "_all"
selToSeg (NodeByName (NodeName n)) = n
selToSeg (NodeByFullNodeId (FullNodeId i)) = i
selToSeg (NodeByHost (Server s)) = s
selToSeg (NodeByAttribute (NodeAttrName a) v) = a <> ":" <> v
getNodesStats
:: ( MonadBH m
, MonadThrow m
)
=> NodeSelection
-> m (Either EsError NodesStats)
getNodesStats sel = parseEsResponse =<< get =<< url
where
url = joinPath ["_nodes", selectionSeg, "stats"]
selectionSeg = case sel of
LocalNode -> "_local"
NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls))
AllNodes -> "_all"
selToSeg (NodeByName (NodeName n)) = n
selToSeg (NodeByFullNodeId (FullNodeId i)) = i
selToSeg (NodeByHost (Server s)) = s
selToSeg (NodeByAttribute (NodeAttrName a) v) = a <> ":" <> v
createIndex :: MonadBH m => IndexSettings -> IndexName -> m Reply
createIndex indexSettings (IndexName indexName) =
bindM2 put url (return body)
where url = joinPath [indexName]
body = Just $ encode indexSettings
createIndexWith :: MonadBH m
=> [UpdatableIndexSetting]
-> Int
-> IndexName
-> m Reply
createIndexWith updates shards (IndexName indexName) =
bindM2 put url (return (Just body))
where url = joinPath [indexName]
body = encode $ object
["settings" .= deepMerge
( HM.singleton "index.number_of_shards" (toJSON shards) :
[u | Object u <- toJSON <$> updates]
)
]
flushIndex :: MonadBH m => IndexName -> m Reply
flushIndex (IndexName indexName) = do
path <- joinPath [indexName, "_flush"]
post path Nothing
deleteIndex :: MonadBH m => IndexName -> m Reply
deleteIndex (IndexName indexName) =
delete =<< joinPath [indexName]
updateIndexSettings :: MonadBH m => NonEmpty UpdatableIndexSetting -> IndexName -> m Reply
updateIndexSettings updates (IndexName indexName) =
bindM2 put url (return body)
where
url = joinPath [indexName, "_settings"]
body = Just (encode jsonBody)
jsonBody = Object (deepMerge [u | Object u <- toJSON <$> toList updates])
getIndexSettings :: (MonadBH m, MonadThrow m) => IndexName
-> m (Either EsError IndexSettingsSummary)
getIndexSettings (IndexName indexName) =
parseEsResponse =<< get =<< url
where
url = joinPath [indexName, "_settings"]
forceMergeIndex :: MonadBH m => IndexSelection -> ForceMergeIndexSettings -> m Reply
forceMergeIndex ixs ForceMergeIndexSettings {..} =
bindM2 post url (return body)
where url = addQuery params <$> joinPath [indexName, "_forcemerge"]
params = catMaybes [ ("max_num_segments",) . Just . showText <$> maxNumSegments
, Just ("only_expunge_deletes", Just (boolQP onlyExpungeDeletes))
, Just ("flush", Just (boolQP flushAfterOptimize))
]
indexName = indexSelectionName ixs
body = Nothing
deepMerge :: [Object] -> Object
deepMerge = LS.foldl' go mempty
where go acc = LS.foldl' go' acc . HM.toList
go' acc (k, v) = HM.insertWith merge k v acc
merge (Object a) (Object b) = Object (deepMerge [a, b])
merge _ b = b
statusCodeIs :: (Int, Int) -> Reply -> Bool
statusCodeIs r resp = inRange r $ NHTS.statusCode (responseStatus resp)
respIsTwoHunna :: Reply -> Bool
respIsTwoHunna = statusCodeIs (200, 299)
existentialQuery :: MonadBH m => Text -> m (Reply, Bool)
existentialQuery url = do
reply <- head url
return (reply, respIsTwoHunna reply)
parseEsResponse :: ( MonadThrow m
, FromJSON a
)
=> Reply
-> m (Either EsError a)
parseEsResponse reply
| respIsTwoHunna reply = case eitherDecode body of
Right a -> return (Right a)
Left err ->
tryParseError err
| otherwise = tryParseError "Non-200 status code"
where body = responseBody reply
tryParseError originalError
= case eitherDecode body of
Right e -> return (Left e)
Left err -> explode ("Original error was: " <> originalError <> " Error parse failure was: " <> err)
explode errorMsg = throwM (EsProtocolException (T.pack errorMsg) body)
indexExists :: MonadBH m => IndexName -> m Bool
indexExists (IndexName indexName) = do
(_, exists) <- existentialQuery =<< joinPath [indexName]
return exists
refreshIndex :: MonadBH m => IndexName -> m Reply
refreshIndex (IndexName indexName) =
bindM2 post url (return Nothing)
where url = joinPath [indexName, "_refresh"]
waitForYellowIndex :: MonadBH m => IndexName -> m Reply
waitForYellowIndex (IndexName indexName) = get =<< url
where url = addQuery q <$> joinPath ["_cluster","health",indexName]
q = [("wait_for_status",Just "yellow"),("timeout",Just "10s")]
stringifyOCIndex :: OpenCloseIndex -> Text
stringifyOCIndex oci = case oci of
OpenIndex -> "_open"
CloseIndex -> "_close"
openOrCloseIndexes :: MonadBH m => OpenCloseIndex -> IndexName -> m Reply
openOrCloseIndexes oci (IndexName indexName) =
bindM2 post url (return Nothing)
where ociString = stringifyOCIndex oci
url = joinPath [indexName, ociString]
openIndex :: MonadBH m => IndexName -> m Reply
openIndex = openOrCloseIndexes OpenIndex
closeIndex :: MonadBH m => IndexName -> m Reply
closeIndex = openOrCloseIndexes CloseIndex
listIndices :: (MonadThrow m, MonadBH m) => m [IndexName]
listIndices =
parse . responseBody =<< get =<< url
where
url = joinPath ["_cat/indices?format=json"]
parse body = either (\msg -> (throwM (EsProtocolException (T.pack msg) body))) return $ do
vals <- eitherDecode body
forM vals $ \val ->
case val of
Object obj ->
case HM.lookup "index" obj of
(Just (String txt)) -> Right (IndexName txt)
v -> Left $ "indexVal in listIndices failed on non-string, was: " <> show v
v -> Left $ "One of the values parsed in listIndices wasn't an object, it was: " <> show v
updateIndexAliases :: MonadBH m => NonEmpty IndexAliasAction -> m Reply
updateIndexAliases actions = bindM2 post url (return body)
where url = joinPath ["_aliases"]
body = Just (encode bodyJSON)
bodyJSON = object [ "actions" .= toList actions]
getIndexAliases :: (MonadBH m, MonadThrow m)
=> m (Either EsError IndexAliasesSummary)
getIndexAliases = parseEsResponse =<< get =<< url
where url = joinPath ["_aliases"]
deleteIndexAlias :: MonadBH m => IndexAliasName -> m Reply
deleteIndexAlias (IndexAliasName (IndexName name)) = delete =<< url
where url = joinPath ["_all","_alias",name]
putTemplate :: MonadBH m => IndexTemplate -> TemplateName -> m Reply
putTemplate indexTemplate (TemplateName templateName) =
bindM2 put url (return body)
where url = joinPath ["_template", templateName]
body = Just $ encode indexTemplate
templateExists :: MonadBH m => TemplateName -> m Bool
templateExists (TemplateName templateName) = do
(_, exists) <- existentialQuery =<< joinPath ["_template", templateName]
return exists
deleteTemplate :: MonadBH m => TemplateName -> m Reply
deleteTemplate (TemplateName templateName) =
delete =<< joinPath ["_template", templateName]
putMapping :: (MonadBH m, ToJSON a) => IndexName
-> MappingName -> a -> m Reply
putMapping (IndexName indexName) (MappingName mappingName) mapping =
bindM2 put url (return body)
where url = joinPath [indexName, "_mapping", mappingName]
body = Just $ encode mapping
versionCtlParams :: IndexDocumentSettings -> [(Text, Maybe Text)]
versionCtlParams cfg =
case idsVersionControl cfg of
NoVersionControl -> []
InternalVersion v -> versionParams v "internal"
ExternalGT (ExternalDocVersion v) -> versionParams v "external_gt"
ExternalGTE (ExternalDocVersion v) -> versionParams v "external_gte"
ForceVersion (ExternalDocVersion v) -> versionParams v "force"
where
vt = showText . docVersionNumber
versionParams v t = [ ("version", Just $ vt v)
, ("version_type", Just t)
]
indexDocument :: (ToJSON doc, MonadBH m) => IndexName -> MappingName
-> IndexDocumentSettings -> doc -> DocId -> m Reply
indexDocument (IndexName indexName)
(MappingName mappingName) cfg document (DocId docId) =
bindM2 put url (return body)
where url = addQuery params <$> joinPath [indexName, mappingName, docId]
parentParams = case idsParent cfg of
Nothing -> []
Just (DocumentParent (DocId p)) -> [ ("parent", Just p) ]
params = versionCtlParams cfg ++ parentParams
body = Just (encode document)
updateDocument :: (ToJSON patch, MonadBH m) => IndexName -> MappingName
-> IndexDocumentSettings -> patch -> DocId -> m Reply
updateDocument (IndexName indexName)
(MappingName mappingName) cfg patch (DocId docId) =
bindM2 post url (return body)
where url = addQuery (versionCtlParams cfg) <$>
joinPath [indexName, mappingName, docId, "_update"]
body = Just (encode $ object ["doc" .= toJSON patch])
deleteDocument :: MonadBH m => IndexName -> MappingName
-> DocId -> m Reply
deleteDocument (IndexName indexName)
(MappingName mappingName) (DocId docId) =
delete =<< joinPath [indexName, mappingName, docId]
bulk :: MonadBH m => V.Vector BulkOperation -> m Reply
bulk bulkOps =
bindM2 post url (return body)
where
url = joinPath ["_bulk"]
body = Just $ encodeBulkOperations bulkOps
encodeBulkOperations :: V.Vector BulkOperation -> L.ByteString
encodeBulkOperations stream = collapsed where
blobs =
fmap encodeBulkOperation stream
mashedTaters =
mash (mempty :: Builder) blobs
collapsed =
toLazyByteString $ mappend mashedTaters (byteString "\n")
mash :: Builder -> V.Vector L.ByteString -> Builder
mash = V.foldl' (\b x -> b <> byteString "\n" <> lazyByteString x)
mkBulkStreamValue :: Text -> Text -> Text -> Text -> Value
mkBulkStreamValue operation indexName mappingName docId =
object [operation .=
object [ "_index" .= indexName
, "_type" .= mappingName
, "_id" .= docId]]
mkBulkStreamValueAuto :: Text -> Text -> Text -> Value
mkBulkStreamValueAuto operation indexName mappingName =
object [operation .=
object [ "_index" .= indexName
, "_type" .= mappingName]]
encodeBulkOperation :: BulkOperation -> L.ByteString
encodeBulkOperation (BulkIndex (IndexName indexName)
(MappingName mappingName)
(DocId docId) value) = blob
where metadata = mkBulkStreamValue "index" indexName mappingName docId
blob = encode metadata `mappend` "\n" `mappend` encode value
encodeBulkOperation (BulkIndexAuto (IndexName indexName)
(MappingName mappingName)
value) = blob
where metadata = mkBulkStreamValueAuto "index" indexName mappingName
blob = encode metadata `mappend` "\n" `mappend` encode value
encodeBulkOperation (BulkIndexEncodingAuto (IndexName indexName)
(MappingName mappingName)
encoding) = toLazyByteString blob
where metadata = toEncoding (mkBulkStreamValueAuto "index" indexName mappingName)
blob = fromEncoding metadata <> "\n" <> fromEncoding encoding
encodeBulkOperation (BulkCreate (IndexName indexName)
(MappingName mappingName)
(DocId docId) value) = blob
where metadata = mkBulkStreamValue "create" indexName mappingName docId
blob = encode metadata `mappend` "\n" `mappend` encode value
encodeBulkOperation (BulkDelete (IndexName indexName)
(MappingName mappingName)
(DocId docId)) = blob
where metadata = mkBulkStreamValue "delete" indexName mappingName docId
blob = encode metadata
encodeBulkOperation (BulkUpdate (IndexName indexName)
(MappingName mappingName)
(DocId docId) value) = blob
where metadata = mkBulkStreamValue "update" indexName mappingName docId
doc = object ["doc" .= value]
blob = encode metadata `mappend` "\n" `mappend` encode doc
encodeBulkOperation (BulkCreateEncoding (IndexName indexName)
(MappingName mappingName)
(DocId docId) encoding) = toLazyByteString blob
where metadata = toEncoding (mkBulkStreamValue "create" indexName mappingName docId)
blob = fromEncoding metadata <> "\n" <> fromEncoding encoding
getDocument :: MonadBH m => IndexName -> MappingName
-> DocId -> m Reply
getDocument (IndexName indexName)
(MappingName mappingName) (DocId docId) =
get =<< joinPath [indexName, mappingName, docId]
documentExists :: MonadBH m => IndexName -> MappingName
-> Maybe DocumentParent -> DocId -> m Bool
documentExists (IndexName indexName) (MappingName mappingName)
parent (DocId docId) = do
(_, exists) <- existentialQuery =<< url
return exists
where url = addQuery params <$> joinPath [indexName, mappingName, docId]
parentParam = fmap (\(DocumentParent (DocId p)) -> p) parent
params = LS.filter (\(_, v) -> isJust v) [("parent", parentParam)]
dispatchSearch :: MonadBH m => Text -> Search -> m Reply
dispatchSearch url search = post url' (Just (encode search))
where url' = appendSearchTypeParam url (searchType search)
searchAll :: MonadBH m => Search -> m Reply
searchAll = bindM2 dispatchSearch url . return
where url = joinPath ["_search"]
searchByIndex :: MonadBH m => IndexName -> Search -> m Reply
searchByIndex (IndexName indexName) = bindM2 dispatchSearch url . return
where url = joinPath [indexName, "_search"]
searchByIndices :: MonadBH m => NonEmpty IndexName -> Search -> m Reply
searchByIndices ixs = bindM2 dispatchSearch url . return
where url = joinPath [renderedIxs, "_search"]
renderedIxs = T.intercalate (T.singleton ',') (map (\(IndexName t) -> t) (toList ixs))
searchByType :: MonadBH m => IndexName -> MappingName -> Search
-> m Reply
searchByType (IndexName indexName)
(MappingName mappingName) = bindM2 dispatchSearch url . return
where url = joinPath [indexName, mappingName, "_search"]
getInitialScroll ::
(FromJSON a, MonadThrow m, MonadBH m) => IndexName ->
MappingName ->
Search ->
m (Either EsError (SearchResult a))
getInitialScroll (IndexName indexName) (MappingName mappingName) search' = do
let url = addQuery params <$> joinPath [indexName, mappingName, "_search"]
params = [("scroll", Just "1m")]
sorting = Just [DefaultSortSpec $ mkSort (FieldName "_doc") Descending]
search = search' { sortBody = sorting }
resp' <- bindM2 dispatchSearch url (return search)
parseEsResponse resp'
getInitialSortedScroll ::
(FromJSON a, MonadThrow m, MonadBH m) => IndexName ->
MappingName ->
Search ->
m (Either EsError (SearchResult a))
getInitialSortedScroll (IndexName indexName) (MappingName mappingName) search = do
let url = addQuery params <$> joinPath [indexName, mappingName, "_search"]
params = [("scroll", Just "1m")]
resp' <- bindM2 dispatchSearch url (return search)
parseEsResponse resp'
scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId ->
m ([Hit a], Maybe ScrollId)
scroll' Nothing = return ([], Nothing)
scroll' (Just sid) = do
res <- advanceScroll sid 60
case res of
Right SearchResult {..} -> return (hits searchHits, scrollId)
Left _ -> return ([], Nothing)
advanceScroll
:: ( FromJSON a
, MonadBH m
, MonadThrow m
)
=> ScrollId
-> NominalDiffTime
-> m (Either EsError (SearchResult a))
advanceScroll (ScrollId sid) scroll = do
url <- joinPath ["_search", "scroll"]
resp <- post url (Just $ encode scrollObject)
parseEsResponse resp
where scrollTime = showText secs <> "s"
secs :: Integer
secs = round scroll
scrollObject = object [ "scroll" .= scrollTime
, "scroll_id" .= sid
]
simpleAccumulator ::
(FromJSON a, MonadBH m, MonadThrow m) =>
[Hit a] ->
([Hit a], Maybe ScrollId) ->
m ([Hit a], Maybe ScrollId)
simpleAccumulator oldHits (newHits, Nothing) = return (oldHits ++ newHits, Nothing)
simpleAccumulator oldHits ([], _) = return (oldHits, Nothing)
simpleAccumulator oldHits (newHits, msid) = do
(newHits', msid') <- scroll' msid
simpleAccumulator (oldHits ++ newHits) (newHits', msid')
scanSearch :: (FromJSON a, MonadBH m, MonadThrow m) => IndexName
-> MappingName
-> Search
-> m [Hit a]
scanSearch indexName mappingName search = do
initialSearchResult <- getInitialScroll indexName mappingName search
let (hits', josh) = case initialSearchResult of
Right SearchResult {..} -> (hits searchHits, scrollId)
Left _ -> ([], Nothing)
(totalHits, _) <- simpleAccumulator [] (hits', josh)
return totalHits
mkSearch :: Maybe Query -> Maybe Filter -> Search
mkSearch query filter = Search query filter Nothing Nothing Nothing False (From 0) (Size 10) SearchTypeQueryThenFetch Nothing Nothing Nothing Nothing
mkAggregateSearch :: Maybe Query -> Aggregations -> Search
mkAggregateSearch query mkSearchAggs = Search query Nothing Nothing (Just mkSearchAggs) Nothing False (From 0) (Size 0) SearchTypeQueryThenFetch Nothing Nothing Nothing Nothing
mkHighlightSearch :: Maybe Query -> Highlights -> Search
mkHighlightSearch query searchHighlights = Search query Nothing Nothing Nothing (Just searchHighlights) False (From 0) (Size 10) SearchTypeQueryThenFetch Nothing Nothing Nothing Nothing
pageSearch :: From
-> Size
-> Search
-> Search
pageSearch resultOffset pageSize search = search { from = resultOffset, size = pageSize }
parseUrl' :: MonadThrow m => Text -> m Request
parseUrl' t = parseRequest (URI.escapeURIString URI.isAllowedInURI (T.unpack t))
isVersionConflict :: Reply -> Bool
isVersionConflict = statusCheck (== 409)
isSuccess :: Reply -> Bool
isSuccess = statusCheck (inRange (200, 299))
isCreated :: Reply -> Bool
isCreated = statusCheck (== 201)
statusCheck :: (Int -> Bool) -> Reply -> Bool
statusCheck prd = prd . NHTS.statusCode . responseStatus
basicAuthHook :: Monad m => EsUsername -> EsPassword -> Request -> m Request
basicAuthHook (EsUsername u) (EsPassword p) = return . applyBasicAuth u' p'
where u' = T.encodeUtf8 u
p' = T.encodeUtf8 p
boolQP :: Bool -> Text
boolQP True = "true"
boolQP False = "false"