{-# LANGUAGE ForeignFunctionInterface #-}
{-# LANGUAGE OverloadedStrings #-}
module Database.ClickHouseDriver.IO.BufferedReader
( readBinaryStrWithLength,
readVarInt',
readBinaryStr',
readBinaryStr,
readVarInt,
readBinaryInt8,
readBinaryInt16,
readBinaryInt64,
readBinaryInt32,
readBinaryUInt8,
readBinaryUInt128,
readBinaryUInt64,
readBinaryUInt32,
readBinaryUInt16,
Reader,
Buffer(..),
createBuffer,
refill
)
where
import Control.Monad.State.Lazy ( StateT(StateT) )
import Data.Binary
( Word8, Word16, Word32, Word64, Binary, decode )
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString.Unsafe as UBS
import Data.DoubleWord (Word128 (..))
import Data.Int ( Int8, Int16, Int32, Int64 )
import Data.Maybe ( fromJust, isNothing )
import Foreign.C ( CString )
import qualified Network.Simple.TCP as TCP
import Network.Socket ( Socket )
data Buffer = Buffer {
Buffer -> Int
bufSize :: !Int,
Buffer -> ByteString
bytesData :: ByteString,
Buffer -> Maybe Socket
socket :: Maybe Socket
}
createBuffer :: Int->Socket->IO Buffer
createBuffer :: Int -> Socket -> IO Buffer
createBuffer Int
size Socket
sock = do
Maybe ByteString
receive <- Socket -> Int -> IO (Maybe ByteString)
forall (m :: * -> *).
MonadIO m =>
Socket -> Int -> m (Maybe ByteString)
TCP.recv Socket
sock Int
size
Buffer -> IO Buffer
forall (m :: * -> *) a. Monad m => a -> m a
return Buffer :: Int -> ByteString -> Maybe Socket -> Buffer
Buffer{
bufSize :: Int
bufSize = Int
size,
bytesData :: ByteString
bytesData = if Maybe ByteString -> Bool
forall a. Maybe a -> Bool
isNothing Maybe ByteString
receive then ByteString
"" else Maybe ByteString -> ByteString
forall a. HasCallStack => Maybe a -> a
fromJust Maybe ByteString
receive,
socket :: Maybe Socket
socket = Socket -> Maybe Socket
forall a. a -> Maybe a
Just Socket
sock
}
refill :: Buffer->IO Buffer
refill :: Buffer -> IO Buffer
refill Buffer{socket :: Buffer -> Maybe Socket
socket = Just Socket
sock, bufSize :: Buffer -> Int
bufSize = Int
size} = do
Maybe ByteString
newData' <- Socket -> Int -> IO (Maybe ByteString)
forall (m :: * -> *).
MonadIO m =>
Socket -> Int -> m (Maybe ByteString)
TCP.recv Socket
sock Int
size
let newBuffer :: Buffer
newBuffer = case Maybe ByteString
newData' of
Just ByteString
newData -> Buffer :: Int -> ByteString -> Maybe Socket -> Buffer
Buffer {
bufSize :: Int
bufSize = Int
size,
bytesData :: ByteString
bytesData = ByteString
newData,
socket :: Maybe Socket
socket = Socket -> Maybe Socket
forall a. a -> Maybe a
Just Socket
sock
}
Maybe ByteString
Nothing -> [Char] -> Buffer
forall a. HasCallStack => [Char] -> a
error [Char]
"Network error"
Buffer -> IO Buffer
forall (m :: * -> *) a. Monad m => a -> m a
return Buffer
newBuffer
refill Buffer{socket :: Buffer -> Maybe Socket
socket=Maybe Socket
Nothing} = [Char] -> IO Buffer
forall a. HasCallStack => [Char] -> a
error [Char]
"empty socket"
type Reader a = StateT Buffer IO a
readBinaryStrWithLength' :: Int
-> Buffer
-> IO (ByteString, Buffer)
readBinaryStrWithLength' :: Int -> Buffer -> IO (ByteString, Buffer)
readBinaryStrWithLength' Int
n buf :: Buffer
buf@Buffer{bufSize :: Buffer -> Int
bufSize=Int
size, bytesData :: Buffer -> ByteString
bytesData=ByteString
str, socket :: Buffer -> Maybe Socket
socket=Maybe Socket
sock} = do
let l :: Int
l = ByteString -> Int
BS.length ByteString
str
let (ByteString
part, ByteString
tail) = Int -> ByteString -> (ByteString, ByteString)
BS.splitAt Int
n ByteString
str
if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
l
then do
Buffer
newbuff <- Buffer -> IO Buffer
refill Buffer
buf
(ByteString
unread, Buffer
altbuff) <- Int -> Buffer -> IO (ByteString, Buffer)
readBinaryStrWithLength' (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
l) Buffer
newbuff
(ByteString, Buffer) -> IO (ByteString, Buffer)
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString
part ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> ByteString
unread, Buffer
altbuff)
else do
(ByteString, Buffer) -> IO (ByteString, Buffer)
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString
part, Int -> ByteString -> Maybe Socket -> Buffer
Buffer Int
size ByteString
tail Maybe Socket
sock)
readVarInt' :: Buffer
-> IO (Word, Buffer)
readVarInt' :: Buffer -> IO (Word, Buffer)
readVarInt' buf :: Buffer
buf@Buffer{bufSize :: Buffer -> Int
bufSize=Int
size,bytesData :: Buffer -> ByteString
bytesData=ByteString
str, socket :: Buffer -> Maybe Socket
socket=Maybe Socket
sock} = do
let l :: Word
l = Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Word) -> Int -> Word
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
BS.length ByteString
str
Word
skip <- ByteString -> (CString -> IO Word) -> IO Word
forall a. ByteString -> (CString -> IO a) -> IO a
UBS.unsafeUseAsCString ByteString
str (\CString
x -> CString -> Word -> IO Word
c_count CString
x Word
l)
if Word
skip Word -> Word -> Bool
forall a. Eq a => a -> a -> Bool
== Word
0
then do
Word
varint' <- ByteString -> (CString -> IO Word) -> IO Word
forall a. ByteString -> (CString -> IO a) -> IO a
UBS.unsafeUseAsCString ByteString
str (\CString
x->Word -> CString -> Word -> IO Word
c_read_varint Word
0 CString
x Word
l)
Buffer
new_buf <- Buffer -> IO Buffer
refill Buffer
buf
let new_str :: ByteString
new_str = Buffer -> ByteString
bytesData Buffer
new_buf
Word
varint <- ByteString -> (CString -> IO Word) -> IO Word
forall a. ByteString -> (CString -> IO a) -> IO a
UBS.unsafeUseAsCString ByteString
new_str (\CString
x->Word -> CString -> Word -> IO Word
c_read_varint Word
varint' CString
x Word
l)
Word
skip2 <- ByteString -> (CString -> IO Word) -> IO Word
forall a. ByteString -> (CString -> IO a) -> IO a
UBS.unsafeUseAsCString ByteString
new_str (\CString
x->CString -> Word -> IO Word
c_count CString
x Word
l)
let tail :: ByteString
tail = Int -> ByteString -> ByteString
BS.drop (Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
skip) ByteString
new_str
(Word, Buffer) -> IO (Word, Buffer)
forall (m :: * -> *) a. Monad m => a -> m a
return (Word
varint, Int -> ByteString -> Maybe Socket -> Buffer
Buffer Int
size ByteString
tail Maybe Socket
sock)
else do
Word
varint <- ByteString -> (CString -> IO Word) -> IO Word
forall a. ByteString -> (CString -> IO a) -> IO a
UBS.unsafeUseAsCString ByteString
str (\CString
x -> Word -> CString -> Word -> IO Word
c_read_varint Word
0 CString
x Word
l)
let tail :: ByteString
tail = Int -> ByteString -> ByteString
BS.drop (Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
skip) ByteString
str
(Word, Buffer) -> IO (Word, Buffer)
forall (m :: * -> *) a. Monad m => a -> m a
return (Word
varint, Int -> ByteString -> Maybe Socket -> Buffer
Buffer Int
size ByteString
tail Maybe Socket
sock)
readBinaryStr' :: Buffer
-> IO (ByteString, Buffer)
readBinaryStr' :: Buffer -> IO (ByteString, Buffer)
readBinaryStr' Buffer
str = do
(Word
len, Buffer
tail) <- Buffer -> IO (Word, Buffer)
readVarInt' Buffer
str
(ByteString
head, Buffer
tail') <- Int -> Buffer -> IO (ByteString, Buffer)
readBinaryStrWithLength' (Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
len) Buffer
tail
(ByteString, Buffer) -> IO (ByteString, Buffer)
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString
head, Buffer
tail')
readBinaryHelper :: Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper :: Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
fmt Buffer
str = do
(ByteString
cut, Buffer
tail) <- Int -> Buffer -> IO (ByteString, Buffer)
readBinaryStrWithLength' Int
fmt Buffer
str
let v :: a
v = ByteString -> a
forall a. Binary a => ByteString -> a
decode ((ByteString -> ByteString
L.fromStrict(ByteString -> ByteString)
-> (ByteString -> ByteString) -> ByteString -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BS.reverse) ByteString
cut)
(a, Buffer) -> IO (a, Buffer)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
v, Buffer
tail)
class Readable a where
readIn :: Reader a
instance Readable Word where
readIn :: Reader Word
readIn = (Buffer -> IO (Word, Buffer)) -> Reader Word
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT Buffer -> IO (Word, Buffer)
readVarInt'
instance Readable ByteString where
readIn :: Reader ByteString
readIn = (Buffer -> IO (ByteString, Buffer)) -> Reader ByteString
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT Buffer -> IO (ByteString, Buffer)
readBinaryStr'
instance Readable Int8 where
readIn :: Reader Int8
readIn = (Buffer -> IO (Int8, Buffer)) -> Reader Int8
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT ((Buffer -> IO (Int8, Buffer)) -> Reader Int8)
-> (Buffer -> IO (Int8, Buffer)) -> Reader Int8
forall a b. (a -> b) -> a -> b
$ Int -> Buffer -> IO (Int8, Buffer)
forall a. Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
1
instance Readable Int16 where
readIn :: Reader Int16
readIn = (Buffer -> IO (Int16, Buffer)) -> Reader Int16
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT ((Buffer -> IO (Int16, Buffer)) -> Reader Int16)
-> (Buffer -> IO (Int16, Buffer)) -> Reader Int16
forall a b. (a -> b) -> a -> b
$ Int -> Buffer -> IO (Int16, Buffer)
forall a. Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
2
instance Readable Int32 where
readIn :: Reader Int32
readIn = (Buffer -> IO (Int32, Buffer)) -> Reader Int32
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT ((Buffer -> IO (Int32, Buffer)) -> Reader Int32)
-> (Buffer -> IO (Int32, Buffer)) -> Reader Int32
forall a b. (a -> b) -> a -> b
$ Int -> Buffer -> IO (Int32, Buffer)
forall a. Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
4
instance Readable Int64 where
readIn :: Reader Int64
readIn = (Buffer -> IO (Int64, Buffer)) -> Reader Int64
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT ((Buffer -> IO (Int64, Buffer)) -> Reader Int64)
-> (Buffer -> IO (Int64, Buffer)) -> Reader Int64
forall a b. (a -> b) -> a -> b
$ Int -> Buffer -> IO (Int64, Buffer)
forall a. Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
8
instance Readable Word8 where
readIn :: Reader Word8
readIn = (Buffer -> IO (Word8, Buffer)) -> Reader Word8
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT ((Buffer -> IO (Word8, Buffer)) -> Reader Word8)
-> (Buffer -> IO (Word8, Buffer)) -> Reader Word8
forall a b. (a -> b) -> a -> b
$ Int -> Buffer -> IO (Word8, Buffer)
forall a. Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
1
instance Readable Word16 where
readIn :: Reader Word16
readIn = (Buffer -> IO (Word16, Buffer)) -> Reader Word16
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT ((Buffer -> IO (Word16, Buffer)) -> Reader Word16)
-> (Buffer -> IO (Word16, Buffer)) -> Reader Word16
forall a b. (a -> b) -> a -> b
$ Int -> Buffer -> IO (Word16, Buffer)
forall a. Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
2
instance Readable Word32 where
readIn :: Reader Word32
readIn = (Buffer -> IO (Word32, Buffer)) -> Reader Word32
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT ((Buffer -> IO (Word32, Buffer)) -> Reader Word32)
-> (Buffer -> IO (Word32, Buffer)) -> Reader Word32
forall a b. (a -> b) -> a -> b
$ Int -> Buffer -> IO (Word32, Buffer)
forall a. Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
4
instance Readable Word64 where
readIn :: Reader Word64
readIn = (Buffer -> IO (Word64, Buffer)) -> Reader Word64
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT ((Buffer -> IO (Word64, Buffer)) -> Reader Word64)
-> (Buffer -> IO (Word64, Buffer)) -> Reader Word64
forall a b. (a -> b) -> a -> b
$ Int -> Buffer -> IO (Word64, Buffer)
forall a. Binary a => Int -> Buffer -> IO (a, Buffer)
readBinaryHelper Int
8
readVarInt :: Reader Word
readVarInt :: Reader Word
readVarInt = Reader Word
forall a. Readable a => Reader a
readIn
readBinaryStrWithLength :: Int->Reader ByteString
readBinaryStrWithLength :: Int -> Reader ByteString
readBinaryStrWithLength Int
n = (Buffer -> IO (ByteString, Buffer)) -> Reader ByteString
forall s (m :: * -> *) a. (s -> m (a, s)) -> StateT s m a
StateT (Int -> Buffer -> IO (ByteString, Buffer)
readBinaryStrWithLength' (Int -> Buffer -> IO (ByteString, Buffer))
-> Int -> Buffer -> IO (ByteString, Buffer)
forall a b. (a -> b) -> a -> b
$ Int -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)
readBinaryStr :: Reader ByteString
readBinaryStr :: Reader ByteString
readBinaryStr = Reader ByteString
forall a. Readable a => Reader a
readIn
readBinaryInt8 :: Reader Int8
readBinaryInt8 :: Reader Int8
readBinaryInt8 = Reader Int8
forall a. Readable a => Reader a
readIn
readBinaryInt16 :: Reader Int16
readBinaryInt16 :: Reader Int16
readBinaryInt16 = Reader Int16
forall a. Readable a => Reader a
readIn
readBinaryInt32 :: Reader Int32
readBinaryInt32 :: Reader Int32
readBinaryInt32 = Reader Int32
forall a. Readable a => Reader a
readIn
readBinaryInt64 :: Reader Int64
readBinaryInt64 :: Reader Int64
readBinaryInt64 = Reader Int64
forall a. Readable a => Reader a
readIn
readBinaryUInt32 :: Reader Word32
readBinaryUInt32 :: Reader Word32
readBinaryUInt32 = Reader Word32
forall a. Readable a => Reader a
readIn
readBinaryUInt8 :: Reader Word8
readBinaryUInt8 :: Reader Word8
readBinaryUInt8 = Reader Word8
forall a. Readable a => Reader a
readIn
readBinaryUInt16 :: Reader Word16
readBinaryUInt16 :: Reader Word16
readBinaryUInt16 = Reader Word16
forall a. Readable a => Reader a
readIn
readBinaryUInt64 :: Reader Word64
readBinaryUInt64 :: Reader Word64
readBinaryUInt64 = Reader Word64
forall a. Readable a => Reader a
readIn
readBinaryUInt128 :: Reader Word128
readBinaryUInt128 :: Reader Word128
readBinaryUInt128 = do
Word64
hi <- Reader Word64
readBinaryUInt64
Word64
lo <- Reader Word64
readBinaryUInt64
Word128 -> Reader Word128
forall (m :: * -> *) a. Monad m => a -> m a
return (Word128 -> Reader Word128) -> Word128 -> Reader Word128
forall a b. (a -> b) -> a -> b
$ Word64 -> Word64 -> Word128
Word128 Word64
hi Word64
lo
foreign import ccall unsafe "varuint.h read_varint" c_read_varint :: Word->CString -> Word -> IO Word
foreign import ccall unsafe "varuint.h count_read" c_count :: CString -> Word -> IO Word