{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}
{-# LANGUAGE TupleSections     #-}
module Database.V5.Bloodhound.Client
       ( 
         
         
         
         
         withBH
       
       , createIndex
       , createIndexWith
       , flushIndex
       , deleteIndex
       , updateIndexSettings
       , getIndexSettings
       , forceMergeIndex
       , indexExists
       , openIndex
       , closeIndex
       , listIndices
       , waitForYellowIndex
       
       , updateIndexAliases
       , getIndexAliases
       , deleteIndexAlias
       
       , putTemplate
       , templateExists
       , deleteTemplate
       
       , putMapping
       
       , indexDocument
       , updateDocument
       , getDocument
       , documentExists
       , deleteDocument
       
       , searchAll
       , searchByIndex
       , searchByIndices
       , searchByType
       , scanSearch
       , getInitialScroll
       , getInitialSortedScroll
       , advanceScroll
       , refreshIndex
       , mkSearch
       , mkAggregateSearch
       , mkHighlightSearch
       , bulk
       , pageSearch
       , mkShardCount
       , mkReplicaCount
       , getStatus
       
       
       , getSnapshotRepos
       , updateSnapshotRepo
       , verifySnapshotRepo
       , deleteSnapshotRepo
       
       , createSnapshot
       , getSnapshots
       , deleteSnapshot
       
       , restoreSnapshot
       
       , getNodesInfo
       , getNodesStats
       
       , encodeBulkOperations
       , encodeBulkOperation
       
       , basicAuthHook
       
       , isVersionConflict
       , isSuccess
       , isCreated
       , parseEsResponse
       )
       where
import qualified Blaze.ByteString.Builder     as BB
import           Control.Applicative          as A
import           Control.Monad
import           Control.Monad.Catch
import           Control.Monad.IO.Class
import           Data.Aeson
import           Data.ByteString.Lazy.Builder
import qualified Data.ByteString.Lazy.Char8   as L
import           Data.Foldable                (toList)
import qualified Data.HashMap.Strict          as HM
import           Data.Ix
import qualified Data.List                    as LS (filter, foldl')
import           Data.List.NonEmpty           (NonEmpty (..))
import           Data.Maybe                   (catMaybes, fromMaybe, isJust)
import           Data.Monoid
import           Data.Text                    (Text)
import qualified Data.Text                    as T
import qualified Data.Text.Encoding           as T
import           Data.Time.Clock
import qualified Data.Vector                  as V
import           Network.HTTP.Client
import qualified Network.HTTP.Types.Method    as NHTM
import qualified Network.HTTP.Types.Status    as NHTS
import qualified Network.HTTP.Types.URI       as NHTU
import qualified Network.URI                  as URI
import           Prelude                      hiding (filter, head)
import           Database.V5.Bloodhound.Types
mkShardCount :: Int -> Maybe ShardCount
mkShardCount n
  | n < 1 = Nothing
  | n > 1000 = Nothing
  | otherwise = Just (ShardCount n)
mkReplicaCount :: Int -> Maybe ReplicaCount
mkReplicaCount n
  | n < 0 = Nothing
  | n > 1000 = Nothing 
  | otherwise = Just (ReplicaCount n)
emptyBody :: L.ByteString
emptyBody = L.pack ""
dispatch :: MonadBH m
         => Method
         -> Text
         -> Maybe L.ByteString
         -> m Reply
dispatch dMethod url body = do
  initReq <- liftIO $ parseUrl' url
  reqHook <- bhRequestHook A.<$> getBHEnv
  let reqBody = RequestBodyLBS $ fromMaybe emptyBody body
  req <- liftIO
         $ reqHook
         $ setRequestIgnoreStatus
         $ initReq { method = dMethod
                   , requestHeaders =
                     
                     ("Content-Type", "application/json") : requestHeaders initReq
                   , requestBody = reqBody }
  
  
  mgr <- bhManager <$> getBHEnv
  liftIO $ httpLbs req mgr
joinPath' :: [Text] -> Text
joinPath' = T.intercalate "/"
joinPath :: MonadBH m => [Text] -> m Text
joinPath ps = do
  Server s <- bhServer <$> getBHEnv
  return $ joinPath' (s:ps)
appendSearchTypeParam :: Text -> SearchType -> Text
appendSearchTypeParam originalUrl st = addQuery params originalUrl
  where stText = "search_type"
        params
          | st == SearchTypeDfsQueryThenFetch = [(stText, Just "dfs_query_then_fetch")]
        
          | otherwise                         = []
addQuery :: [(Text, Maybe Text)] -> Text -> Text
addQuery q u = u <> rendered
  where
    rendered =
      T.decodeUtf8 $ BB.toByteString $ NHTU.renderQueryText prependQuestionMark q
    prependQuestionMark = True
bindM2 :: (Applicative m, Monad m) => (a -> b -> m c) -> m a -> m b -> m c
bindM2 f ma mb = join (f <$> ma <*> mb)
withBH :: ManagerSettings -> Server -> BH IO a -> IO a
withBH ms s f = do
  mgr <- newManager ms
  let env = mkBHEnv s mgr
  runBH env f
delete :: MonadBH m => Text -> m Reply
delete = flip (dispatch NHTM.methodDelete) Nothing
get    :: MonadBH m => Text -> m Reply
get    = flip (dispatch NHTM.methodGet) Nothing
head   :: MonadBH m => Text -> m Reply
head   = flip (dispatch NHTM.methodHead) Nothing
put    :: MonadBH m => Text -> Maybe L.ByteString -> m Reply
put    = dispatch NHTM.methodPut
post   :: MonadBH m => Text -> Maybe L.ByteString -> m Reply
post   = dispatch NHTM.methodPost
getStatus :: MonadBH m => m (Maybe Status)
getStatus = do
  response <- get =<< url
  return $ decode (responseBody response)
  where url = joinPath []
getSnapshotRepos
    :: ( MonadBH m
       , MonadThrow m
       )
    => SnapshotRepoSelection
    -> m (Either EsError [GenericSnapshotRepo])
getSnapshotRepos sel = fmap (fmap unGSRs) . parseEsResponse =<< get =<< url
  where
    url = joinPath ["_snapshot", selectorSeg]
    selectorSeg = case sel of
                    AllSnapshotRepos -> "_all"
                    SnapshotRepoList (p :| ps) -> T.intercalate "," (renderPat <$> (p:ps))
    renderPat (RepoPattern t)                  = t
    renderPat (ExactRepo (SnapshotRepoName t)) = t
newtype GSRs = GSRs { unGSRs :: [GenericSnapshotRepo] }
instance FromJSON GSRs where
  parseJSON = withObject "Collection of GenericSnapshotRepo" parse
    where
      parse = fmap GSRs . mapM (uncurry go) . HM.toList
      go rawName = withObject "GenericSnapshotRepo" $ \o ->
        GenericSnapshotRepo (SnapshotRepoName rawName) <$> o .: "type"
                                                       <*> o .: "settings"
updateSnapshotRepo
  :: ( MonadBH m
     , SnapshotRepo repo
     )
  => SnapshotRepoUpdateSettings
  
  -> repo
  -> m Reply
updateSnapshotRepo SnapshotRepoUpdateSettings {..} repo =
  bindM2 put url (return (Just body))
  where
    url = addQuery params <$> joinPath ["_snapshot", snapshotRepoName gSnapshotRepoName]
    params
      | repoUpdateVerify = []
      | otherwise        = [("verify", Just "false")]
    body = encode $ object [ "type" .= gSnapshotRepoType
                           , "settings" .= gSnapshotRepoSettings
                           ]
    GenericSnapshotRepo {..} = toGSnapshotRepo repo
verifySnapshotRepo
    :: ( MonadBH m
       , MonadThrow m
       )
    => SnapshotRepoName
    -> m (Either EsError SnapshotVerification)
verifySnapshotRepo (SnapshotRepoName n) =
  parseEsResponse =<< bindM2 post url (return Nothing)
  where
    url = joinPath ["_snapshot", n, "_verify"]
deleteSnapshotRepo :: MonadBH m => SnapshotRepoName -> m Reply
deleteSnapshotRepo (SnapshotRepoName n) = delete =<< url
  where
    url = joinPath ["_snapshot", n]
createSnapshot
    :: (MonadBH m)
    => SnapshotRepoName
    -> SnapshotName
    -> SnapshotCreateSettings
    -> m Reply
createSnapshot (SnapshotRepoName repoName)
               (SnapshotName snapName)
               SnapshotCreateSettings {..} =
  bindM2 put url (return (Just body))
  where
    url = addQuery params <$> joinPath ["_snapshot", repoName, snapName]
    params = [("wait_for_completion", Just (boolQP snapWaitForCompletion))]
    body = encode $ object prs
    prs = catMaybes [ ("indices" .=) . indexSelectionName <$> snapIndices
                    , Just ("ignore_unavailable" .= snapIgnoreUnavailable)
                    , Just ("ignore_global_state" .= snapIncludeGlobalState)
                    , Just ("partial" .= snapPartial)
                    ]
indexSelectionName :: IndexSelection -> Text
indexSelectionName AllIndexes            = "_all"
indexSelectionName (IndexList (i :| is)) = T.intercalate "," (renderIndex <$> (i:is))
  where
    renderIndex (IndexName n) = n
getSnapshots
    :: ( MonadBH m
       , MonadThrow m
       )
    => SnapshotRepoName
    -> SnapshotSelection
    -> m (Either EsError [SnapshotInfo])
getSnapshots (SnapshotRepoName repoName) sel =
  fmap (fmap unSIs) . parseEsResponse =<< get =<< url
  where
    url = joinPath ["_snapshot", repoName, snapPath]
    snapPath = case sel of
      AllSnapshots -> "_all"
      SnapshotList (s :| ss) -> T.intercalate "," (renderPath <$> (s:ss))
    renderPath (SnapPattern t)              = t
    renderPath (ExactSnap (SnapshotName t)) = t
newtype SIs = SIs { unSIs :: [SnapshotInfo] }
instance FromJSON SIs where
  parseJSON = withObject "Collection of SnapshotInfo" parse
    where
      parse o = SIs <$> o .: "snapshots"
deleteSnapshot :: MonadBH m => SnapshotRepoName -> SnapshotName -> m Reply
deleteSnapshot (SnapshotRepoName repoName) (SnapshotName snapName) =
  delete =<< url
  where
    url = joinPath ["_snapshot", repoName, snapName]
restoreSnapshot
    :: MonadBH m
    => SnapshotRepoName
    -> SnapshotName
    -> SnapshotRestoreSettings
    
    
    -> m Reply
restoreSnapshot (SnapshotRepoName repoName)
                (SnapshotName snapName)
                SnapshotRestoreSettings {..} = bindM2 post url (return (Just body))
  where
    url = addQuery params <$> joinPath ["_snapshot", repoName, snapName, "_restore"]
    params = [("wait_for_completion", Just (boolQP snapRestoreWaitForCompletion))]
    body = encode (object prs)
    prs = catMaybes [ ("indices" .=) . indexSelectionName <$> snapRestoreIndices
                 , Just ("ignore_unavailable" .= snapRestoreIgnoreUnavailable)
                 , Just ("include_global_state" .= snapRestoreIncludeGlobalState)
                 , ("rename_pattern" .=) <$> snapRestoreRenamePattern
                 , ("rename_replacement" .=) . renderTokens <$> snapRestoreRenameReplacement
                 , Just ("include_aliases" .= snapRestoreIncludeAliases)
                 , ("index_settings" .= ) <$> snapRestoreIndexSettingsOverrides
                 , ("ignore_index_settings" .= ) <$> snapRestoreIgnoreIndexSettings
                 ]
    renderTokens (t :| ts) = mconcat (renderToken <$> (t:ts))
    renderToken (RRTLit t)      = t
    renderToken RRSubWholeMatch = "$0"
    renderToken (RRSubGroup g)  = T.pack (show (rrGroupRefNum g))
getNodesInfo
    :: ( MonadBH m
       , MonadThrow m
       )
    => NodeSelection
    -> m (Either EsError NodesInfo)
getNodesInfo sel = parseEsResponse =<< get =<< url
  where
    url = joinPath ["_nodes", selectionSeg]
    selectionSeg = case sel of
      LocalNode -> "_local"
      NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls))
      AllNodes -> "_all"
    selToSeg (NodeByName (NodeName n))            = n
    selToSeg (NodeByFullNodeId (FullNodeId i))    = i
    selToSeg (NodeByHost (Server s))              = s
    selToSeg (NodeByAttribute (NodeAttrName a) v) = a <> ":" <> v
