-- Copyright (c) 2020-present, EMQX, Inc.
-- All rights reserved.
--
-- This source code is distributed under the terms of a MIT license,
-- found in the LICENSE file.
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BlockArguments #-}

-- | This module implements the communication with the ClickHouse server.
--   Most of the functions here are for internal use;
--   users should just use "Database.ClickHouseDriver".
--

module Database.ClickHouseDriver.Connection
  ( tcpConnect,
    sendQuery,
    receiveData,
    sendData,
    receiveResult,
    closeConnection,
    processInsertQuery,
    ping',
    versionTuple,
    sendCancel,
  )
where

import qualified Database.ClickHouseDriver.Block as Block
import qualified Database.ClickHouseDriver.ClientProtocol as Client
import Database.ClickHouseDriver.Column (transpose)
import Database.ClickHouseDriver.Defines
  ( _BUFFER_SIZE,
    _CLIENT_NAME,
    _CLIENT_REVISION,
    _CLIENT_VERSION_MAJOR,
    _CLIENT_VERSION_MINOR,
    _DBMS_MIN_REVISION_WITH_CLIENT_INFO,
    _DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO,
    _DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME,
    _DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE,
    _DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES,
    _DBMS_MIN_REVISION_WITH_VERSION_PATCH,
    _DBMS_NAME,
    _DEFAULT_INSERT_BLOCK_SIZE,
    _STRINGS_ENCODING,
  )
import qualified Database.ClickHouseDriver.Error as Error
import qualified Database.ClickHouseDriver.QueryProcessingStage as Stage
import qualified Database.ClickHouseDriver.ServerProtocol as Server
import Database.ClickHouseDriver.Types
  ( Block (ColumnOrientedBlock, cdata),
    CKResult (CKResult),
    ClientInfo (ClientInfo),
    ClientSetting
      ( ClientSetting,
        insert_block_size,
        strings_as_bytes,
        strings_encoding
      ),
    Context (Context, client_info, client_setting, server_info),
    Interface (HTTP),
    Packet
      ( Block,
        EndOfStream,
        ErrorMessage,
        Hello,
        MultiString,
        Progress,
        StreamProfileInfo,
        queryData
      ),
    QueryInfo,
    ServerInfo (..),
    TCPConnection (..),
    getDefaultClientInfo,
    getServerInfo,
    readBlockStreamProfileInfo,
    readProgress,
    storeProfile,
    storeProgress,
    ClickhouseType(..)
  )
import Database.ClickHouseDriver.IO.BufferedReader
  ( Buffer (socket),
    Reader,
    createBuffer,
    readBinaryStr,
    readVarInt,
    refill,
  )
import Database.ClickHouseDriver.IO.BufferedWriter
  ( MonoidMap,
    Writer,
    writeBinaryStr,
    writeVarUInt,
  )
import Control.Exception (onException)
import Control.Monad.Loops (iterateWhile)
import Control.Monad.State.Lazy (get, runStateT)
import Control.Monad.Writer (WriterT (runWriterT), execWriterT)
import Control.Monad (when)
import Data.Maybe (fromMaybe)
import Data.ByteString (ByteString)
import Data.Foldable (forM_)
import Data.ByteString.Builder
  ( Builder,
    toLazyByteString,
  )
import Data.ByteString.Char8 (unpack)
import qualified Data.List as List (transpose)
import Data.List.Split (chunksOf)
import Data.Vector ((!))
import qualified Data.Vector as V
  ( concat,
    fromList,
    length,
    map,
  )
import qualified Network.Simple.TCP as TCP
  ( closeSock,
    connectSock,
    sendLazy,
  )
import Network.Socket (Socket)
import System.Timeout (timeout)

--Debug
--import Debug.Trace ( trace )

-- | Project the second, third and fourth positional fields of a 'ServerInfo'
--   into a tuple; the result is used as the server's
--   @(major, minor, patch)@ version.
versionTuple :: ServerInfo -> (Word, Word, Word)
versionTuple = \case
  ServerInfo _ major minor patch _ _ _ -> (major, minor, patch)

-- | Internal implementation of the ping test.
--
--   Writes a @PING@ packet to the connection's socket, then reads packet
--   types from a fresh 1024-byte buffer, skipping any @PROGRESS@ packets.
--   The whole exchange is wrapped in 'timeout'.
ping' :: Int
        -- ^ Time limit, handed unchanged to 'timeout' (microseconds)
       ->TCPConnection
        -- ^ host name, port number, and socket are needed
       ->IO (Maybe String)
        -- ^ @Just "PONG!"@ when the server answered with a pong,
        --   @Just@ the rendered 'Error.ServerException' when another packet arrived,
        --   or 'Nothing' when the time limit expired first.
