Safe Haskell | None |
---|---|
Language | Haskell2010 |
The CQL native protocol is a binary frame-based protocol where
each frame has a Header, a Length and a body. The protocol
distinguishes Requests and Responses.
Some usage examples:
Constructing and Serialising a Request
let q = QueryString "select peer from system.peers where data_center = ? and rack = ?"
    p = QueryParams One False ("uk-1", "r-0") Nothing Nothing Nothing
    r = RqQuery (Query q p :: Query R (Text, Text) (Identity IP))
    i = mkStreamId 0
in pack V3 noCompression False i r
Deserialising a Response
-- assuming bh contains the raw header byte string and bb the raw
-- body byte string.
case header V3 bh of
    Left e -> ...
    Right h -> unpack noCompression h bb
A generic query processing function
query :: (Tuple a, Tuple b) => Version -> Socket -> QueryString k a b -> QueryParams a -> IO (Response k a b)
query v s q p = do
    let i = mkStreamId 0
    sendToServer s (pack v noCompression False i (RqQuery (Query q p)))
    b <- recv (if v == V3 then 9 else 8) s
    h <- either (throwIO . MyException) return (header v b)
    when (headerType h == RqHeader) $
        throwIO UnexpectedRequestHeader
    let len = lengthRepr (bodyLength h)
    x <- recv (fromIntegral len) s
    case unpack noCompression h x of
        Left e -> throwIO $ AnotherException e
        Right (RsError _ e) -> throwIO e
        Right response -> return response