getNodesStats
    :: ( MonadBH m
       , MonadThrow m
       )
    => NodeSelection
    -> m (Either EsError NodesStats)
getNodesStats sel = parseEsResponse =<< get =<< url
  where
    url = joinPath ["_nodes", selectionSeg, "stats"]
    selectionSeg = case sel of
      LocalNode -> "_local"
      NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls))
      AllNodes -> "_all"
    selToSeg (NodeByName (NodeName n))            = n
    selToSeg (NodeByFullNodeId (FullNodeId i))    = i
    selToSeg (NodeByHost (Server s))              = s
    selToSeg (NodeByAttribute (NodeAttrName a) v) = a <> ":" <> v
createIndex :: MonadBH m => IndexSettings -> IndexName -> m Reply
createIndex indexSettings (IndexName indexName) =
  bindM2 put url (return body)
  where url = joinPath [indexName]
        body = Just $ encode indexSettings
createIndexWith :: MonadBH m
  => [UpdatableIndexSetting]
  -> Int 
  -> IndexName 
  -> m Reply
createIndexWith updates shards (IndexName indexName) =
  bindM2 put url (return (Just body))
  where url = joinPath [indexName]
        body = encode $ object
          ["settings" .= deepMerge 
            ( HM.singleton "index.number_of_shards" (toJSON shards) :
              [u | Object u <- toJSON <$> updates]
            )
          ]