ping' timeLimit TCPConnection {tcpHost = host, tcpPort = port, tcpSocket = sock} =
  timeout timeLimit exchange
  where
    exchange = do
      pingPacket <- execWriterT (writeVarUInt Client._PING)
      TCP.sendLazy sock (toLazyByteString pingPacket)
      buf <- createBuffer 1024 sock
      -- Keep reading packet types for as long as the server sends PROGRESS.
      (packet_type, _) <-
        runStateT (iterateWhile (== Server._PROGRESS) readVarInt) buf
      return $
        if packet_type == Server._PONG
          then "PONG!"
          else unexpected packet_type

    -- Render the "unexpected packet" exception as a string; note that it is
    -- returned inside 'Just' rather than thrown.
    unexpected packet_type =
      let p_type = Server.toString (fromIntegral packet_type)
          report =
            concat
              [ "Unexpected packet from server ",
                show host,
                ":",
                show port,
                ", expected ",
                "Pong!",
                ", got ",
                show p_type
              ]
       in show
            Error.ServerException
              { code = Error._UNEXPECTED_PACKET_FROM_SERVER,
                message = report,
                nested = Nothing
              }

-- | Send the client @Hello@ packet. This has to be done once for every new
--   connection before anything else is sent.
sendHello :: (ByteString, ByteString, ByteString)
              -- ^ (database name, username, password)
            ->Socket
              -- ^ socket connected to Clickhouse server
            ->IO ()
sendHello (database, username, password) sock = do
  payload <- execWriterT helloPacket
  TCP.sendLazy sock (toLazyByteString payload)
  where
    -- Field order on the wire: packet type, client name, client version
    -- (major, minor, revision), then database, username and password.
    helloPacket :: Writer Builder
    helloPacket = do
      writeVarUInt Client._HELLO
      writeBinaryStr ("ClickHouse " <> _CLIENT_NAME)
      mapM_
        writeVarUInt
        [_CLIENT_VERSION_MAJOR, _CLIENT_VERSION_MINOR, _CLIENT_REVISION]
      mapM_ writeBinaryStr [database, username, password]

-- | Read the server's answer to the client @Hello@.
--
--   A @HELLO@ packet is decoded into a 'ServerInfo'; an @EXCEPTION@ packet is
--   turned into an error message; any other packet type yields @Left "Error"@.
receiveHello :: Buffer
              -- ^ Buffer from which the `Hello` response is read
              ->IO (Either ByteString ServerInfo)
              -- ^ Either error message or server information will be received.
receiveHello buf = fst <$> runStateT (readVarInt >>= dispatch) buf
  where
    -- Choose the decoder from the leading packet-type varint.
    dispatch :: Word -> Reader (Either ByteString ServerInfo)
    dispatch packet_type
      | packet_type == Server._HELLO = Right <$> readServerInfo
      | packet_type == Server._EXCEPTION = Left <$> readFailure
      | otherwise = return (Left "Error")

    -- Run @reader@ only when the server revision is at least @minRev@;
    -- otherwise consume nothing and use @fallback@.
    sinceRevision :: Word -> Word -> Reader a -> a -> Reader a
    sinceRevision rev minRev reader fallback
      | rev >= minRev = reader
      | otherwise = return fallback

    readServerInfo :: Reader ServerInfo
    readServerInfo = do
      server_name <- readBinaryStr
      server_version_major <- readVarInt
      server_version_minor <- readVarInt
      server_revision <- readVarInt
      server_timezone <-
        sinceRevision
          server_revision
          _DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE
          (Just <$> readBinaryStr)
          Nothing
      server_display_name <-
        sinceRevision
          server_revision
          _DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME
          readBinaryStr
          ""
      -- Servers that do not send a patch version get the revision instead.
      server_version_patch <-
        sinceRevision
          server_revision
          _DBMS_MIN_REVISION_WITH_VERSION_PATCH
          readVarInt
          server_revision
      return
        ServerInfo
          { name = server_name,
            version_major = server_version_major,
            version_minor = server_version_minor,
            version_patch = server_version_patch,
            revision = server_revision,
            timezone = server_timezone,
            display_name = server_display_name
          }

    -- NOTE(review): this reads three length-prefixed strings straight after
    -- the packet type. Confirm against the protocol that the exception packet
    -- does not carry a numeric code before them.
    readFailure :: Reader ByteString
    readFailure = do
      e <- readBinaryStr
      e2 <- readBinaryStr
      e3 <- readBinaryStr
      return ("exception" <> e <> " " <> e2 <> " " <> e3)

