{- An encoder that understands the Pulsar protocol, as specified at: http://pulsar.apache.org/docs/en/develop-binary-protocol -}
module Pulsar.Protocol.Encoder
  ( encodeBaseCommand
  )
where

import qualified Data.Binary.Put               as B
import qualified Data.ByteString.Lazy.Char8    as CL
import           Data.Int                       ( Int32 )
import           Data.Maybe                     ( fromMaybe )
import qualified Data.ProtoLens.Encoding       as PL
import           Proto.PulsarApi                ( BaseCommand
                                                , MessageMetadata
                                                )
import           Pulsar.Protocol.CheckSum
import           Pulsar.Protocol.Frame

-- | Build the size-prefixed command part of a frame.
--
-- The command is serialised with proto-lens. 'frameCommandSize' is the length
-- of the serialised command and 'frameTotalSize' is that length plus
-- @extraBytes@, i.e. the number of bytes the caller accounts for in the frame
-- in addition to the command itself.
mkSimpleCommand :: Int32 -> BaseCommand -> SimpleCmd
mkSimpleCommand extraBytes cmd = SimpleCommand
  { frameTotalSize   = cmdSize + extraBytes
  , frameCommandSize = cmdSize
  , frameMessage     = msg
  }
  where
    -- proto-lens yields a strict ByteString; the frame stores a lazy one.
    msg     = CL.fromStrict (PL.encodeMessage cmd)
    -- CL.length returns an Int64, the frame fields are Int32.
    cmdSize = fromIntegral (CL.length msg)

-- | Build the two halves of a payload frame: the size-prefixed command and
-- the payload section (checksum, metadata size, metadata, payload).
--
-- The checksum is always populated ('Just'); it is computed over the encoded
-- metadata size, the serialised metadata and the payload, in that order.
mkPayloadCommand
  :: BaseCommand -> MessageMetadata -> Payload -> (SimpleCmd, PayloadCmd)
mkPayloadCommand cmd meta (Payload pl) = (simpleCmd, payloadCmd)
  where
    -- payload fields
    -- Convert the strict proto-lens output to a lazy ByteString once and
    -- reuse it for the size, the checksum and the frame field.
    metadata    = CL.fromStrict (PL.encodeMessage meta)
    metaSize    = fromIntegral (CL.length metadata)
    metaSizeBS  = B.runPut (B.putInt32be metaSize)
    checkSum    = computeCheckSum (metaSizeBS <> metadata <> pl)
    payloadSize = fromIntegral (CL.length pl)
    -- frame: extra 14 bytes = 2 (magic number) + 4 (checksum) + 4 (metadata size) + 4 (command size)
    extraBytes  = 14 + metaSize + payloadSize
    simpleCmd   = mkSimpleCommand extraBytes cmd
    payloadCmd  = PayloadCommand
      { frameCheckSum     = Just checkSum
      , frameMetadataSize = metaSize
      , frameMetadata     = metadata
      , framePayload      = pl
      }

-- | Serialise a simple command: the total size and the command size, each as
-- a big-endian 32-bit integer, followed by the already-encoded command bytes.
encodeSimpleCmd :: SimpleCmd -> CL.ByteString
encodeSimpleCmd (SimpleCommand ts cs msg) = sizePrefix <> msg
  where
    -- Both size fields are written back to back in a single Put.
    sizePrefix = B.runPut (B.putInt32be ts >> B.putInt32be cs)

-- | Serialise a frame to its wire representation.
--
-- A 'SimpleFrame' is just the encoded simple command. A 'PayloadFrame' is the
-- encoded simple command followed by the optional magic number and checksum,
-- the big-endian 32-bit metadata size, the metadata and the payload.
encodeFrame :: Frame -> CL.ByteString
encodeFrame (SimpleFrame scmd) = encodeSimpleCmd scmd
encodeFrame (PayloadFrame scmd (PayloadCommand cs mds md p)) = mconcat
  [ encodeSimpleCmd scmd
  , encodeOptionalFields cs
  , B.runPut (B.putInt32be mds)
  , md
  , p
  ]

-- | Encode the optional fields that precede the metadata in a payload frame.
--
-- If a magic number is present, a CRC32-C checksum of everything that comes
-- after it (4 bytes) should follow. With a checksum this emits the 16-bit
-- magic number followed by the 32-bit checksum, both big-endian; without one
-- it emits nothing.
encodeOptionalFields :: Maybe CheckSum -> CL.ByteString
encodeOptionalFields Nothing = CL.empty
encodeOptionalFields (Just (CheckSum cs)) =
  B.runPut (B.putWord16be frameMagicNumber >> B.putWord32be cs)

-- | Encode a command, with optional metadata and payload, into frame bytes.
--
-- When metadata is supplied a payload frame is produced; a missing payload is
-- treated as an empty one. Without metadata a simple frame is produced and
-- the payload argument is ignored.
encodeBaseCommand
  :: Maybe MessageMetadata -> Maybe Payload -> BaseCommand -> CL.ByteString
encodeBaseCommand (Just meta) p cmd = encodeFrame (PayloadFrame simpleCmd payloadCmd)
  where
    payload                 = fromMaybe (Payload CL.empty) p
    (simpleCmd, payloadCmd) = mkPayloadCommand cmd meta payload
encodeBaseCommand Nothing _ cmd =
  -- 4 extra bytes: the command-size field counted in the frame's total size.
  encodeFrame (SimpleFrame (mkSimpleCommand 4 cmd))