flushIndex :: MonadBH m => IndexName -> m Reply
flushIndex (IndexName indexName) = do
  path <- joinPath [indexName, "_flush"]
  post path Nothing
deleteIndex :: MonadBH m => IndexName -> m Reply
deleteIndex (IndexName indexName) =
  delete =<< joinPath [indexName]
updateIndexSettings :: MonadBH m => NonEmpty UpdatableIndexSetting -> IndexName -> m Reply
updateIndexSettings updates (IndexName indexName) =
  bindM2 put url (return body)
  where
    url = joinPath [indexName, "_settings"]
    body = Just (encode jsonBody)
    jsonBody = Object (deepMerge [u | Object u <- toJSON <$> toList updates])
getIndexSettings :: (MonadBH m, MonadThrow m) => IndexName
                 -> m (Either EsError IndexSettingsSummary)
getIndexSettings (IndexName indexName) =
  parseEsResponse =<< get =<< url
  where
    url = joinPath [indexName, "_settings"]
forceMergeIndex :: MonadBH m => IndexSelection -> ForceMergeIndexSettings -> m Reply
forceMergeIndex ixs ForceMergeIndexSettings {..} =
    bindM2 post url (return body)
  where url = addQuery params <$> joinPath [indexName, "_forcemerge"]
        params = catMaybes [ ("max_num_segments",) . Just . showText <$> maxNumSegments
                           , Just ("only_expunge_deletes", Just (boolQP onlyExpungeDeletes))
                           , Just ("flush", Just (boolQP flushAfterOptimize))
                           ]
        indexName = indexSelectionName ixs
        body = Nothing