-- | Connect to the database through the TCP port; used in the Client module.
--
--   Opens a socket, sends the client @Hello@ and waits for the server's
--   answer. On success the returned 'TCPConnection' carries the socket, the
--   received 'ServerInfo' and the default 'ClientSetting'.
--
--   The socket is closed on every failure path: when the handshake throws
--   (the exception is re-raised after closing) and when the server answers
--   with anything other than a valid @Hello@.
tcpConnect ::
  -- | host name to connect
  ByteString ->
  -- | port to connect, as a string
  ByteString ->
  -- | username
  ByteString ->
  -- | password
  ByteString ->
  -- | database
  ByteString ->
  -- | choose if send and receive data in compressed form.
  Bool ->
  IO (Either String TCPConnection)
tcpConnect host port user password database compression = do
  (sock, sockaddr) <- TCP.connectSock (unpack host) (unpack port)
  let isCompressed =
        if compression
          then Client._COMPRESSION_ENABLE
          else Client._COMPRESSION_DISABLE
      handshake = do
        sendHello (database, user, password) sock
        buf <- createBuffer _BUFFER_SIZE sock
        receiveHello buf
  -- Do not leak the freshly opened socket if the handshake itself throws.
  hello <- handshake `onException` TCP.closeSock sock
  case hello of
    Right x ->
      return $
        Right
          TCPConnection
            { tcpHost = host,
              tcpPort = port,
              tcpUsername = user,
              tcpPassword = password,
              tcpSocket = sock,
              tcpSockAdrr = sockaddr,
              context =
                Context
                  { server_info = Just x,
                    client_info = Nothing,
                    client_setting =
                      Just $
                        ClientSetting
                          { insert_block_size = _DEFAULT_INSERT_BLOCK_SIZE,
                            strings_as_bytes = False,
                            strings_encoding = _STRINGS_ENCODING
                          }
                  },
              tcpCompression = isCompressed
            }
    Left "Exception" -> do
      -- This branch used to return without releasing the socket.
      TCP.closeSock sock
      return $ Left "exception"
    Left x -> do
      print ("error is " <> x)
      TCP.closeSock sock
      return $ Left "Connection error"

-- | Serialise a query packet and send it over the connection's socket.
--
--   Calls 'error' with @"Empty server info"@ when the connection's context
--   holds no 'ServerInfo'.
sendQuery :: TCPConnection
           -- ^ To get socket and server info
           ->ByteString
           -- ^ SQL statement
           ->Maybe ByteString
           -- ^ query_id if any; an empty string is written when absent
           ->IO ()
sendQuery
  TCPConnection
    { tcpCompression = comp,
      tcpSocket = sock,
      context = Context {server_info = maybe_info}
    }
  query
  query_id =
    case maybe_info of
      Nothing -> error "Empty server info"
      Just info -> do
        packet <- execWriterT (queryPacket info)
        TCP.sendLazy sock (toLazyByteString packet)
  where
    queryPacket info = do
      writeVarUInt Client._QUERY
      writeBinaryStr (fromMaybe "" query_id)
      -- Client info is only written for servers at or above this revision.
      when (revision info >= _DBMS_MIN_REVISION_WITH_CLIENT_INFO) $
        writeInfo
          (getDefaultClientInfo (_DBMS_NAME <> " " <> _CLIENT_NAME))
          info
      writeVarUInt 0 -- TODO add write settings
      writeVarUInt Stage._COMPLETE
      writeVarUInt comp
      writeBinaryStr query

-- | Send a @DATA@ packet: the packet type, the table name, and then either the
--   given block or, when there is none, an empty block (default block info
--   followed by zero columns and zero rows).
sendData :: TCPConnection
            -- ^ To get socket and context
          ->ByteString
            -- ^ table name
          ->Maybe Block
            -- ^ a block data if any
          ->IO ()
sendData TCPConnection {tcpSocket = sock, context = ctx} table_name maybe_block = do
  -- TODO: ADD REVISION
  payload <- execWriterT $ do
    writeVarUInt Client._DATA
    writeBinaryStr table_name
    maybe emptyBlock (Block.writeBlockOutputStream ctx) maybe_block
  TCP.sendLazy sock (toLazyByteString payload)
  where
    emptyBlock :: Writer Builder
    emptyBlock = do
      Block.writeInfo Block.defaultBlockInfo
      writeVarUInt 0 -- #col
      writeVarUInt 0 -- #row

