{-# LANGUAGE OverloadedStrings #-}

{- An encoder that understands the Pulsar protocol, as specified at: http://pulsar.apache.org/docs/en/develop-binary-protocol -}
module Pulsar.Protocol.Encoder
  ( encodeBaseCommand
  )
where

import qualified Data.Binary.Put               as B
import qualified Data.ByteString.Lazy.Char8    as CL
import           Data.Digest.CRC32C             ( crc32c )
import           Data.Int                       ( Int32 )
import           Data.Maybe                     ( fromMaybe )
import qualified Data.ProtoLens.Encoding       as PL
import           Proto.PulsarApi                ( BaseCommand
                                                , MessageMetadata
                                                )
import           Pulsar.Protocol.Frame

-- | Build the size-prefixed header part of a frame for a command.
--
-- The command is serialised with 'PL.encodeMessage' and stored (as a lazy
-- 'CL.ByteString') in 'frameMessage'. 'frameCommandSize' is the length of
-- that serialisation, and 'frameTotalSize' is that length plus @extraBytes@,
-- i.e. whatever additional bytes the caller says belong to the frame.
--
-- NOTE(review): the length is narrowed from 'Data.Int.Int64' to 'Int32' with
-- 'fromIntegral', which wraps silently for messages larger than an 'Int32'
-- can hold -- presumably unreachable given broker size limits; confirm.
mkSimpleCommand :: Int32 -> BaseCommand -> SimpleCmd
mkSimpleCommand extraBytes cmd = SimpleCommand
  { frameTotalSize   = cmdSize + extraBytes
  , frameCommandSize = cmdSize
  , frameMessage     = msg
  }
 where
  -- Serialised command, converted from strict to lazy bytes.
  msg :: CL.ByteString
  msg = CL.fromStrict $ PL.encodeMessage cmd

  -- Length in bytes of the serialised command.
  cmdSize :: Int32
  cmdSize = fromIntegral $ CL.length msg

-- | Build both halves of a payload frame: the size-prefixed command header
-- and the payload section (checksum, metadata size, metadata, payload).
--
-- The checksum is the CRC32C of the big-endian metadata size, followed by
-- the serialised metadata, followed by the payload bytes.
mkPayloadCommand
  :: BaseCommand -> MessageMetadata -> Payload -> (SimpleCmd, PayloadCmd)
mkPayloadCommand cmd meta (Payload pl) = (simpleCmd, payloadCmd)
 where
  -- payload fields
  -- Serialised metadata (strict bytes, as returned by 'PL.encodeMessage').
  metadata = PL.encodeMessage meta

  metaSize :: Int32
  metaSize = fromIntegral . CL.length . CL.fromStrict $ metadata

  -- Metadata size as a 4-byte big-endian integer.
  metaSizeBS :: CL.ByteString
  metaSizeBS = B.runPut . B.putInt32be $ metaSize

  -- 'crc32c' is applied to strict bytes, hence the 'CL.toStrict' conversions.
  checksum = crc32c $ CL.toStrict metaSizeBS <> metadata <> CL.toStrict pl

  payloadSize :: Int32
  payloadSize = fromIntegral . CL.length $ pl

  -- frame: extra 14 bytes = 2 (magic number) + 4 (checksum) + 4 (metadata size) + 4 (command size)
  -- plus the variable-length metadata and payload themselves.
  extraBytes :: Int32
  extraBytes = 14 + metaSize + payloadSize

  simpleCmd :: SimpleCmd
  simpleCmd = mkSimpleCommand extraBytes cmd

  payloadCmd :: PayloadCmd
  payloadCmd = PayloadCommand
    { frameCheckSum     = checksum
    , frameMetadataSize = metaSize
    , frameMetadata     = CL.fromStrict metadata
    , framePayload      = pl
    }

-- | Serialise a 'SimpleCmd' as: total size (4 bytes, big-endian), command
-- size (4 bytes, big-endian), then the already-serialised command bytes.
encodeSimpleCmd :: SimpleCmd -> CL.ByteString
encodeSimpleCmd (SimpleCommand ts cs msg) =
  let totalSize   = B.runPut $ B.putInt32be ts
      commandSize = B.runPut $ B.putInt32be cs
  in  totalSize <> commandSize <> msg

-- | Serialise a 'Frame' to its wire representation.
--
-- A 'SimpleFrame' is just the encoded 'SimpleCmd'. A 'PayloadFrame' is the
-- encoded 'SimpleCmd' followed by, in order: the magic number (2 bytes,
-- big-endian), the checksum (4 bytes, big-endian), the metadata size
-- (4 bytes, big-endian), the metadata bytes and the payload bytes.
encodeFrame :: Frame -> CL.ByteString
encodeFrame (SimpleFrame scmd) = encodeSimpleCmd scmd
encodeFrame (PayloadFrame scmd (PayloadCommand cs mds md p)) =
  let simpleCmd   = encodeSimpleCmd scmd
      metaSizeBS  = B.runPut . B.putInt32be $ mds
      magicNumber = B.runPut . B.putWord16be $ frameMagicNumber
      crc32cSum   = B.runPut . B.putWord32be $ cs
      payloadCmd  = magicNumber <> crc32cSum <> metaSizeBS <> md <> p
  in  simpleCmd <> payloadCmd

-- | Encode a command, optionally with message metadata and a payload, into
-- the bytes of a single frame.
--
-- * With metadata ('Just'), a payload frame is produced; a missing payload
--   is treated as an empty one.
-- * Without metadata ('Nothing'), a simple frame is produced and the payload
--   argument is ignored. The 4 extra bytes passed to 'mkSimpleCommand'
--   account for the command-size field that 'encodeSimpleCmd' writes.
encodeBaseCommand
  :: Maybe MessageMetadata -> Maybe Payload -> BaseCommand -> CL.ByteString
encodeBaseCommand (Just meta) p cmd =
  let pl = fromMaybe (Payload "") p
  in  encodeFrame . uncurry PayloadFrame $ mkPayloadCommand cmd meta pl
encodeBaseCommand Nothing _ cmd =
  encodeFrame . SimpleFrame . mkSimpleCommand 4 $ cmd