deepMerge :: [Object] -> Object
deepMerge = LS.foldl' go mempty
  where go acc = LS.foldl' go' acc . HM.toList
        go' acc (k, v) = HM.insertWith merge k v acc
        merge (Object a) (Object b) = Object (deepMerge [a, b])
        merge _ b = b
statusCodeIs :: (Int, Int) -> Reply -> Bool
statusCodeIs r resp = inRange r $ NHTS.statusCode (responseStatus resp)
respIsTwoHunna :: Reply -> Bool
respIsTwoHunna = statusCodeIs (200, 299)
existentialQuery :: MonadBH m => Text -> m (Reply, Bool)
existentialQuery url = do
  reply <- head url
  return (reply, respIsTwoHunna reply)
parseEsResponse :: ( MonadThrow m
                   , FromJSON a
                   )
                => Reply
                -> m (Either EsError a)
parseEsResponse reply
  | respIsTwoHunna reply = case eitherDecode body of
                             Right a -> return (Right a)
                             Left err ->
                               tryParseError err
  | otherwise = tryParseError "Non-200 status code"
  where body = responseBody reply
        tryParseError originalError
          = case eitherDecode body of
              Right e -> return (Left e)
              
              Left err -> explode ("Original error was: " <> originalError <> " Error parse failure was: " <> err)
        explode errorMsg = throwM (EsProtocolException (T.pack errorMsg) body)