-- | Cancel the last query sent to the server by writing a single @CANCEL@
--   packet to the connection's socket.
sendCancel :: TCPConnection -> IO ()
sendCancel TCPConnection {tcpSocket = sock} =
  execWriterT (writeVarUInt Client._CANCEL)
    >>= TCP.sendLazy sock . toLazyByteString

-- | Run an @INSERT@: send the query, wait for the server's sample block, then
--   stream the rows in chunks of @insert_block_size@ and finish with an empty
--   block.
--
--   The packet the server sends after the final empty block is inspected: an
--   'ErrorMessage' aborts with 'error' instead of being silently reported as
--   success. Like the other failure paths of this function (missing server
--   info or client settings, unexpected packet while waiting for the sample
--   block) this is signalled with 'error'.
processInsertQuery ::
  -- | source
  TCPConnection ->
  -- | query without data
  ByteString ->
  -- | query id
  Maybe ByteString ->
  -- | data in Haskell type, one inner list per row.
  [[ClickhouseType]] ->
  -- | return 1 if insertion successfully completed
  IO ByteString
processInsertQuery
  tcp@TCPConnection
    { tcpSocket = sock,
      context = Context {client_setting = maybe_setting}
    }
  query_without_data
  query_id
  items = do
    sendQuery tcp query_without_data query_id
    sendData tcp "" Nothing
    buf <- createBuffer 2048 sock
    -- Skip informational packets until the server sends the sample block.
    (sample_block, _) <-
      runStateT (iterateWhile stillWaiting (receivePacket info)) buf
    case maybe_setting of
      Nothing -> error "empty client settings"
      Just ClientSetting {insert_block_size = siz} -> do
        forM_ (chunksOf (fromIntegral siz) items) $ \chunk ->
          sendData tcp "" (Just (toDataBlock sample_block chunk))
        -- An empty block tells the server that no more data follows.
        sendData tcp "" Nothing
        buf2 <- refill buf
        (reply, _) <- runStateT (receivePacket info) buf2
        case reply of
          -- Previously the reply was discarded, so a rejected insert still
          -- returned "1".
          ErrorMessage msg -> error ("insert failed: " ++ msg)
          _ -> return "1"
  where
    info = case getServerInfo tcp of
      Nothing -> error "empty server info"
      Just s -> s

    -- True while the packet just read is not yet the sample block.
    stillWaiting = \case
      Block {} -> False
      MultiString _ -> True
      Hello -> True
      x -> error ("unexpected packet type: " ++ show x)

    -- Reuse the sample block's metadata and replace its data with the
    -- chunk's rows, transposed into columns.
    toDataBlock sample rows = case sample of
      Block typeinfo@ColumnOrientedBlock {} ->
        typeinfo
          { cdata = V.map V.fromList (V.fromList (List.transpose rows))
          }
      x -> error ("unexpected packet type: " ++ show x)

-- | Read one data block from the stream.
--
--   For servers at or above '_DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES' a
--   length-prefixed string is read and discarded first; the block itself is
--   then decoded by 'Block.readBlockInputStream'.
receiveData :: ServerInfo -> Reader Block.Block
receiveData info = do
  when precededByString (() <$ readBinaryStr)
  Block.readBlockInputStream info
  where
    precededByString = revision info >= _DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES


-- | Drain the server packets of one query and assemble them into a result.
--
--   Packets are read until an end-of-stream packet or an exception packet
--   arrives. If an exception packet was received its text is returned as
--   'Left'. Otherwise every non-empty data block is passed through
--   'Database.ClickHouseDriver.Column.transpose', the pieces are concatenated,
--   and the result is returned together with the query information updated
--   from the progress and profile packets.
receiveResult ::
  -- | Server information
  ServerInfo ->
  -- | Query information
  QueryInfo ->
  -- | Either the error message or the query result
  Reader (Either String CKResult)
receiveResult info query_info = do
  received <- collectPackets
  case [msg | ErrorMessage msg <- received] of
    [] ->
      let chunks =
            [ Database.ClickHouseDriver.Column.transpose (Block.cdata (queryData pkt))
              | pkt <- received,
                hasRows pkt
            ]
          finalInfo = Prelude.foldl absorb query_info received
       in return (Right (CKResult (V.concat chunks) finalInfo))
    failures -> return (Left (Prelude.concat failures))
  where
    -- Fold progress and profile packets into the query information;
    -- every other packet leaves it untouched.
    absorb :: QueryInfo -> Packet -> QueryInfo
    absorb acc (Progress prog) = storeProgress acc prog
    absorb acc (StreamProfileInfo profile) = storeProfile acc profile
    absorb acc _ = acc

    -- A packet counts as data only when it is a column-oriented block with
    -- at least one column whose first column holds at least one value.
    hasRows :: Packet -> Bool
    hasRows Block {queryData = Block.ColumnOrientedBlock {cdata = cols}}
      | V.length cols == 0 = False
      | otherwise = V.length (cols ! 0) > 0
    hasRows _ = False

    -- Read packets until the stream ends. An exception packet also ends the
    -- loop and is kept as the last element of the list.
    collectPackets :: Reader [Packet]
    collectPackets =
      receivePacket info >>= \case
        EndOfStream -> return []
        failure@(ErrorMessage _) -> return [failure]
        other -> (other :) <$> collectPackets