- class Cql a where
- newtype Keyspace = Keyspace {
- unKeyspace :: Text
- newtype Table = Table {}
- newtype PagingState = PagingState {}
- newtype QueryId k a b = QueryId {}
- newtype QueryString k a b = QueryString {}
- data Version
- data CqlVersion
- = Cqlv300
- | CqlVersion !Text
- data CompressionAlgorithm
- data Compression = Compression {
- algorithm :: !CompressionAlgorithm
- shrink :: ByteString -> Maybe ByteString
- expand :: ByteString -> Maybe ByteString
- noCompression :: Compression
- data Consistency
- = Any
- | One
- | Two
- | Three
- | Quorum
- | All
- | LocalQuorum
- | EachQuorum
- | Serial
- | LocalOne
- | LocalSerial
- data OpCode
- data ColumnType
- = CustomColumn !Text
- | AsciiColumn
- | BigIntColumn
- | BlobColumn
- | BooleanColumn
- | CounterColumn
- | DecimalColumn
- | DoubleColumn
- | FloatColumn
- | IntColumn
- | TextColumn
- | TimestampColumn
- | UuidColumn
- | VarCharColumn
- | VarIntColumn
- | TimeUuidColumn
- | InetColumn
- | MaybeColumn !ColumnType
- | ListColumn !ColumnType
- | SetColumn !ColumnType
- | MapColumn !ColumnType !ColumnType
- | TupleColumn [ColumnType]
- | UdtColumn !Keyspace !Text [(Text, ColumnType)]
- newtype Ascii = Ascii {}
- newtype Blob = Blob {}
- newtype Counter = Counter {
- fromCounter :: Int64
- newtype TimeUuid = TimeUuid {
- fromTimeUuid :: UUID
- newtype Set a = Set {
- fromSet :: [a]
- newtype Map a b = Map {
- fromMap :: [(a, b)]
- data Value
- = CqlCustom !ByteString
- | CqlBoolean !Bool
- | CqlInt !Int32
- | CqlBigInt !Int64
- | CqlVarInt !Integer
- | CqlFloat !Float
- | CqlDecimal !Decimal
- | CqlDouble !Double
- | CqlText !Text
- | CqlInet !IP
- | CqlUuid !UUID
- | CqlTimestamp !Int64
- | CqlAscii !Text
- | CqlBlob !ByteString
- | CqlCounter !Int64
- | CqlTimeUuid !UUID
- | CqlMaybe (Maybe Value)
- | CqlList [Value]
- | CqlSet [Value]
- | CqlMap [(Value, Value)]
- | CqlTuple [Value]
- | CqlUdt [(Text, Value)]
- newtype Tagged a b = Tagged {
- untag :: b
- retag :: Tagged a c -> Tagged b c
- data R
- data W
- data S
- data Header = Header {
- headerType :: !HeaderType
- version :: !Version
- flags :: !Flags
- streamId :: !StreamId
- opCode :: !OpCode
- bodyLength :: !Length
- data HeaderType
- header :: Version -> ByteString -> Either String Header
- newtype Length = Length {
- lengthRepr :: Int32
- data StreamId
- mkStreamId :: Integral i => i -> StreamId
- fromStreamId :: StreamId -> Int
- data Flags
- compress :: Flags
- tracing :: Flags
- isSet :: Flags -> Flags -> Bool
- data Request k a b
- getOpCode :: Request k a b -> OpCode
- pack :: Tuple a => Version -> Compression -> Bool -> StreamId -> Request k a b -> Either String ByteString
- data Options = Options
- data Startup = Startup !CqlVersion !CompressionAlgorithm
- newtype AuthResponse = AuthResponse ByteString
- newtype Register = Register [EventType]
- data EventType
- data Query k a b = Query !(QueryString k a b) !(QueryParams a)
- data QueryParams a = QueryParams {}
- data SerialConsistency
- data Batch = Batch {}
- data BatchQuery where
- BatchQuery :: (Show a, Tuple a, Tuple b) => !(QueryString W a b) -> !a -> BatchQuery
- BatchPrepared :: (Show a, Tuple a, Tuple b) => !(QueryId W a b) -> !a -> BatchQuery
- data BatchType
- newtype Prepare k a b = Prepare (QueryString k a b)
- data Execute k a b = Execute !(QueryId k a b) !(QueryParams a)
- data Response k a b
- = RsError (Maybe UUID) !Error
- | RsReady (Maybe UUID) !Ready
- | RsAuthenticate (Maybe UUID) !Authenticate
- | RsAuthChallenge (Maybe UUID) !AuthChallenge
- | RsAuthSuccess (Maybe UUID) !AuthSuccess
- | RsSupported (Maybe UUID) !Supported
- | RsResult (Maybe UUID) !(Result k a b)
- | RsEvent (Maybe UUID) !Event
- unpack :: (Tuple a, Tuple b) => Compression -> Header -> ByteString -> Either String (Response k a b)
- data Ready = Ready
- newtype Authenticate = Authenticate Text
- newtype AuthChallenge = AuthChallenge (Maybe ByteString)
- newtype AuthSuccess = AuthSuccess (Maybe ByteString)
- data Result k a b
- = VoidResult
- | RowsResult !MetaData [b]
- | SetKeyspaceResult !Keyspace
- | PreparedResult !(QueryId k a b) !MetaData !MetaData
- | SchemaChangeResult !SchemaChange
- data MetaData = MetaData {
- columnCount :: !Int32
- pagingState :: Maybe PagingState
- columnSpecs :: [ColumnSpec]
- data ColumnSpec = ColumnSpec {
- keyspace :: !Keyspace
- table :: !Table
- columnName :: !Text
- columnType :: !ColumnType
- data Supported = Supported [CompressionAlgorithm] [CqlVersion]
- data Event
- data TopologyChange
- data SchemaChange
- data StatusChange
- data Change
- data Error
- = AlreadyExists !Text !Keyspace !Table
- | BadCredentials !Text
- | ConfigError !Text
- | Invalid !Text
- | IsBootstrapping !Text
- | Overloaded !Text
- | ProtocolError !Text
- | ServerError !Text
- | SyntaxError !Text
- | TruncateError !Text
- | Unauthorized !Text
- | Unprepared !Text !ByteString
- | Unavailable { }
- | ReadTimeout { }
- | WriteTimeout { }
- data WriteType
- class PrivateTuple a => Tuple a
- count :: PrivateTuple a => Tagged a Int
- check :: PrivateTuple a => Tagged a ([ColumnType] -> [ColumnType])
- tuple :: PrivateTuple a => Version -> [ColumnType] -> Get a
- store :: PrivateTuple a => Version -> Putter a
- data Row
- mkRow :: [(Value, ColumnType)] -> Row
- fromRow :: Cql a => Int -> Row -> Either String a
- columnTypes :: Row -> [ColumnType]
- rowLength :: Row -> Int
- class Record a where
- type family TupleType a
- recordInstance :: Name -> Q [Dec]
Cql type-class
A type that can be converted from and to some CQL Value.
This type-class is used to map custom types to Cassandra data-types. For example:
newtype MyType = MyType Int32 instance Cql MyType where ctype = Tagged IntColumn toCql (MyType i) = CqlInt i fromCql (CqlInt i) = Right (MyType i) fromCql _ = Left "Expected CqlInt"
ctype :: Tagged a ColumnType Source
the column-type of a
map a
to some CQL data-type
fromCql :: Value -> Either String a Source
map a CQL value back to a
Cql Bool Source | |
Cql Double Source | |
Cql Float Source | |
Cql Int32 Source | |
Cql Int64 Source | |
Cql Integer Source | |
Cql Decimal Source | |
Cql IP Source | |
Cql Text Source | |
Cql UTCTime Source | |
Cql UUID Source | |
Cql TimeUuid Source | |
Cql Counter Source | |
Cql Blob Source | |
Cql Ascii Source | |
Cql a => Cql [a] Source | |
Cql a => Cql (Maybe a) Source | Please note that due to the fact that Cassandra internally represents
empty collection type values (i.e. lists, maps and sets) as null, we can not distinguish Just [] from Nothing on response decoding. |
Cql a => Cql (Set a) Source | |
(Cql a, Cql b) => Cql (Map a b) Source |
Basic type definitions
newtype PagingState Source
Opaque token passed to the server to continue result paging.
ID representing a prepared query.
newtype QueryString k a b Source
Eq (QueryString k a b) Source | |
Show (QueryString k a b) Source | |
IsString (QueryString k a b) Source |
CQL binary protocol version.
data CqlVersion Source
The CQL version (not the binary protocol version).
data Compression Source
Compression | |
|
data Consistency Source
Consistency level.
An opcode is a tag to distinguish protocol frame bodies.
data ColumnType Source
The type of a single CQL column.
A CQL value. The various constructors correspond to CQL data-types for individual columns in Cassandra.
CqlCustom !ByteString | |
CqlBoolean !Bool | |
CqlInt !Int32 | |
CqlBigInt !Int64 | |
CqlVarInt !Integer | |
CqlFloat !Float | |
CqlDecimal !Decimal | |
CqlDouble !Double | |
CqlText !Text | |
CqlInet !IP | |
CqlUuid !UUID | |
CqlTimestamp !Int64 | |
CqlAscii !Text | |
CqlBlob !ByteString | |
CqlCounter !Int64 | |
CqlTimeUuid !UUID | |
CqlMaybe (Maybe Value) | |
CqlList [Value] | |
CqlSet [Value] | |
CqlMap [(Value, Value)] | |
CqlTuple [Value] | binary protocol version >= 3 |
CqlUdt [(Text, Value)] | binary protocol version >= 3 |
Header
Protocol frame header.
Header | |
|
data HeaderType Source
header :: Version -> ByteString -> Either String Header Source
Deserialise a frame header using the version specific decoding format.
Length
The type denoting a protocol frame length.
StreamId
Streams allow multiplexing of requests over a single communication
channel. The StreamId correlates Requests with Responses.
mkStreamId :: Integral i => i -> StreamId Source
fromStreamId :: StreamId -> Int Source
Convert the stream ID to an integer.
Flags
Type representing header flags. Flags form a monoid and can be used
as in compress <> tracing <> mempty.
Tracing flag. If a request supports tracing and the tracing flag was set, the response to this request will have the tracing flag set and contain tracing information.
Request
The type corresponding to the protocol request frame.
The type parameter k denotes the kind of request. It is present to allow
distinguishing read operations from write operations. Use R for read,
W for write and S for schema related operations.
a represents the argument type and b the return type of this request.
:: Tuple a | |
=> Version | protocol version, which determines the encoding |
-> Compression | compression to use |
-> Bool | enable/disable tracing |
-> StreamId | the stream Id to use |
-> Request k a b | the actual request to serialise |
-> Either String ByteString |
Options
An options request, sent prior to Startup to request the server's
startup options.
Startup
A startup request which is used when initialising a connection to the server. It specifies the CQL version to use and optionally the compression algorithm.
Auth Response
newtype AuthResponse Source
A request sent in response to a previous authentication challenge.
Register
Registers the connection this request is made through to receive server events.
Event types to register.
TopologyChangeEvent | events related to change in the cluster topology |
StatusChangeEvent | events related to change of node status |
SchemaChangeEvent | events related to schema change |
Query
A CQL query (select, insert, etc.).
Query !(QueryString k a b) !(QueryParams a) |
data QueryParams a Source
Query parameters.
QueryParams | |
|
Show a => Show (QueryParams a) Source |
data SerialConsistency Source
Consistency level for the serial phase of conditional updates.
Batch
Allows executing a list of queries (prepared or not) as a batch.
data BatchQuery where Source
A GADT to unify queries and prepared queries both of which can be used in batch requests.
BatchQuery :: (Show a, Tuple a, Tuple b) => !(QueryString W a b) -> !a -> BatchQuery | |
BatchPrepared :: (Show a, Tuple a, Tuple b) => !(QueryId W a b) -> !a -> BatchQuery |
BatchLogged | default, uses a batch log for atomic application |
BatchUnLogged | skip the batch log |
BatchCounter | used for batched counter updates |
Prepare
Prepare a query for later execution (cf. Execute).
Prepare (QueryString k a b) |
Execute
Executes a prepared query.
Execute !(QueryId k a b) !(QueryParams a) |
Response
The type corresponding to the protocol response frame.
The type parameter k denotes the kind of response. It is present to allow
distinguishing read operations from write operations. Use R for read,
W for write and S for schema related operations.
a represents the argument type and b the return type of this
response.
RsError (Maybe UUID) !Error | |
RsReady (Maybe UUID) !Ready | |
RsAuthenticate (Maybe UUID) !Authenticate | |
RsAuthChallenge (Maybe UUID) !AuthChallenge | |
RsAuthSuccess (Maybe UUID) !AuthSuccess | |
RsSupported (Maybe UUID) !Supported | |
RsResult (Maybe UUID) !(Result k a b) | |
RsEvent (Maybe UUID) !Event |
unpack :: (Tuple a, Tuple b) => Compression -> Header -> ByteString -> Either String (Response k a b) Source
Deserialise a Response from the given ByteString.
Ready
The server is ready to process queries. Response of a Startup request.
Authenticate
newtype Authenticate Source
The server requires authentication.
newtype AuthChallenge Source
A server-sent authentication challenge.
newtype AuthSuccess Source
Indicates the success of an authentication phase.
Result
Query response.
Part of a RowsResult. Describes the result set.
MetaData | |
|
data ColumnSpec Source
The column specification. Part of MetaData unless skipMetaData in
QueryParams was True.
ColumnSpec | |
|
Supported
The startup options supported by the server. Response of an Options request.
Event
Messages sent by the server without request, if the connection has
been Registered to receive such events.
data TopologyChange Source
data SchemaChange Source
data StatusChange Source
Error
Error response.
Row, Tuple and Record
check :: PrivateTuple a => Tagged a ([ColumnType] -> [ColumnType]) Source
tuple :: PrivateTuple a => Version -> [ColumnType] -> Get a Source
mkRow :: [(Value, ColumnType)] -> Row Source
columnTypes :: Row -> [ColumnType] Source
Record/Tuple conversion. For example:
data Peer = Peer { peerAddr :: IP , peerRPC :: IP , peerDC :: Text , peerRack :: Text } deriving Show recordInstance ''Peer map asRecord <$> performQuery "SELECT peer, rpc_address, data_center, rack FROM system.peers"
The generated type-class instance maps between record and tuple constructors:
type instance TupleType Peer = (IP, IP, Text, Text) instance Record Peer where asTuple (Peer a b c d) = (a, b, c, d) asRecord (a, b, c, d) = Peer a b c d
recordInstance :: Name -> Q [Dec] Source