indexExists :: MonadBH m => IndexName -> m Bool
indexExists (IndexName indexName) = do
  (_, exists) <- existentialQuery =<< joinPath [indexName]
  return exists
refreshIndex :: MonadBH m => IndexName -> m Reply
refreshIndex (IndexName indexName) =
  bindM2 post url (return Nothing)
  where url = joinPath [indexName, "_refresh"]
waitForYellowIndex :: MonadBH m => IndexName -> m Reply
waitForYellowIndex (IndexName indexName) = get =<< url
  where url = addQuery q <$> joinPath ["_cluster","health",indexName]
        q = [("wait_for_status",Just "yellow"),("timeout",Just "10s")]
stringifyOCIndex :: OpenCloseIndex -> Text
stringifyOCIndex oci = case oci of
  OpenIndex  -> "_open"
  CloseIndex -> "_close"
openOrCloseIndexes :: MonadBH m => OpenCloseIndex -> IndexName -> m Reply
openOrCloseIndexes oci (IndexName indexName) =
  bindM2 post url (return Nothing)
  where ociString = stringifyOCIndex oci
        url = joinPath [indexName, ociString]
openIndex :: MonadBH m => IndexName -> m Reply
openIndex = openOrCloseIndexes OpenIndex
closeIndex :: MonadBH m => IndexName -> m Reply
closeIndex = openOrCloseIndexes CloseIndex
listIndices :: (MonadThrow m, MonadBH m) => m [IndexName]
listIndices =
  parse . responseBody =<< get =<< url
  where
    url = joinPath ["_cat/indices?format=json"]
    parse body = either (\msg -> (throwM (EsProtocolException (T.pack msg) body))) return $ do
      vals <- eitherDecode body
      forM vals $ \val ->
        case val of
          Object obj ->
            case HM.lookup "index" obj of
              (Just (String txt)) -> Right (IndexName txt)
              v -> Left $ "indexVal in listIndices failed on non-string, was: " <> show v
          v -> Left $ "One of the values parsed in listIndices wasn't an object, it was: " <> show v
updateIndexAliases :: MonadBH m => NonEmpty IndexAliasAction -> m Reply
updateIndexAliases actions = bindM2 post url (return body)
  where url = joinPath ["_aliases"]
        body = Just (encode bodyJSON)
        bodyJSON = object [ "actions" .= toList actions]
getIndexAliases :: (MonadBH m, MonadThrow m)
                => m (Either EsError IndexAliasesSummary)
getIndexAliases = parseEsResponse =<< get =<< url
  where url = joinPath ["_aliases"]
deleteIndexAlias :: MonadBH m => IndexAliasName -> m Reply
deleteIndexAlias (IndexAliasName (IndexName name)) = delete =<< url
  where url = joinPath ["_all","_alias",name]
putTemplate :: MonadBH m => IndexTemplate -> TemplateName -> m Reply
putTemplate indexTemplate (TemplateName templateName) =
  bindM2 put url (return body)
  where url = joinPath ["_template", templateName]
        body = Just $ encode indexTemplate
templateExists :: MonadBH m => TemplateName -> m Bool
templateExists (TemplateName templateName) = do
  (_, exists) <- existentialQuery =<< joinPath ["_template", templateName]
  return exists
deleteTemplate :: MonadBH m => TemplateName -> m Reply
deleteTemplate (TemplateName templateName) =
  delete =<< joinPath ["_template", templateName]
putMapping :: (MonadBH m, ToJSON a) => IndexName
                 -> MappingName -> a -> m Reply