-- | Receive a single packet from the server.
--
--   The packet type is read as a variable-length integer and the matching
--   payload is decoded. An unrecognised packet type closes the buffer's
--   socket and then raises an 'error' carrying the rendered
--   'Error.ServerException'.
receivePacket :: ServerInfo -> Reader Packet
receivePacket info = do
  tag <- readVarInt
  -- Patterns cannot refer to named constants, so the numeric packet codes
  -- are spelled out literally.
  case tag of
    0 -> return Hello -- Hello
    1 -> dataPacket -- Data
    2 -> ErrorMessage . show <$> Error.readException Nothing -- Exception
    3 -> Progress <$> readProgress (revision info) -- Progress
    5 -> return EndOfStream -- End of Stream
    6 -> StreamProfileInfo <$> readBlockStreamProfileInfo -- Profile
    7 -> dataPacket -- Total
    8 -> dataPacket -- Extreme
    -- Packet type 10 (Log) is not handled here and therefore ends up in the
    -- unknown-packet branch below.
    11 -> do
      -- MultiStrings message
      first <- readBinaryStr
      second <- readBinaryStr
      return (MultiString (first, second))
    _ -> do
      closeBufferSocket
      error . show $
        Error.ServerException
          { code = Error._UNKNOWN_PACKET_FROM_SERVER,
            message = "Unknown packet from server",
            nested = Nothing
          }
  where
    -- Data, Total and Extreme packets all carry a block in the same layout.
    dataPacket :: Reader Packet
    dataPacket = Block <$> receiveData info

-- | Close the socket held by the given connection.
closeConnection :: TCPConnection -> IO ()
closeConnection = TCP.closeSock . tcpSocket

-- | Write the client information section of a query to the protocol writer.
--
--   Calling this for a server revision below
--   '_DBMS_MIN_REVISION_WITH_CLIENT_INFO' is a programming error and raises
--   'error'. The quota key and the client patch version are only written
--   when the server revision is recent enough to expect them.
writeInfo ::
  (MonoidMap ByteString w) =>
  ClientInfo ->
  ServerInfo ->
  Writer w
writeInfo
  ( ClientInfo
      _client_name
      interface
      version_major
      version_minor
      version_patch
      client_rev
      first_user
      first_query_id
      first_address
      quota
      _query_kind
    )
  ServerInfo {revision = server_rev, display_name = host}
    | server_rev < _DBMS_MIN_REVISION_WITH_CLIENT_INFO =
      error "Method writeInfo is called for unsupported server revision"
    | otherwise = do
      -- The query kind is always written as the literal 1; the query kind
      -- stored in the ClientInfo is not consulted.
      writeVarUInt 1
      writeBinaryStr first_user
      writeBinaryStr first_query_id
      writeBinaryStr first_address
      writeVarUInt interfaceCode
      -- os_user is sent empty: no system user name is looked up.
      writeBinaryStr ""
      writeBinaryStr host
      -- The client name comes from the constant, not from the ClientInfo.
      writeBinaryStr _CLIENT_NAME
      writeVarUInt version_major
      writeVarUInt version_minor
      writeVarUInt client_rev
      when (server_rev >= _DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) $
        writeBinaryStr quota
      when (server_rev >= _DBMS_MIN_REVISION_WITH_VERSION_PATCH) $
        writeVarUInt version_patch
  where
    -- NOTE(review): HTTP is encoded as 0 and everything else as 1; confirm
    -- these codes against the server's interface enumeration.
    interfaceCode :: Word
    interfaceCode = if interface == HTTP then 0 else 1

-------------------------------------------------------------------------------------------------------------------
---Helpers
{-# INLINE closeBufferSocket #-}
-- | Close the socket stored in the current read buffer, if there is one.
closeBufferSocket :: Reader ()
closeBufferSocket =
  get >>= \buffer ->
    forM_ (Database.ClickHouseDriver.IO.BufferedReader.socket buffer) TCP.closeSock