-- Copyright (c) 2020-present, EMQX, Inc.
-- All rights reserved.
--
-- This source code is distributed under the terms of a MIT license,
-- found in the LICENSE file.
----------------------------------------------------------------------------

{-# LANGUAGE BlockArguments             #-}
{-# LANGUAGE CPP                        #-}
{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GADTs                      #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE StandaloneDeriving         #-}
{-# LANGUAGE TypeFamilies               #-}
{-# LANGUAGE TypeSynonymInstances       #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE NamedFieldPuns             #-}

-- | This module provides implementations of user's APIs
--

module Database.ClickHouseDriver.Client
  ( -- * Data Fetch and Insert
    query,
    queryWithInfo,
    deploySettings,
    insertMany,
    insertOneRow,
    ping,
    withQuery,
    Database.ClickHouseDriver.Client.fetch,
    fetchWithInfo,
    execute,
    -- * Communication
    createClient,
    defaultClient,
    closeClient,
    -- * Connection pool
    defaultClientPool,
    createClientPool,
    -- * retrieve settings
    client
  )
where

import Database.ClickHouseDriver.Connection
    ( ping',
      tcpConnect,
      sendQuery,
      sendData,
      processInsertQuery,
      receiveResult )
import Database.ClickHouseDriver.Pool ( createConnectionPool )
import Database.ClickHouseDriver.Defines ( _BUFFER_SIZE )
import qualified Database.ClickHouseDriver.Defines      as Defines
import Database.ClickHouseDriver.Types
    ( ConnParams(..),
      CKResult(CKResult, query_result),
      TCPConnection(TCPConnection, tcpSocket),
      getServerInfo,
      defaultQueryInfo,
      ClickhouseType(..) )
import Database.ClickHouseDriver.IO.BufferedReader ( createBuffer )
import Control.Concurrent.Async ( mapConcurrently )
import Control.Exception ( SomeException, try )
import Control.Monad.State ( StateT(runStateT) )
import qualified Data.ByteString                    as BS
import qualified Data.ByteString.Char8              as C8
import Data.Hashable ( Hashable(hashWithSalt) )
import Data.Typeable ( Typeable )
import Data.Vector ( Vector )
import Haxl.Core
    ( putFailure,
      putSuccess,
      dataFetch,
      initEnv,
      runHaxl,
      stateEmpty,
      stateGet,
      stateSet,
      BlockedFetch(..),
      DataSource(fetch),
      DataSourceName(..),
      PerformFetch(SyncFetch),
      Env(states),
      GenHaxl,
      ShowP(..),
      StateKey(State) )                        
import qualified Network.Simple.TCP                 as TCP
import Text.Printf ( printf )
import           Data.Pool                          (Pool(..), withResource, destroyAllResources)
import Data.Time.Clock ( NominalDiffTime )
import           Data.Default.Class                 (def)

{-# INLINE _DEFAULT_PING_WAIT_TIME #-}
_DEFAULT_PING_WAIT_TIME :: Integer
_DEFAULT_PING_WAIT_TIME :: Integer
_DEFAULT_PING_WAIT_TIME = Integer
10000

{-# INLINE _DEFAULT_USERNAME #-}
_DEFAULT_USERNAME :: [Char]
_DEFAULT_USERNAME :: [Char]
_DEFAULT_USERNAME = [Char]
"default"

{-# INLINE _DEFAULT_HOST_NAME #-}
_DEFAULT_HOST_NAME :: [Char]
_DEFAULT_HOST_NAME :: [Char]
_DEFAULT_HOST_NAME = [Char]
"localhost"

{-# INLINE _DEFAULT_PASSWORD #-}
_DEFAULT_PASSWORD :: [Char]
_DEFAULT_PASSWORD :: [Char]
_DEFAULT_PASSWORD =  [Char]
""

{-# INLINE _DEFAULT_PORT_NAME #-}
_DEFAULT_PORT_NAME :: [Char]
_DEFAULT_PORT_NAME :: [Char]
_DEFAULT_PORT_NAME =  [Char]
"9000"

{-# INLINE _DEFAULT_DATABASE#-}
_DEFAULT_DATABASE :: [Char]
_DEFAULT_DATABASE :: [Char]
_DEFAULT_DATABASE =  [Char]
"default"

{-# INLINE _DEFAULT_COMPRESSION_SETTING #-}
_DEFAULT_COMPRESSION_SETTING :: Bool
_DEFAULT_COMPRESSION_SETTING :: Bool
_DEFAULT_COMPRESSION_SETTING =  Bool
False

-- | GADT 
data Query a where
  FetchData :: String
               -- ^ SQL statement such as "SELECT * FROM table"
             ->Query (Either String CKResult)
               -- ^ result data in Haskell type and additional information

deriving instance Show (Query a)

deriving instance Typeable Query

deriving instance Eq (Query a)

instance ShowP Query where showp :: Query a -> [Char]
showp = Query a -> [Char]
forall a. Show a => a -> [Char]
show

instance Hashable (Query a) where
  hashWithSalt :: Int -> Query a -> Int
hashWithSalt Int
salt (FetchData [Char]
cmd) = Int -> [Char] -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt [Char]
cmd

instance DataSourceName Query where
  dataSourceName :: Proxy Query -> Text
dataSourceName Proxy Query
_ = Text
"ClickhouseServer"

instance DataSource u Query where
  fetch :: State Query -> Flags -> u -> PerformFetch Query
fetch (State Query
resource) Flags
_flags u
env = ([BlockedFetch Query] -> IO ()) -> PerformFetch Query
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
SyncFetch (([BlockedFetch Query] -> IO ()) -> PerformFetch Query)
-> ([BlockedFetch Query] -> IO ()) -> PerformFetch Query
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch Query]
blockedFetches -> do
    [Char] -> Int -> IO ()
forall r. PrintfType r => [Char] -> r
printf [Char]
"Fetching %d queries.\n" ([BlockedFetch Query] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [BlockedFetch Query]
blockedFetches)
    (BlockedFetch Query -> IO ()) -> [BlockedFetch Query] -> IO [()]
forall (t :: * -> *) a b.
Traversable t =>
(a -> IO b) -> t a -> IO (t b)
mapConcurrently (State Query -> BlockedFetch Query -> IO ()
fetchData State Query
resource) [BlockedFetch Query]
blockedFetches
    () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

instance StateKey Query where
  data State Query = CKResource TCPConnection
                   | CKPool (Pool TCPConnection)

class Resource a where
  client :: Either String a
             -- ^ Either wrong message of resource with type a
           ->IO(Env () w)
            

-- | fetch data
fetchData :: State Query->BlockedFetch Query->IO ()
fetchData :: State Query -> BlockedFetch Query -> IO ()
fetchData (CKResource tcpconn)  BlockedFetch Query
fetch = do
  let (ByteString
queryStr, ResultVar (Either [Char] CKResult)
var) = case BlockedFetch Query
fetch of
        BlockedFetch (FetchData [Char]
q) ResultVar a
var' -> ([Char] -> ByteString
C8.pack [Char]
q, ResultVar a
ResultVar (Either [Char] CKResult)
var')
  Either SomeException (Either [Char] CKResult)
e <- IO (Either [Char] CKResult)
-> IO (Either SomeException (Either [Char] CKResult))
forall e a. Exception e => IO a -> IO (Either e a)
Control.Exception.try (IO (Either [Char] CKResult)
 -> IO (Either SomeException (Either [Char] CKResult)))
-> IO (Either [Char] CKResult)
-> IO (Either SomeException (Either [Char] CKResult))
forall a b. (a -> b) -> a -> b
$ do
    TCPConnection -> ByteString -> Maybe ByteString -> IO ()
sendQuery TCPConnection
tcpconn ByteString
queryStr Maybe ByteString
forall a. Maybe a
Nothing
    TCPConnection -> ByteString -> Maybe Block -> IO ()
sendData TCPConnection
tcpconn ByteString
"" Maybe Block
forall a. Maybe a
Nothing
    let serverInfo :: ServerInfo
serverInfo = case TCPConnection -> Maybe ServerInfo
getServerInfo TCPConnection
tcpconn of
          Just ServerInfo
info -> ServerInfo
info
          Maybe ServerInfo
Nothing   -> [Char] -> ServerInfo
forall a. HasCallStack => [Char] -> a
error [Char]
"Empty server information"
    let sock :: Socket
sock = TCPConnection -> Socket
tcpSocket TCPConnection
tcpconn
    Buffer
buf <- Int -> Socket -> IO Buffer
createBuffer Int
_BUFFER_SIZE Socket
sock
    (Either [Char] CKResult
res, Buffer
_) <- StateT Buffer IO (Either [Char] CKResult)
-> Buffer -> IO (Either [Char] CKResult, Buffer)
forall s (m :: * -> *) a. StateT s m a -> s -> m (a, s)
runStateT (ServerInfo
-> QueryInfo -> StateT Buffer IO (Either [Char] CKResult)
receiveResult ServerInfo
serverInfo QueryInfo
defaultQueryInfo) Buffer
buf
    Either [Char] CKResult -> IO (Either [Char] CKResult)
forall (m :: * -> *) a. Monad m => a -> m a
return Either [Char] CKResult
res
  (SomeException -> IO ())
-> (Either [Char] CKResult -> IO ())
-> Either SomeException (Either [Char] CKResult)
-> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
    (ResultVar (Either [Char] CKResult) -> SomeException -> IO ()
forall e a. Exception e => ResultVar a -> e -> IO ()
putFailure ResultVar (Either [Char] CKResult)
var)
    (ResultVar (Either [Char] CKResult)
-> Either [Char] CKResult -> IO ()
forall a. ResultVar a -> a -> IO ()
putSuccess ResultVar (Either [Char] CKResult)
var)
    (Either SomeException (Either [Char] CKResult)
e :: Either SomeException (Either String CKResult))
fetchData (CKPool pool) BlockedFetch Query
fetch = do
  let (ByteString
queryStr, ResultVar (Either [Char] CKResult)
var) = case BlockedFetch Query
fetch of
        BlockedFetch (FetchData [Char]
q) ResultVar a
var' -> ([Char] -> ByteString
C8.pack [Char]
q, ResultVar a
ResultVar (Either [Char] CKResult)
var')
  Either SomeException (Either [Char] CKResult)
e <- IO (Either [Char] CKResult)
-> IO (Either SomeException (Either [Char] CKResult))
forall e a. Exception e => IO a -> IO (Either e a)
Control.Exception.try (IO (Either [Char] CKResult)
 -> IO (Either SomeException (Either [Char] CKResult)))
-> IO (Either [Char] CKResult)
-> IO (Either SomeException (Either [Char] CKResult))
forall a b. (a -> b) -> a -> b
$ do
    Pool TCPConnection
-> (TCPConnection -> IO (Either [Char] CKResult))
-> IO (Either [Char] CKResult)
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool TCPConnection
pool ((TCPConnection -> IO (Either [Char] CKResult))
 -> IO (Either [Char] CKResult))
-> (TCPConnection -> IO (Either [Char] CKResult))
-> IO (Either [Char] CKResult)
forall a b. (a -> b) -> a -> b
$ \TCPConnection
conn->do
      TCPConnection -> ByteString -> Maybe ByteString -> IO ()
sendQuery TCPConnection
conn ByteString
queryStr Maybe ByteString
forall a. Maybe a
Nothing
      TCPConnection -> ByteString -> Maybe Block -> IO ()
sendData TCPConnection
conn ByteString
"" Maybe Block
forall a. Maybe a
Nothing
      let serverInfo :: ServerInfo
serverInfo = case TCPConnection -> Maybe ServerInfo
getServerInfo TCPConnection
conn of
            Just ServerInfo
info -> ServerInfo
info
            Maybe ServerInfo
Nothing -> [Char] -> ServerInfo
forall a. HasCallStack => [Char] -> a
error [Char]
"Empty server information"
      let sock :: Socket
sock = TCPConnection -> Socket
tcpSocket TCPConnection
conn
      Buffer
buf <- Int -> Socket -> IO Buffer
createBuffer Int
_BUFFER_SIZE Socket
sock
      (Either [Char] CKResult
res, Buffer
_) <- StateT Buffer IO (Either [Char] CKResult)
-> Buffer -> IO (Either [Char] CKResult, Buffer)
forall s (m :: * -> *) a. StateT s m a -> s -> m (a, s)
runStateT (ServerInfo
-> QueryInfo -> StateT Buffer IO (Either [Char] CKResult)
receiveResult ServerInfo
serverInfo QueryInfo
defaultQueryInfo) Buffer
buf
      Either [Char] CKResult -> IO (Either [Char] CKResult)
forall (m :: * -> *) a. Monad m => a -> m a
return Either [Char] CKResult
res
  (SomeException -> IO ())
-> (Either [Char] CKResult -> IO ())
-> Either SomeException (Either [Char] CKResult)
-> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either
    (ResultVar (Either [Char] CKResult) -> SomeException -> IO ()
forall e a. Exception e => ResultVar a -> e -> IO ()
putFailure ResultVar (Either [Char] CKResult)
var)
    (ResultVar (Either [Char] CKResult)
-> Either [Char] CKResult -> IO ()
forall a. ResultVar a -> a -> IO ()
putSuccess ResultVar (Either [Char] CKResult)
var)
    (Either SomeException (Either [Char] CKResult)
e :: Either SomeException (Either String CKResult))

deploySettings :: TCPConnection -> IO (Env () w)
deploySettings :: TCPConnection -> IO (Env () w)
deploySettings TCPConnection
tcp =
  StateStore -> () -> IO (Env () w)
forall u w. StateStore -> u -> IO (Env u w)
initEnv (State Query -> StateStore -> StateStore
forall (f :: * -> *).
StateKey f =>
State f -> StateStore -> StateStore
stateSet (TCPConnection -> State Query
CKResource TCPConnection
tcp) StateStore
stateEmpty) ()

defaultClient :: IO (Env () w)
defaultClient :: IO (Env () w)
defaultClient = ConnParams -> IO (Env () w)
forall w. ConnParams -> IO (Env () w)
createClient ConnParams
forall a. Default a => a
def 

-- | create client with given information such as username, host name and password etc. 
createClient :: ConnParams->IO(Env () w)
createClient :: ConnParams -> IO (Env () w)
createClient ConnParams{
                 ByteString
username' :: ConnParams -> ByteString
username' :: ByteString
username'   
                ,ByteString
host' :: ConnParams -> ByteString
host' :: ByteString
host'       
                ,ByteString
port' :: ConnParams -> ByteString
port' :: ByteString
port'       
                ,ByteString
password' :: ConnParams -> ByteString
password' :: ByteString
password'   
                ,Bool
compression' :: ConnParams -> Bool
compression' :: Bool
compression'
                ,ByteString
database' :: ConnParams -> ByteString
database' :: ByteString
database'   
             } = do
          Either [Char] TCPConnection
tcp <- ByteString
-> ByteString
-> ByteString
-> ByteString
-> ByteString
-> Bool
-> IO (Either [Char] TCPConnection)
tcpConnect  
                  ByteString
host'       
                  ByteString
port'
                  ByteString
username'       
                  ByteString
password'   
                  ByteString
database'
                  Bool
compression'
          case Either [Char] TCPConnection
tcp of
            Left [Char]
e -> Either [Char] (Pool TCPConnection) -> IO (Env () w)
forall a w. Resource a => Either [Char] a -> IO (Env () w)
client (([Char] -> Either [Char] (Pool TCPConnection)
forall a b. a -> Either a b
Left [Char]
e) :: Either String (Pool TCPConnection))
            Right TCPConnection
conn -> Either [Char] TCPConnection -> IO (Env () w)
forall a w. Resource a => Either [Char] a -> IO (Env () w)
client (Either [Char] TCPConnection -> IO (Env () w))
-> Either [Char] TCPConnection -> IO (Env () w)
forall a b. (a -> b) -> a -> b
$ TCPConnection -> Either [Char] TCPConnection
forall a b. b -> Either a b
Right TCPConnection
conn
  
defaultClientPool :: Int
                    -- ^ number of stripes
                   ->NominalDiffTime
                    -- ^ idle time for each stripe
                   ->Int
                    -- ^ maximum resources for reach stripe
                   ->IO (Env () w)
                    -- ^ Haxl env wrapped in IO monad.
defaultClientPool :: Int -> NominalDiffTime -> Int -> IO (Env () w)
defaultClientPool = ConnParams -> Int -> NominalDiffTime -> Int -> IO (Env () w)
forall w.
ConnParams -> Int -> NominalDiffTime -> Int -> IO (Env () w)
createClientPool ConnParams
forall a. Default a => a
def

createClientPool :: ConnParams
                  -- ^ parameters for connection settings
                  ->Int
                  -- ^ number of stripes
                  ->NominalDiffTime
                  -- ^ idle time for each stripe
                  ->Int
                  -- ^ maximum resources for reach stripe
                  ->IO(Env () w)
createClientPool :: ConnParams -> Int -> NominalDiffTime -> Int -> IO (Env () w)
createClientPool ConnParams
params Int
numberStripes NominalDiffTime
idleTime Int
maxResources = do 
  Pool TCPConnection
pool <- ConnParams
-> Int -> NominalDiffTime -> Int -> IO (Pool TCPConnection)
createConnectionPool ConnParams
params Int
numberStripes NominalDiffTime
idleTime Int
maxResources
  Either [Char] (Pool TCPConnection) -> IO (Env () w)
forall a w. Resource a => Either [Char] a -> IO (Env () w)
client (Either [Char] (Pool TCPConnection) -> IO (Env () w))
-> Either [Char] (Pool TCPConnection) -> IO (Env () w)
forall a b. (a -> b) -> a -> b
$ Pool TCPConnection -> Either [Char] (Pool TCPConnection)
forall a b. b -> Either a b
Right Pool TCPConnection
pool

instance Resource TCPConnection where
  client :: Either [Char] TCPConnection -> IO (Env () w)
client (Left [Char]
e) = [Char] -> IO (Env () w)
forall a. HasCallStack => [Char] -> a
error [Char]
e
  client (Right TCPConnection
src) = StateStore -> () -> IO (Env () w)
forall u w. StateStore -> u -> IO (Env u w)
initEnv (State Query -> StateStore -> StateStore
forall (f :: * -> *).
StateKey f =>
State f -> StateStore -> StateStore
stateSet (TCPConnection -> State Query
CKResource TCPConnection
src) StateStore
stateEmpty) ()

instance Resource (Pool TCPConnection) where
  client :: Either [Char] (Pool TCPConnection) -> IO (Env () w)
client (Left [Char]
e) = [Char] -> IO (Env () w)
forall a. HasCallStack => [Char] -> a
error [Char]
e
  client (Right Pool TCPConnection
src) = StateStore -> () -> IO (Env () w)
forall u w. StateStore -> u -> IO (Env u w)
initEnv (State Query -> StateStore -> StateStore
forall (f :: * -> *).
StateKey f =>
State f -> StateStore -> StateStore
stateSet (Pool TCPConnection -> State Query
CKPool Pool TCPConnection
src) StateStore
stateEmpty) ()

-- | fetch data alone with query information
fetchWithInfo :: String->GenHaxl u w (Either String CKResult)
fetchWithInfo :: [Char] -> GenHaxl u w (Either [Char] CKResult)
fetchWithInfo = Query (Either [Char] CKResult)
-> GenHaxl u w (Either [Char] CKResult)
forall u (r :: * -> *) a w.
(DataSource u r, Request r a) =>
r a -> GenHaxl u w a
dataFetch (Query (Either [Char] CKResult)
 -> GenHaxl u w (Either [Char] CKResult))
-> ([Char] -> Query (Either [Char] CKResult))
-> [Char]
-> GenHaxl u w (Either [Char] CKResult)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> Query (Either [Char] CKResult)
FetchData

-- | Fetch data
fetch :: String
        -- ^ SQL SELECT command
       ->GenHaxl u w (Either String (Vector (Vector ClickhouseType)))
        -- ^ result wrapped in Haxl monad for other tasks run with concurrency.
fetch :: [Char]
-> GenHaxl u w (Either [Char] (Vector (Vector ClickhouseType)))
fetch [Char]
str = do
  Either [Char] CKResult
result_with_info <- [Char] -> GenHaxl u w (Either [Char] CKResult)
forall u w. [Char] -> GenHaxl u w (Either [Char] CKResult)
fetchWithInfo [Char]
str
  case Either [Char] CKResult
result_with_info of
    Right CKResult{query_result :: CKResult -> Vector (Vector ClickhouseType)
query_result=Vector (Vector ClickhouseType)
r}->Either [Char] (Vector (Vector ClickhouseType))
-> GenHaxl u w (Either [Char] (Vector (Vector ClickhouseType)))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either [Char] (Vector (Vector ClickhouseType))
 -> GenHaxl u w (Either [Char] (Vector (Vector ClickhouseType))))
-> Either [Char] (Vector (Vector ClickhouseType))
-> GenHaxl u w (Either [Char] (Vector (Vector ClickhouseType)))
forall a b. (a -> b) -> a -> b
$ Vector (Vector ClickhouseType)
-> Either [Char] (Vector (Vector ClickhouseType))
forall a b. b -> Either a b
Right Vector (Vector ClickhouseType)
r
    Left [Char]
err -> Either [Char] (Vector (Vector ClickhouseType))
-> GenHaxl u w (Either [Char] (Vector (Vector ClickhouseType)))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either [Char] (Vector (Vector ClickhouseType))
 -> GenHaxl u w (Either [Char] (Vector (Vector ClickhouseType))))
-> Either [Char] (Vector (Vector ClickhouseType))
-> GenHaxl u w (Either [Char] (Vector (Vector ClickhouseType)))
forall a b. (a -> b) -> a -> b
$ [Char] -> Either [Char] (Vector (Vector ClickhouseType))
forall a b. a -> Either a b
Left [Char]
err

-- | query result contains query information.
queryWithInfo :: String->Env () w->IO (Either String CKResult)
queryWithInfo :: [Char] -> Env () w -> IO (Either [Char] CKResult)
queryWithInfo [Char]
query Env () w
source = Env () w
-> GenHaxl () w (Either [Char] CKResult)
-> IO (Either [Char] CKResult)
forall u w a. Env u w -> GenHaxl u w a -> IO a
runHaxl Env () w
source ([Char] -> GenHaxl () w (Either [Char] CKResult)
forall u w. [Char] -> GenHaxl u w (Either [Char] CKResult)
executeQuery [Char]
query)
  where
    executeQuery :: String->GenHaxl u w (Either String CKResult)
    executeQuery :: [Char] -> GenHaxl u w (Either [Char] CKResult)
executeQuery = Query (Either [Char] CKResult)
-> GenHaxl u w (Either [Char] CKResult)
forall u (r :: * -> *) a w.
(DataSource u r, Request r a) =>
r a -> GenHaxl u w a
dataFetch (Query (Either [Char] CKResult)
 -> GenHaxl u w (Either [Char] CKResult))
-> ([Char] -> Query (Either [Char] CKResult))
-> [Char]
-> GenHaxl u w (Either [Char] CKResult)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> Query (Either [Char] CKResult)
FetchData

-- | query command
query :: Env () w
        -- ^ Haxl environment for connection
       ->String
        -- ^ Query command for "SELECT" and "SHOW" only
       ->IO (Either String (Vector (Vector ClickhouseType)))
query :: Env () w
-> [Char] -> IO (Either [Char] (Vector (Vector ClickhouseType)))
query Env () w
source [Char]
cmd = do
  Either [Char] CKResult
query_with_info <- [Char] -> Env () w -> IO (Either [Char] CKResult)
forall w. [Char] -> Env () w -> IO (Either [Char] CKResult)
queryWithInfo [Char]
cmd Env () w
source
  case Either [Char] CKResult
query_with_info of
    Right CKResult{query_result :: CKResult -> Vector (Vector ClickhouseType)
query_result=Vector (Vector ClickhouseType)
r}->Either [Char] (Vector (Vector ClickhouseType))
-> IO (Either [Char] (Vector (Vector ClickhouseType)))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either [Char] (Vector (Vector ClickhouseType))
 -> IO (Either [Char] (Vector (Vector ClickhouseType))))
-> Either [Char] (Vector (Vector ClickhouseType))
-> IO (Either [Char] (Vector (Vector ClickhouseType)))
forall a b. (a -> b) -> a -> b
$ Vector (Vector ClickhouseType)
-> Either [Char] (Vector (Vector ClickhouseType))
forall a b. b -> Either a b
Right Vector (Vector ClickhouseType)
r
    Left [Char]
err->Either [Char] (Vector (Vector ClickhouseType))
-> IO (Either [Char] (Vector (Vector ClickhouseType)))
forall (m :: * -> *) a. Monad m => a -> m a
return (Either [Char] (Vector (Vector ClickhouseType))
 -> IO (Either [Char] (Vector (Vector ClickhouseType))))
-> Either [Char] (Vector (Vector ClickhouseType))
-> IO (Either [Char] (Vector (Vector ClickhouseType)))
forall a b. (a -> b) -> a -> b
$ [Char] -> Either [Char] (Vector (Vector ClickhouseType))
forall a b. a -> Either a b
Left [Char]
err

-- | For general use e.g. creating table,
-- multiple queries, multiple insertions. 
execute :: Env u w -> GenHaxl u w a -> IO a
execute :: Env u w -> GenHaxl u w a -> IO a
execute = Env u w -> GenHaxl u w a -> IO a
forall u w a. Env u w -> GenHaxl u w a -> IO a
runHaxl

withQuery :: Env () w
            -- ^ enviroment i.e. the database resource
           ->String
           -- ^ sql statement
           ->(Either String (Vector (Vector ClickhouseType))->IO a)
           -- ^ callback function that returns type a
           ->IO a
           -- ^ type a wrapped in IO monad.
withQuery :: Env () w
-> [Char]
-> (Either [Char] (Vector (Vector ClickhouseType)) -> IO a)
-> IO a
withQuery Env () w
source [Char]
cmd Either [Char] (Vector (Vector ClickhouseType)) -> IO a
f = Env () w
-> [Char] -> IO (Either [Char] (Vector (Vector ClickhouseType)))
forall w.
Env () w
-> [Char] -> IO (Either [Char] (Vector (Vector ClickhouseType)))
query Env () w
source [Char]
cmd IO (Either [Char] (Vector (Vector ClickhouseType)))
-> (Either [Char] (Vector (Vector ClickhouseType)) -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Either [Char] (Vector (Vector ClickhouseType)) -> IO a
f

insertMany :: Env () w->String->[[ClickhouseType]]->IO(BS.ByteString)
insertMany :: Env () w -> [Char] -> [[ClickhouseType]] -> IO ByteString
insertMany Env () w
source [Char]
cmd [[ClickhouseType]]
items = do
  let Maybe (State Query)
st :: Maybe (State Query) = StateStore -> Maybe (State Query)
forall (r :: * -> *). StateKey r => StateStore -> Maybe (State r)
stateGet (StateStore -> Maybe (State Query))
-> StateStore -> Maybe (State Query)
forall a b. (a -> b) -> a -> b
$ Env () w -> StateStore
forall u w. Env u w -> StateStore
states Env () w
source
  case Maybe (State Query)
st of
    Maybe (State Query)
Nothing             -> [Char] -> IO ByteString
forall a. HasCallStack => [Char] -> a
error [Char]
"No Connection."
    Just (CKResource tcp) -> TCPConnection
-> ByteString
-> Maybe ByteString
-> [[ClickhouseType]]
-> IO ByteString
processInsertQuery TCPConnection
tcp ([Char] -> ByteString
C8.pack [Char]
cmd) Maybe ByteString
forall a. Maybe a
Nothing [[ClickhouseType]]
items
    Just (CKPool pool) -> 
      Pool TCPConnection
-> (TCPConnection -> IO ByteString) -> IO ByteString
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool TCPConnection
pool ((TCPConnection -> IO ByteString) -> IO ByteString)
-> (TCPConnection -> IO ByteString) -> IO ByteString
forall a b. (a -> b) -> a -> b
$ \TCPConnection
tcp->do
        TCPConnection
-> ByteString
-> Maybe ByteString
-> [[ClickhouseType]]
-> IO ByteString
processInsertQuery TCPConnection
tcp ([Char] -> ByteString
C8.pack [Char]
cmd) Maybe ByteString
forall a. Maybe a
Nothing [[ClickhouseType]]
items

insertOneRow :: Env () w
              ->String
              -- ^ SQL command
              ->[ClickhouseType]
              -- ^ a row of local clickhouse data type to be serialized and inserted. 
              ->IO(BS.ByteString)
              -- ^ The resulting bytestring indicates success or failure.
insertOneRow :: Env () w -> [Char] -> [ClickhouseType] -> IO ByteString
insertOneRow Env () w
source [Char]
cmd [ClickhouseType]
items = Env () w -> [Char] -> [[ClickhouseType]] -> IO ByteString
forall w. Env () w -> [Char] -> [[ClickhouseType]] -> IO ByteString
insertMany Env () w
source [Char]
cmd [[ClickhouseType]
items]

-- | ping pong 
ping :: Env () w->IO()
ping :: Env () w -> IO ()
ping Env () w
source = do
  let Maybe (State Query)
get :: Maybe (State Query) = StateStore -> Maybe (State Query)
forall (r :: * -> *). StateKey r => StateStore -> Maybe (State r)
stateGet (StateStore -> Maybe (State Query))
-> StateStore -> Maybe (State Query)
forall a b. (a -> b) -> a -> b
$ Env () w -> StateStore
forall u w. Env u w -> StateStore
states Env () w
source
  case Maybe (State Query)
get of
    Maybe (State Query)
Nothing -> [Char] -> IO ()
forall a. Show a => a -> IO ()
print [Char]
"empty source"
    Just (CKResource tcp)
     -> Int -> TCPConnection -> IO (Maybe [Char])
ping' Int
Defines._DEFAULT_PING_WAIT_TIME TCPConnection
tcp IO (Maybe [Char]) -> (Maybe [Char] -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe [Char] -> IO ()
forall a. Show a => a -> IO ()
print
    Just (CKPool pool)
     -> Pool TCPConnection -> (TCPConnection -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool TCPConnection
pool ((TCPConnection -> IO ()) -> IO ())
-> (TCPConnection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \TCPConnection
src->
       Int -> TCPConnection -> IO (Maybe [Char])
ping' Int
Defines._DEFAULT_PING_WAIT_TIME TCPConnection
src IO (Maybe [Char]) -> (Maybe [Char] -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe [Char] -> IO ()
forall a. Show a => a -> IO ()
print 

-- | close connection
closeClient :: Env () w -> IO()
closeClient :: Env () w -> IO ()
closeClient Env () w
source = do
  let Maybe (State Query)
get :: Maybe (State Query) = StateStore -> Maybe (State Query)
forall (r :: * -> *). StateKey r => StateStore -> Maybe (State r)
stateGet (StateStore -> Maybe (State Query))
-> StateStore -> Maybe (State Query)
forall a b. (a -> b) -> a -> b
$ Env () w -> StateStore
forall u w. Env u w -> StateStore
states Env () w
source
  case Maybe (State Query)
get of
    Maybe (State Query)
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    Just (CKResource TCPConnection{tcpSocket=sock})
     -> Socket -> IO ()
forall (m :: * -> *). MonadIO m => Socket -> m ()
TCP.closeSock Socket
sock
    Just (CKPool pool)
     -> Pool TCPConnection -> IO ()
forall a. Pool a -> IO ()
destroyAllResources Pool TCPConnection
pool