-- Copyright (c) 2020-present, EMQX, Inc. -- All rights reserved. -- -- This source code is distributed under the terms of a MIT license, -- found in the LICENSE file. {-# LANGUAGE ApplicativeDo #-} {-# LANGUAGE BlockArguments #-} {-# LANGUAGE CPP #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeFamilies #-} -- | This module provides implementation of user's API module Database.ClickHouseDriver.HTTP.Client ( -- * Setting setupEnv, runQuery, -- * Query getByteString, getJSON, getText, getTextM, getJsonM, insertOneRow, insertMany, ping, exec, insertFromFile, -- * Connection defaultHttpClient, httpClient, defaultHttpPool, ) where import Control.Concurrent.Async (mapConcurrently) import Control.Exception (SomeException, try) import Control.Monad.State.Lazy (MonadIO (..)) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as LBS import Data.ByteString.Lazy.Builder ( char8, lazyByteString, toLazyByteString, ) import qualified Data.ByteString.Lazy.Char8 as C8 import Data.Default.Class (def) import Data.Hashable (Hashable (hashWithSalt)) import Data.Pool (Pool, withResource) import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8) import Data.Time.Clock (NominalDiffTime) import Data.Typeable (Typeable) import Database.ClickHouseDriver.Defines as Defines ( _DEFAULT_HOST, _DEFAULT_HTTP_PORT, ) import Database.ClickHouseDriver.HTTP.Connection ( createHttpPool, defaultHttpConnection, httpConnectDb, ) import Database.ClickHouseDriver.HTTP.Helpers ( extract, genURL, toString, ) import Database.ClickHouseDriver.HTTP.Types (Format (..), HttpConnection (..), JSONResult) import Database.ClickHouseDriver.Types (ClickhouseType) import Haxl.Core ( BlockedFetch (..), DataSource (fetch), DataSourceName (..), Env (userEnv), GenHaxl, PerformFetch (SyncFetch), ShowP (..), StateKey (State), dataFetch, initEnv, putFailure, putSuccess, runHaxl, stateEmpty, stateSet, ) import Network.HTTP.Client ( RequestBody (..), httpLbs, method, parseRequest, requestBody, responseBody, streamFile, ) import Text.Printf (printf) {-Implementation in Haxl-} -- data HttpClient a where FetchByteString :: String -> HttpClient BS.ByteString FetchJSON :: String -> HttpClient BS.ByteString FetchCSV :: String -> HttpClient BS.ByteString FetchText :: String -> HttpClient BS.ByteString Ping :: HttpClient BS.ByteString deriving instance Show (HttpClient a) deriving instance Typeable HttpClient deriving instance Eq (HttpClient a) instance ShowP HttpClient where showp = show instance Hashable (HttpClient a) where hashWithSalt salt (FetchByteString cmd) = hashWithSalt salt cmd hashWithSalt salt (FetchJSON cmd) = hashWithSalt salt cmd hashWithSalt salt (FetchCSV cmd) = hashWithSalt salt cmd hashWithSalt salt Ping = hashWithSalt salt ("ok" :: BS.ByteString) instance DataSourceName HttpClient where dataSourceName _ = "ClickhouseDataSource" instance DataSource u HttpClient where fetch (settings) _flags _usrenv = SyncFetch $ \blockedFetches -> do printf "Fetching %d queries.\n" (length blockedFetches) res <- mapConcurrently (fetchData settings) blockedFetches case res of [()] -> return () instance StateKey HttpClient where data State HttpClient = SingleHttp HttpConnection | HttpPool (Pool HttpConnection) class HttpEnvironment a where toEnv :: a -> State HttpClient pick :: a -> IO HttpConnection instance HttpEnvironment HttpConnection where toEnv = SingleHttp pick = return instance HttpEnvironment (Pool HttpConnection) where toEnv = HttpPool pick pool = withResource pool $ return -- | fetch function fetchData :: State HttpClient -> --Connection configuration BlockedFetch HttpClient -> --fetched data IO () fetchData (settings) fetches = do let (queryWithType, var) = case fetches of BlockedFetch (FetchJSON query) var' -> (query ++ " FORMAT JSON", var') BlockedFetch (FetchCSV query) var' -> (query ++ " FORMAT CSV", var') BlockedFetch (FetchByteString query) var' -> (query, var') BlockedFetch Ping var' -> ("ping", var') e <- Control.Exception.try $ do case settings of SingleHttp http@(HttpConnection _ mng) -> do url <- genURL http queryWithType req <- parseRequest url ans <- responseBody <$> httpLbs req mng return $ LBS.toStrict ans HttpPool pool -> withResource pool $ \conn@(HttpConnection _ mng) -> do url <- genURL conn queryWithType req <- parseRequest url ans <- responseBody <$> httpLbs req mng return $ LBS.toStrict ans either (putFailure var) (putSuccess var) (e :: Either SomeException (BS.ByteString)) -- | Fetch data from ClickHouse client in the text format. getByteString :: String -> GenHaxl u w BS.ByteString getByteString = dataFetch . FetchByteString getText :: String -> GenHaxl u w T.Text getText cmd = fmap decodeUtf8 (getByteString cmd) -- | Fetch data from ClickHouse client in the JSON format. getJSON :: String -> GenHaxl u w JSONResult getJSON cmd = fmap extract (dataFetch $ FetchJSON cmd) -- | Fetch data from Clickhouse client with commands warped in a Traversable monad. getTextM :: (Monad m, Traversable m) => m String -> GenHaxl u w (m T.Text) getTextM = mapM getText -- | Fetch data from Clickhouse client in the format of JSON getJsonM :: (Monad m, Traversable m) => m String -> GenHaxl u w (m JSONResult) getJsonM = mapM getJSON -- | actual function used by user to perform fetching command exec :: (HttpEnvironment a) => String -> Env a w -> IO (Either C8.ByteString String) exec cmd' env = do let cmd = C8.pack cmd' conn@HttpConnection {httpManager = mng} <- pick $ userEnv env url <- genURL conn "" req <- parseRequest url ans <- responseBody <$> httpLbs req { method = "POST", requestBody = RequestBodyLBS cmd } mng if ans /= "" then return $ Left ans -- error message else return $ Right "Created successfully" -- | insert one row insertOneRow :: (HttpEnvironment a) => String -> [ClickhouseType] -> Env a w -> IO (Either C8.ByteString String) insertOneRow table_name arr environment = do let row = toString arr let cmd = C8.pack ("INSERT INTO " ++ table_name ++ " VALUES " ++ row) settings@HttpConnection {httpManager = mng} <- pick $ userEnv environment url <- genURL settings "" req <- parseRequest url ans <- responseBody <$> httpLbs req { method = "POST", requestBody = RequestBodyLBS cmd } mng if ans /= "" then return $ Left ans -- error messagethe hellenic republic else return $ Right "Inserted successfully" -- | insert one or more rows insertMany :: (HttpEnvironment a) => String -> [[ClickhouseType]] -> Env a w -> IO (Either C8.ByteString String) insertMany table_name rows environment = do let rowsString = map (lazyByteString . C8.pack . toString) rows comma = char8 ',' preset = lazyByteString $ C8.pack $ "INSERT INTO " <> table_name <> " VALUES " togo = preset <> (foldl1 (\x y -> x <> comma <> y) rowsString) settings@HttpConnection {httpManager = mng} <- pick $ userEnv environment url <- genURL settings "" req <- parseRequest url ans <- responseBody <$> httpLbs req { method = "POST", requestBody = RequestBodyLBS $ toLazyByteString togo } mng print "inserted successfully" if ans /= "" then return $ Left ans else return $ Right "Successful insertion" -- | insert data from insertFromFile :: (HttpEnvironment a) => String -> Format -> FilePath -> Env a w -> IO (Either C8.ByteString String) insertFromFile table_name format file environment = do fileReqBody <- streamFile file settings@HttpConnection {httpManager = mng} <- pick $ userEnv environment url <- genURL settings ( "INSERT INTO " <> table_name <> case format of CSV -> " FORMAT CSV" JSON -> " FORMAT JSON" TUPLE -> " VALUES" ) req <- parseRequest url ans <- responseBody <$> httpLbs req { method = "POST", requestBody = fileReqBody } mng if ans /= "" then return $ Left ans -- error message else return $ Right "Inserted successfully" ping :: GenHaxl u w BS.ByteString ping = dataFetch $ Ping -- | Default environment setupEnv :: (MonadIO m, HttpEnvironment a) => a -> m (Env a w) setupEnv csetting = liftIO $ initEnv (stateSet (toEnv csetting) stateEmpty) csetting defaultHttpClient :: (MonadIO m) => m (Env HttpConnection w) defaultHttpClient = liftIO $ defaultHttpConnection >>= setupEnv defaultHttpPool :: (MonadIO m) => Int -> NominalDiffTime -> Int -> m (Env (Pool HttpConnection) w) defaultHttpPool numStripes idleTime maxResources = liftIO $ createHttpPool def numStripes idleTime maxResources >>= setupEnv httpClient :: (MonadIO m) => String -> String -> m (Env HttpConnection w) httpClient user password = liftIO $ httpConnectDb user password Defines._DEFAULT_HTTP_PORT Defines._DEFAULT_HOST Nothing >>= setupEnv -- | rename runHaxl function. {-# INLINE runQuery #-} runQuery :: (MonadIO m) => Env u w -> GenHaxl u w a -> m a runQuery env haxl = liftIO $ runHaxl env haxl