putMapping (IndexName indexName) (MappingName mappingName) mapping =
  bindM2 put url (return body)
  where url = joinPath [indexName, "_mapping", mappingName]
        
        
        body = Just $ encode mapping
versionCtlParams :: IndexDocumentSettings -> [(Text, Maybe Text)]
versionCtlParams cfg =
  case idsVersionControl cfg of
    NoVersionControl -> []
    InternalVersion v -> versionParams v "internal"
    ExternalGT (ExternalDocVersion v) -> versionParams v "external_gt"
    ExternalGTE (ExternalDocVersion v) -> versionParams v "external_gte"
    ForceVersion (ExternalDocVersion v) -> versionParams v "force"
  where
    vt = showText . docVersionNumber
    versionParams v t = [ ("version", Just $ vt v)
                        , ("version_type", Just t)
                        ]
indexDocument :: (ToJSON doc, MonadBH m) => IndexName -> MappingName
                 -> IndexDocumentSettings -> doc -> DocId -> m Reply
indexDocument (IndexName indexName)
  (MappingName mappingName) cfg document (DocId docId) =
  bindM2 put url (return body)
  where url = addQuery params <$> joinPath [indexName, mappingName, docId]
        parentParams = case idsParent cfg of
          Nothing -> []
          Just (DocumentParent (DocId p)) -> [ ("parent", Just p) ]
        params = versionCtlParams cfg ++ parentParams
        body = Just (encode document)
updateDocument :: (ToJSON patch, MonadBH m) => IndexName -> MappingName
                  -> IndexDocumentSettings -> patch -> DocId -> m Reply
updateDocument (IndexName indexName)
  (MappingName mappingName) cfg patch (DocId docId) =
  bindM2 post url (return body)
  where url = addQuery (versionCtlParams cfg) <$>
              joinPath [indexName, mappingName, docId, "_update"]
        body = Just (encode $ object ["doc" .= toJSON patch])
deleteDocument :: MonadBH m => IndexName -> MappingName
                  -> DocId -> m Reply
deleteDocument (IndexName indexName)
  (MappingName mappingName) (DocId docId) =
  delete =<< joinPath [indexName, mappingName, docId]
bulk :: MonadBH m => V.Vector BulkOperation -> m Reply
bulk bulkOps =
  bindM2 post url (return body)
  where
    url = joinPath ["_bulk"]
    body = Just $ encodeBulkOperations bulkOps
encodeBulkOperations :: V.Vector BulkOperation -> L.ByteString
encodeBulkOperations stream = collapsed where
  blobs =
    fmap encodeBulkOperation stream
  mashedTaters =
    mash (mempty :: Builder) blobs
  collapsed =
    toLazyByteString $ mappend mashedTaters (byteString "\n")
mash :: Builder -> V.Vector L.ByteString -> Builder
mash = V.foldl' (\b x -> b <> byteString "\n" <> lazyByteString x)
mkBulkStreamValue :: Text -> Text -> Text -> Text -> Value
mkBulkStreamValue operation indexName mappingName docId =
  object [operation .=
          object [ "_index" .= indexName
                 , "_type"  .= mappingName
                 , "_id"    .= docId]]
mkBulkStreamValueAuto :: Text -> Text -> Text -> Value
mkBulkStreamValueAuto operation indexName mappingName =
  object [operation .=
          object [ "_index" .= indexName
                 , "_type"  .= mappingName]]
encodeBulkOperation :: BulkOperation -> L.ByteString
encodeBulkOperation (BulkIndex (IndexName indexName)
                (MappingName mappingName)
                (DocId docId) value) = blob
    where metadata = mkBulkStreamValue "index" indexName mappingName docId
          blob = encode metadata `mappend` "\n" `mappend` encode value
encodeBulkOperation (BulkIndexAuto (IndexName indexName)
                (MappingName mappingName)
                value) = blob
    where metadata = mkBulkStreamValueAuto "index" indexName mappingName
          blob = encode metadata `mappend` "\n" `mappend` encode value
