{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BlockArguments #-}
module Database.ClickHouseDriver.Connection
( tcpConnect,
sendQuery,
receiveData,
sendData,
receiveResult,
closeConnection,
processInsertQuery,
ping',
versionTuple,
sendCancel,
)
where
import qualified Database.ClickHouseDriver.Block as Block
import qualified Database.ClickHouseDriver.ClientProtocol as Client
import Database.ClickHouseDriver.Column (transpose)
import Database.ClickHouseDriver.Defines
  ( _BUFFER_SIZE,
    _CLIENT_NAME,
    _CLIENT_REVISION,
    _CLIENT_VERSION_MAJOR,
    _CLIENT_VERSION_MINOR,
    _DBMS_MIN_REVISION_WITH_CLIENT_INFO,
    _DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO,
    _DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME,
    _DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE,
    _DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES,
    _DBMS_MIN_REVISION_WITH_VERSION_PATCH,
    _DBMS_NAME,
    _DEFAULT_INSERT_BLOCK_SIZE,
    _STRINGS_ENCODING,
  )
import qualified Database.ClickHouseDriver.Error as Error
import qualified Database.ClickHouseDriver.QueryProcessingStage as Stage
import qualified Database.ClickHouseDriver.ServerProtocol as Server
import Database.ClickHouseDriver.Types
  ( Block (ColumnOrientedBlock, cdata),
    CKResult (CKResult),
    ClientInfo (ClientInfo),
    ClientSetting
      ( ClientSetting,
        insert_block_size,
        strings_as_bytes,
        strings_encoding
      ),
    Context (Context, client_info, client_setting, server_info),
    Interface (HTTP),
    Packet
      ( Block,
        EndOfStream,
        ErrorMessage,
        Hello,
        MultiString,
        Progress,
        StreamProfileInfo,
        queryData
      ),
    QueryInfo,
    ServerInfo (..),
    TCPConnection (..),
    getDefaultClientInfo,
    getServerInfo,
    readBlockStreamProfileInfo,
    readProgress,
    storeProfile,
    storeProgress,
    ClickhouseType(..)
  )
import Database.ClickHouseDriver.IO.BufferedReader
  ( Buffer (socket),
    Reader,
    createBuffer,
    readBinaryStr,
    readVarInt,
    refill,
  )
import Database.ClickHouseDriver.IO.BufferedWriter
  ( MonoidMap,
    Writer,
    writeBinaryStr,
    writeVarUInt,
  )
import Control.Exception (bracketOnError)
import Control.Monad.Loops (iterateWhile)
import Control.Monad.State.Lazy (get, runStateT)
import Control.Monad.Writer (WriterT (runWriterT), execWriterT)
import Control.Monad (when)
import Data.Maybe (fromMaybe)
import Data.ByteString (ByteString)
import Data.Foldable (forM_)
import Data.ByteString.Builder
  ( Builder,
    toLazyByteString,
  )
import Data.ByteString.Char8 (unpack)
import qualified Data.List as List (transpose)
import Data.List.Split (chunksOf)
import Data.Vector ((!))
import qualified Data.Vector as V
  ( concat,
    fromList,
    length,
    map,
  )
import qualified Network.Simple.TCP as TCP
  ( closeSock,
    connectSock,
    sendLazy,
  )
import Network.Socket (Socket)
import System.Timeout (timeout)
-- | Extract the version triple from a 'ServerInfo'.
--
-- The result is built from the second, third and fourth constructor fields
-- (in that order); every other field is ignored.
versionTuple :: ServerInfo -> (Word, Word, Word)
versionTuple (ServerInfo _ verMajor verMinor verPatch _ _ _) =
  (verMajor, verMinor, verPatch)
-- | Internal implementation of the ping test.
--
-- Writes a 'Client._PING' packet to the connection's socket, then reads
-- var-ints from a fresh 1024-byte buffer, skipping every value equal to
-- 'Server._PROGRESS'. The first other value decides the outcome:
--
-- * @Just "PONG!"@ when it equals 'Server._PONG';
-- * @Just@ the 'show'n 'Error.ServerException' describing the unexpected
--   packet otherwise;
-- * @Nothing@ when the whole exchange does not finish within the time limit
--   handed to 'timeout'.
ping' ::
  -- | Time limit passed to 'timeout'.
  Int ->
  -- | Connection whose host, port and socket are used.
  TCPConnection ->
  IO (Maybe String)
ping' timeLimit TCPConnection {tcpHost = host, tcpPort = port, tcpSocket = sock} =
  timeout timeLimit exchange
  where
    -- One request/response round trip on the socket.
    exchange :: IO String
    exchange = do
      request <- execWriterT (writeVarUInt Client._PING)
      TCP.sendLazy sock (toLazyByteString request)
      buffer <- createBuffer 1024 sock
      -- Keep reading packet codes for as long as they are progress codes.
      (reply, _) <- runStateT (iterateWhile (== Server._PROGRESS) readVarInt) buffer
      return (describe reply)

    -- Turn the received packet code into the textual result.
    describe :: Word -> String
    describe reply
      | reply == Server._PONG = "PONG!"
      | otherwise =
        show
          Error.ServerException
            { code = Error._UNEXPECTED_PACKET_FROM_SERVER,
              message = report,
              nested = Nothing
            }
      where
        packetName :: ByteString
        packetName = Server.toString (fromIntegral reply)

        report :: String
        report =
          "Unexpected packet from server "
            <> show host
            <> ":"
            <> show port
            <> ", expected "
            <> "Pong!"
            <> ", got "
            <> show packetName
-- | Serialise the client hello packet and send it over the socket.
--
-- The packet is written in this order: the 'Client._HELLO' code, the client
-- name prefixed with @"ClickHouse "@, the client major version, minor version
-- and revision, and finally the database, user name and password.
sendHello ::
  -- | @(database, username, password)@
  (ByteString, ByteString, ByteString) ->
  -- | Socket to write the packet to.
  Socket ->
  IO ()
sendHello (database, username, password) sock = do
  greeting <- execWriterT helloPacket
  TCP.sendLazy sock (toLazyByteString greeting)
  where
    helloPacket :: Writer Builder
    helloPacket = do
      writeVarUInt Client._HELLO
      writeBinaryStr ("ClickHouse " <> _CLIENT_NAME)
      mapM_
        writeVarUInt
        [_CLIENT_VERSION_MAJOR, _CLIENT_VERSION_MINOR, _CLIENT_REVISION]
      mapM_ writeBinaryStr [database, username, password]
-- | Read the server's answer to the hello packet from the given buffer.
--
-- The first var-int read selects the branch:
--
-- * 'Server._HELLO': the server description is read and returned as
--   @Right 'ServerInfo'@;
-- * 'Server._EXCEPTION': three strings are read and returned, joined, as
--   @Left@ (prefixed with @"exception"@);
-- * anything else: @Left "Error"@.
--
-- The buffer state left after reading is discarded.
receiveHello ::
  -- | Buffer to read the reply from.
  Buffer ->
  IO (Either ByteString ServerInfo)
receiveHello buf = fst <$> runStateT (readVarInt >>= dispatch) buf
  where
    dispatch :: Word -> Reader (Either ByteString ServerInfo)
    dispatch packetType
      | packetType == Server._HELLO = Right <$> readServerInfo
      | packetType == Server._EXCEPTION = Left <$> readFailure
      | otherwise = return (Left "Error")

    -- Run the reader only when the server revision is at least the given
    -- minimum; otherwise yield the fallback without consuming input.
    sinceRevision :: Word -> Word -> a -> Reader a -> Reader a
    sinceRevision serverRev minimumRev fallback reader
      | serverRev >= minimumRev = reader
      | otherwise = return fallback

    -- The order of the reads below is the wire order and must not change.
    readServerInfo :: Reader ServerInfo
    readServerInfo = do
      serverName <- readBinaryStr
      major <- readVarInt
      minor <- readVarInt
      serverRev <- readVarInt
      zone <-
        sinceRevision
          serverRev
          _DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE
          Nothing
          (Just <$> readBinaryStr)
      displayName <-
        sinceRevision
          serverRev
          _DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME
          ""
          readBinaryStr
      -- Servers too old to send a patch number get the revision in its place.
      patch <-
        sinceRevision
          serverRev
          _DBMS_MIN_REVISION_WITH_VERSION_PATCH
          serverRev
          readVarInt
      return
        ServerInfo
          { name = serverName,
            version_major = major,
            version_minor = minor,
            version_patch = patch,
            revision = serverRev,
            timezone = zone,
            display_name = displayName
          }

    -- NOTE(review): this reads three consecutive length-prefixed strings.
    -- Confirm against the server's exception packet layout that no
    -- fixed-width field (e.g. a numeric error code) precedes them.
    readFailure :: Reader ByteString
    readFailure = do
      first <- readBinaryStr
      second <- readBinaryStr
      third <- readBinaryStr
      return ("exception" <> first <> " " <> second <> " " <> third)
-- | Open a TCP connection to a server and perform the hello handshake.
--
-- On success the result is @Right@ a 'TCPConnection' carrying the socket, the
-- server description returned by the handshake, and default client settings
-- ('_DEFAULT_INSERT_BLOCK_SIZE', strings not treated as bytes,
-- '_STRINGS_ENCODING').
--
-- On a rejected handshake the socket is closed and a @Left@ message is
-- returned; the server's reply is printed to standard output first.
--
-- Resource safety: the socket is acquired with 'bracketOnError', so it is also
-- closed when sending the hello, creating the buffer or reading the reply
-- throws. The exception is then re-thrown to the caller.
tcpConnect ::
  -- | Host name.
  ByteString ->
  -- | Port number, as text.
  ByteString ->
  -- | User name.
  ByteString ->
  -- | Password.
  ByteString ->
  -- | Database.
  ByteString ->
  -- | Whether compression is requested for this connection.
  Bool ->
  IO (Either String TCPConnection)
tcpConnect host port user password database compression =
  bracketOnError
    (TCP.connectSock (unpack host) (unpack port))
    (TCP.closeSock . fst)
    handshake
  where
    isCompressed :: Word
    isCompressed
      | compression = Client._COMPRESSION_ENABLE
      | otherwise = Client._COMPRESSION_DISABLE

    handshake (sock, sockaddr) = do
      sendHello (database, user, password) sock
      buf <- createBuffer _BUFFER_SIZE sock
      hello <- receiveHello buf
      case hello of
        Right info -> return (Right (connection sock sockaddr info))
        Left "Exception" -> do
          -- Previously this arm returned without releasing the socket.
          TCP.closeSock sock
          return (Left "exception")
        Left failure -> do
          print ("error is " <> failure)
          TCP.closeSock sock
          return (Left "Connection error")

    connection sock sockaddr info =
      TCPConnection
        { tcpHost = host,
          tcpPort = port,
          tcpUsername = user,
          tcpPassword = password,
          tcpSocket = sock,
          tcpSockAdrr = sockaddr,
          context =
            Context
              { server_info = Just info,
                client_info = Nothing,
                client_setting =
                  Just
                    ClientSetting
                      { insert_block_size = _DEFAULT_INSERT_BLOCK_SIZE,
                        strings_as_bytes = False,
                        strings_encoding = _STRINGS_ENCODING
                      }
              },
          tcpCompression = isCompressed
        }
-- | Serialise a query packet and send it over the connection's socket.
--
-- Calls 'error' with @"Empty server info"@ when the connection context holds
-- no server information.
--
-- Packet layout, in write order: 'Client._QUERY', the query id (empty string
-- when absent), the client info block (only when the server revision is at
-- least '_DBMS_MIN_REVISION_WITH_CLIENT_INFO'), a zero var-int,
-- 'Stage._COMPLETE', the connection's compression flag, and the query text.
sendQuery ::
  -- | Connection to send the query on.
  TCPConnection ->
  -- | Query text.
  ByteString ->
  -- | Optional query id.
  Maybe ByteString ->
  IO ()
sendQuery
  TCPConnection
    { tcpCompression = comp,
      tcpSocket = sock,
      context = Context {server_info = knownServer}
    }
  query
  query_id =
    case knownServer of
      Nothing -> error "Empty server info"
      Just info -> do
        packet <- execWriterT (queryPacket info)
        TCP.sendLazy sock (toLazyByteString packet)
    where
      queryPacket :: ServerInfo -> Writer Builder
      queryPacket info = do
        writeVarUInt Client._QUERY
        writeBinaryStr (fromMaybe "" query_id)
        when (revision info >= _DBMS_MIN_REVISION_WITH_CLIENT_INFO) $
          writeInfo (getDefaultClientInfo (_DBMS_NAME <> " " <> _CLIENT_NAME)) info
        -- presumably an empty settings section — TODO confirm
        writeVarUInt 0
        writeVarUInt Stage._COMPLETE
        writeVarUInt comp
        writeBinaryStr query
-- | Serialise a data packet and send it over the connection's socket.
--
-- The packet starts with 'Client._DATA' and the table name. When a block is
-- supplied it is written with 'Block.writeBlockOutputStream'; otherwise the
-- default block info followed by two zero var-ints is written.
sendData ::
  -- | Connection to send the data on.
  TCPConnection ->
  -- | Table name.
  ByteString ->
  -- | Block to send, or 'Nothing' for an empty block.
  Maybe Block ->
  IO ()
sendData TCPConnection {tcpSocket = sock, context = ctx} table_name maybe_block = do
  payload <- execWriterT $ do
    writeVarUInt Client._DATA
    writeBinaryStr table_name
    maybe emptyBlock (Block.writeBlockOutputStream ctx) maybe_block
  TCP.sendLazy sock (toLazyByteString payload)
  where
    emptyBlock :: Writer Builder
    emptyBlock = do
      Block.writeInfo Block.defaultBlockInfo
      -- presumably the column count and the row count — TODO confirm
      writeVarUInt 0
      writeVarUInt 0
-- | Send a 'Client._CANCEL' packet over the connection's socket.
sendCancel :: TCPConnection -> IO ()
sendCancel TCPConnection {tcpSocket = sock} =
  execWriterT (writeVarUInt Client._CANCEL)
    >>= TCP.sendLazy sock . toLazyByteString
-- | Run an insert: send the query, then stream the rows in chunks.
--
-- Steps, in order:
--
-- 1. Send the query and an empty data packet.
-- 2. Read packets from a fresh 2048-byte buffer, skipping 'MultiString' and
--    'Hello' packets, until a 'Block' packet (the sample block) arrives. Any
--    other packet aborts with 'error'.
-- 3. Split the rows into chunks of the configured @insert_block_size@,
--    transpose each chunk into columns, and send it as the sample block with
--    its @cdata@ replaced.
-- 4. Send a final empty data packet, refill the buffer and read one more
--    packet.
--
-- Always returns @"1"@. Calls 'error' when the connection has no server info
-- or no client settings.
processInsertQuery ::
  -- | Connection to insert through.
  TCPConnection ->
  -- | Insert statement without the row data.
  ByteString ->
  -- | Optional query id.
  Maybe ByteString ->
  -- | Rows to insert; each inner list is one row.
  [[ClickhouseType]] ->
  IO ByteString
processInsertQuery
  tcp@TCPConnection
    { tcpSocket = sock,
      context = Context {client_setting = settings}
    }
  query_without_data
  query_id
  items = do
    sendQuery tcp query_without_data query_id
    sendData tcp "" Nothing
    buf <- createBuffer 2048 sock
    let info = fromMaybe (error "empty server info") (getServerInfo tcp)
    (sample_block, _) <-
      runStateT (iterateWhile keepWaiting (receivePacket info)) buf
    case settings of
      Nothing -> error "empty client settings"
      Just ClientSetting {insert_block_size = blockSize} -> do
        forM_ (chunksOf (fromIntegral blockSize) items) $ \rows ->
          sendData tcp "" (Just (blockFor sample_block rows))
        sendData tcp "" Nothing
        buf2 <- refill buf
        -- NOTE(review): the packet read here is discarded, so whatever the
        -- server answers after the last block is not inspected — confirm
        -- that this is intended.
        _ <- runStateT (receivePacket info) buf2
        return "1"
    where
      -- True while the packet is one to skip before the sample block.
      keepWaiting :: Packet -> Bool
      keepWaiting = \case
        Block {} -> False
        MultiString _ -> True
        Hello -> True
        other -> error ("unexpected packet type: " ++ show other)

      -- Row-major chunk -> column-major vectors.
      columnsOf :: [[ClickhouseType]] -> V.Vector (V.Vector ClickhouseType)
      columnsOf rows = V.map V.fromList (V.fromList (List.transpose rows))

      -- The sample block with its data replaced by the given rows.
      blockFor :: Packet -> [[ClickhouseType]] -> Block
      blockFor sample rows = case sample of
        Block template@ColumnOrientedBlock {} ->
          template {cdata = columnsOf rows}
        other -> error ("unexpected packet type: " ++ show other)
-- | Read one data block from the input buffer.
--
-- When the server revision is at least
-- '_DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES', one string is read and
-- discarded first (presumably the temporary-table name — TODO confirm). The
-- block itself is then read with 'Block.readBlockInputStream'.
receiveData :: ServerInfo -> Reader Block.Block
receiveData info = skipLeadingString >> Block.readBlockInputStream info
  where
    skipLeadingString :: Reader ()
    skipLeadingString =
      when (revision info >= _DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) $
        () <$ readBinaryStr
-- | Collect server packets until the stream ends or an error packet is
-- received, then assemble them into a query result.
--
-- * If any 'ErrorMessage' packet was collected, the result is @Left@ with
--   all error strings concatenated.
-- * Otherwise the result is @Right@: the transposed data of every non-empty
--   data block, concatenated in arrival order, together with the given
--   'QueryInfo' updated by each progress / profile packet that was seen.
receiveResult ::
  ServerInfo ->
  QueryInfo ->
  Reader (Either String CKResult)
receiveResult info query_info = do
  packets <- packetGen
  -- A comprehension pattern is total: packets that are not 'ErrorMessage'
  -- are skipped instead of relying on a partial lambda after a filter.
  case [msg | ErrorMessage msg <- packets] of
    [] -> do
      let dataVectors =
            [ Database.ClickHouseDriver.Column.transpose d
              | Block {queryData = Block.ColumnOrientedBlock {cdata = d}} <- packets,
                -- Keep only blocks whose outer vector and first inner
                -- vector are both non-empty; the left operand guards the
                -- index.
                V.length d > 0 && V.length (d ! 0) > 0
            ]
          newQueryInfo = Prelude.foldl updateQueryInfo query_info packets
      return $ Right $ CKResult (V.concat dataVectors) newQueryInfo
    errs -> return $ Left $ Prelude.concat errs
  where
    -- Fold progress and profile packets into the running query info;
    -- every other packet leaves it untouched.
    updateQueryInfo :: QueryInfo -> Packet -> QueryInfo
    updateQueryInfo q (Progress prog) = storeProgress q prog
    updateQueryInfo q (StreamProfileInfo profile) = storeProfile q profile
    updateQueryInfo q _ = q

    -- Read packets one by one. 'EndOfStream' terminates the list and is not
    -- included; an 'ErrorMessage' terminates the list and is its last element.
    packetGen :: Reader [Packet]
    packetGen = do
      packet <- receivePacket info
      case packet of
        EndOfStream -> return []
        ErrorMessage _ -> return [packet]
        _ -> (packet :) <$> packetGen
-- | Read a single packet from the server.
--
-- A varint type tag is read first and selects how the rest of the packet is
-- decoded. An unrecognised tag closes the buffer's socket (if it has one)
-- and then raises an 'error' carrying the shown 'Error.ServerException'.
receivePacket :: ServerInfo -> Reader Packet
receivePacket info =
  readVarInt >>= \case
    0 -> return Hello
    1 -> dataPacket
    2 -> ErrorMessage . show <$> Error.readException Nothing
    3 -> Progress <$> readProgress (revision info)
    5 -> return EndOfStream
    6 -> StreamProfileInfo <$> readBlockStreamProfileInfo
    -- Tags 7 and 8 are decoded exactly like an ordinary data packet
    -- (presumably totals / extremes blocks -- TODO confirm against the
    -- server protocol definition).
    7 -> dataPacket
    8 -> dataPacket
    11 -> do
      first <- readBinaryStr
      second <- readBinaryStr
      return (MultiString (first, second))
    _ -> do
      -- Release the socket before aborting.
      closeBufferSocket
      error
        ( show
            Error.ServerException
              { code = Error._UNKNOWN_PACKET_FROM_SERVER,
                message = "Unknown packet from server",
                nested = Nothing
              }
        )
  where
    -- Shared decoder for every tag that carries a block.
    dataPacket = Block <$> receiveData info
-- | Close the socket held by the given TCP connection.
closeConnection :: TCPConnection -> IO ()
closeConnection = TCP.closeSock . tcpSocket
-- | Serialise the client-info section for the given server.
--
-- Calls 'error' when the server revision is below
-- '_DBMS_MIN_REVISION_WITH_CLIENT_INFO'. The quota key and the client patch
-- version are appended only when the server revision reaches the
-- corresponding minimum.
writeInfo ::
  (MonoidMap ByteString w) =>
  ClientInfo ->
  ServerInfo ->
  Writer w
writeInfo
  ( ClientInfo
      _client_name
      interface
      client_version_major
      client_version_minor
      client_version_patch
      client_revision
      initial_user
      initial_query_id
      initial_address
      quota_key
      _query_kind
    )
  ServerInfo {revision = server_revision, display_name = host_name} =
    if server_revision < _DBMS_MIN_REVISION_WITH_CLIENT_INFO
      then error "Method writeInfo is called for unsupported server revision"
      else do
        -- Leading value is the constant 1; the record's own query kind is
        -- not consulted here.
        writeVarUInt 1
        forM_ [initial_user, initial_query_id, initial_address] writeBinaryStr
        writeVarUInt interfaceCode
        -- An empty string is always written at this position.
        writeBinaryStr ""
        writeBinaryStr host_name
        -- The constant _CLIENT_NAME is written, not the record's client name.
        writeBinaryStr _CLIENT_NAME
        forM_
          [client_version_major, client_version_minor, client_revision]
          writeVarUInt
        when (server_revision >= _DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) $
          writeBinaryStr quota_key
        when (server_revision >= _DBMS_MIN_REVISION_WITH_VERSION_PATCH) $
          writeVarUInt client_version_patch
  where
    -- NOTE(review): HTTP is encoded as 0 and everything else as 1; verify
    -- these values against the server's interface enumeration.
    interfaceCode
      | interface == HTTP = 0
      | otherwise = 1
{-# INLINE closeBufferSocket #-}
-- | Close the socket stored in the current reader buffer, if there is one.
-- When the buffer has no socket this does nothing.
closeBufferSocket :: Reader ()
closeBufferSocket =
  get >>= \buf ->
    forM_ (Database.ClickHouseDriver.IO.BufferedReader.socket buf) TCP.closeSock