encodeBulkOperation (BulkIndexEncodingAuto (IndexName indexName)
                (MappingName mappingName)
                encoding) = toLazyByteString blob
    where metadata = toEncoding (mkBulkStreamValueAuto "index" indexName mappingName)
          blob = fromEncoding metadata <> "\n" <> fromEncoding encoding
encodeBulkOperation (BulkCreate (IndexName indexName)
                (MappingName mappingName)
                (DocId docId) value) = blob
    where metadata = mkBulkStreamValue "create" indexName mappingName docId
          blob = encode metadata `mappend` "\n" `mappend` encode value
encodeBulkOperation (BulkDelete (IndexName indexName)
                (MappingName mappingName)
                (DocId docId)) = blob
    where metadata = mkBulkStreamValue "delete" indexName mappingName docId
          blob = encode metadata
encodeBulkOperation (BulkUpdate (IndexName indexName)
                (MappingName mappingName)
                (DocId docId) value) = blob
    where metadata = mkBulkStreamValue "update" indexName mappingName docId
          doc = object ["doc" .= value]
          blob = encode metadata `mappend` "\n" `mappend` encode doc
encodeBulkOperation (BulkCreateEncoding (IndexName indexName)
                (MappingName mappingName)
                (DocId docId) encoding) = toLazyByteString blob
    where metadata = toEncoding (mkBulkStreamValue "create" indexName mappingName docId)
          blob = fromEncoding metadata <> "\n" <> fromEncoding encoding
getDocument :: MonadBH m => IndexName -> MappingName
               -> DocId -> m Reply
getDocument (IndexName indexName)
  (MappingName mappingName) (DocId docId) =
  get =<< joinPath [indexName, mappingName, docId]
documentExists :: MonadBH m => IndexName -> MappingName
               -> Maybe DocumentParent -> DocId -> m Bool
documentExists (IndexName indexName) (MappingName mappingName)
               parent (DocId docId) = do
  (_, exists) <- existentialQuery =<< url
  return exists
  where url = addQuery params <$> joinPath [indexName, mappingName, docId]
        parentParam = fmap (\(DocumentParent (DocId p)) -> p) parent
        params = LS.filter (\(_, v) -> isJust v) [("parent", parentParam)]
dispatchSearch :: MonadBH m => Text -> Search -> m Reply
dispatchSearch url search = post url' (Just (encode search))
  where url' = appendSearchTypeParam url (searchType search)
searchAll :: MonadBH m => Search -> m Reply
searchAll = bindM2 dispatchSearch url . return
  where url = joinPath ["_search"]
searchByIndex :: MonadBH m => IndexName -> Search -> m Reply
searchByIndex (IndexName indexName) = bindM2 dispatchSearch url . return
  where url = joinPath [indexName, "_search"]
searchByIndices :: MonadBH m => NonEmpty IndexName -> Search -> m Reply
searchByIndices ixs = bindM2 dispatchSearch url . return
  where url = joinPath [renderedIxs, "_search"]
        renderedIxs = T.intercalate (T.singleton ',') (map (\(IndexName t) -> t) (toList ixs))
searchByType :: MonadBH m => IndexName -> MappingName -> Search
                -> m Reply
searchByType (IndexName indexName)
  (MappingName mappingName) = bindM2 dispatchSearch url . return
  where url = joinPath [indexName, mappingName, "_search"]
getInitialScroll :: 
  (FromJSON a, MonadThrow m, MonadBH m) => IndexName -> 
                                           MappingName -> 
                                           Search -> 
                                           m (Either EsError (SearchResult a))
getInitialScroll (IndexName indexName) (MappingName mappingName) search' = do
    let url = addQuery params <$> joinPath [indexName, mappingName, "_search"]
        params = [("scroll", Just "1m")]
        sorting = Just [DefaultSortSpec $ mkSort (FieldName "_doc") Descending]
        search = search' { sortBody = sorting }
    resp' <- bindM2 dispatchSearch url (return search)
    parseEsResponse resp'
getInitialSortedScroll ::
  (FromJSON a, MonadThrow m, MonadBH m) => IndexName ->
                                           MappingName ->
                                           Search ->
                                           m (Either EsError (SearchResult a))
getInitialSortedScroll (IndexName indexName) (MappingName mappingName) search = do
    let url = addQuery params <$> joinPath [indexName, mappingName, "_search"]
        params = [("scroll", Just "1m")]
    resp' <- bindM2 dispatchSearch url (return search)
    parseEsResponse resp'
scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> 
                                                    m ([Hit a], Maybe ScrollId)
scroll' Nothing = return ([], Nothing)
scroll' (Just sid) = do
    res <- advanceScroll sid 60
    case res of
      Right SearchResult {..} -> return (hits searchHits, scrollId)
      Left _ -> return ([], Nothing)
advanceScroll
  :: ( FromJSON a
     , MonadBH m
     , MonadThrow m
     )
  => ScrollId
  -> NominalDiffTime
  
  -> m (Either EsError (SearchResult a))
advanceScroll (ScrollId sid) scroll = do
  url <- joinPath ["_search", "scroll"]
  resp <- post url (Just $ encode scrollObject)
  parseEsResponse resp
  where scrollTime = showText secs <> "s"
        secs :: Integer
        secs = round scroll
        
        scrollObject = object [ "scroll" .= scrollTime
                              , "scroll_id" .= sid
                              ]
simpleAccumulator :: 
  (FromJSON a, MonadBH m, MonadThrow m) => 
                                [Hit a] ->
                                ([Hit a], Maybe ScrollId) ->
                                m ([Hit a], Maybe ScrollId)
simpleAccumulator oldHits (newHits, Nothing) = return (oldHits ++ newHits, Nothing)
simpleAccumulator oldHits ([], _) = return (oldHits, Nothing)
simpleAccumulator oldHits (newHits, msid) = do
    (newHits', msid') <- scroll' msid
    simpleAccumulator (oldHits ++ newHits) (newHits', msid')
scanSearch :: (FromJSON a, MonadBH m, MonadThrow m) => IndexName
                                                    -> MappingName
                                                    -> Search
                                                    -> m [Hit a]
scanSearch indexName mappingName search = do
    initialSearchResult <- getInitialScroll indexName mappingName search
    let (hits', josh) = case initialSearchResult of
                          Right SearchResult {..} -> (hits searchHits, scrollId)
                          Left _ -> ([], Nothing)
    (totalHits, _) <- simpleAccumulator [] (hits', josh)
    return totalHits
mkSearch :: Maybe Query -> Maybe Filter -> Search
mkSearch query filter = Search query filter Nothing Nothing Nothing False (From 0) (Size 10) SearchTypeQueryThenFetch Nothing Nothing Nothing Nothing
mkAggregateSearch :: Maybe Query -> Aggregations -> Search
mkAggregateSearch query mkSearchAggs = Search query Nothing Nothing (Just mkSearchAggs) Nothing False (From 0) (Size 0) SearchTypeQueryThenFetch Nothing Nothing Nothing Nothing
mkHighlightSearch :: Maybe Query -> Highlights -> Search
mkHighlightSearch query searchHighlights = Search query Nothing Nothing Nothing (Just searchHighlights) False (From 0) (Size 10) SearchTypeQueryThenFetch Nothing Nothing Nothing Nothing
pageSearch :: From     
           -> Size     
           -> Search  
           -> Search  
pageSearch resultOffset pageSize search = search { from = resultOffset, size = pageSize }
parseUrl' :: MonadThrow m => Text -> m Request
parseUrl' t = parseRequest (URI.escapeURIString URI.isAllowedInURI (T.unpack t))
isVersionConflict :: Reply -> Bool
isVersionConflict = statusCheck (== 409)
isSuccess :: Reply -> Bool
isSuccess = statusCheck (inRange (200, 299))
isCreated :: Reply -> Bool
isCreated = statusCheck (== 201)
statusCheck :: (Int -> Bool) -> Reply -> Bool
statusCheck prd = prd . NHTS.statusCode . responseStatus
basicAuthHook :: Monad m => EsUsername -> EsPassword -> Request -> m Request
basicAuthHook (EsUsername u) (EsPassword p) = return . applyBasicAuth u' p'
  where u' = T.encodeUtf8 u
        p' = T.encodeUtf8 p
boolQP :: Bool -> Text
boolQP True  = "true"
boolQP False = "false"