{-# LANGUAGE FlexibleInstances, LambdaCase, OverloadedStrings #-}

{- A decoder that understands the Pulsar protocol, as specified at: http://pulsar.apache.org/docs/en/develop-binary-protocol -}
module Pulsar.Protocol.Decoder
  ( decodeBaseCommand
  , dropPayloadGarbage
  )
where

import           Control.Monad                  ( unless )
import qualified Data.Binary.Get               as B
import qualified Data.Binary.Put               as B
import qualified Data.ByteString.Lazy.Char8    as CL
import           Data.Digest.CRC32C             ( crc32c )
import           Data.Bifunctor                 ( bimap )
import           Data.Int                       ( Int32 )
import qualified Data.ProtoLens.Encoding       as PL
import           Pulsar.Protocol.Frame

{-
 - Strip the 8-byte prefix that the Java client prepends to a payload.
 -
 - The prefix is recognised by its first 5 bytes (NUL NUL NUL EOT CAN); when they are
 - present, those 5 bytes and the 3 bytes that follow are removed. A payload that does
 - not start with the marker is returned unchanged.
 -
 - Apparently that's how Google's FlatBuffers serialize data: https://google.github.io/flatbuffers/
 -
 - Source: https://github.com/apache/pulsar/blob/master/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java#L22
 -
 - More info on the ASCII spec: https://www.december.com/html/spec/ascii.html. Maybe this could be helpful: https://hackage.haskell.org/package/flatbuffers
 -}
dropPayloadGarbage :: CL.ByteString -> CL.ByteString
dropPayloadGarbage bs = case CL.stripPrefix garbagePrefix bs of
  -- Marker found: discard the remaining 3 bytes of the 8-byte prefix.
  Just rest -> CL.drop 3 rest
  -- No marker: the payload is passed through untouched.
  Nothing   -> bs
 where
  garbagePrefix = CL.pack "\NUL\NUL\NUL\EOT\CAN"

-- | Parse one frame from the input.
--
-- Reads two big-endian 32-bit integers (total size, then command size) followed by
-- command-size bytes of command data. If the input is exhausted at that point the
-- result is a 'SimpleFrame'; otherwise the rest is handed to 'parsePayload'.
parseFrame :: B.Get Frame
parseFrame = do
  totalSize   <- B.getInt32be
  commandSize <- B.getInt32be
  message     <- B.getLazyByteString (fromIntegral commandSize)
  let simpleCmd = SimpleCommand totalSize commandSize message
  -- Anything left after the command means the frame also carries a payload section.
  atEnd <- B.isEmpty
  if atEnd
    then return (SimpleFrame simpleCmd)
    else parsePayload totalSize commandSize simpleCmd

-- | Verify the CRC32-C checksum carried by a payload frame.
--
-- The checksum is computed over the big-endian metadata size, the metadata bytes and
-- the raw payload bytes (in that order) and compared with the checksum stored in the
-- frame. On success the frame is returned with its payload passed through
-- 'dropPayloadGarbage'; on mismatch the parser fails with @"Invalid checksum"@.
-- Frames that are not payload frames are returned as they are.
validateCheckSum :: Frame -> B.Get Frame
validateCheckSum (PayloadFrame sc (PayloadCommand cs ms md pl))
  | computed == cs = return $! cleaned
  | otherwise      = fail "Invalid checksum"
 where
  -- The metadata size is re-serialised because it is part of the checksummed bytes.
  sizeBytes = CL.toStrict (B.runPut (B.putInt32be ms))
  computed  = crc32c (sizeBytes <> CL.toStrict md <> CL.toStrict pl)
  -- The checksum covers the raw payload, so the prefix is stripped only afterwards.
  cleaned   = PayloadFrame sc (PayloadCommand cs ms md (dropPayloadGarbage pl))
validateCheckSum x = return $! x

-- | Parse the payload section that follows the command of a frame.
--
-- Arguments are the total size and command size already read by the caller, and the
-- command parsed so far. Reads the 16-bit magic number (failing with
-- @"Invalid magic number: ..."@ if it differs from 'frameMagicNumber'), the 32-bit
-- checksum, the 32-bit metadata size, the metadata bytes and finally the payload
-- bytes. The assembled frame is then checked by 'validateCheckSum'.
parsePayload :: Int32 -> Int32 -> SimpleCmd -> B.Get Frame
parsePayload ts cs simpleCmd = do
  magic <- B.getWord16be
  unless (magic == frameMagicNumber)
    $ fail ("Invalid magic number: " <> show magic)
  checksum <- B.getWord32be
  metaSize <- B.getInt32be
  metadata <- B.getLazyByteString (fromIntegral metaSize)
  -- 14 remaining bytes = 4 (command size field) + 2 (magic number) + 4 (checksum) + 4 (metadata size field)
  body <- readPayload (ts - (14 + cs + metaSize))
  validateCheckSum
    (PayloadFrame simpleCmd (PayloadCommand checksum metaSize metadata body))
 where
  -- A non-positive remainder means there is no payload to read.
  readPayload remaining
    | remaining > 0 = B.getLazyByteString (fromIntegral remaining)
    | otherwise     = pure CL.empty

-- | Run 'parseFrame' over the given bytes.
--
-- Returns the parsed frame on success or the parser's error message on failure.
-- The unconsumed input and byte offset reported by 'B.runGetOrFail' are discarded.
decodeFrame :: CL.ByteString -> Either String Frame
decodeFrame = bimap third third . B.runGetOrFail parseFrame
 where
  -- Both sides of the result are triples; only the last component is of interest.
  third (_, _, x) = x

-- | Decode raw bytes into a 'Response'.
--
-- The bytes are first parsed into a frame with 'decodeFrame'. A simple frame yields a
-- 'SimpleResponse' holding the decoded command. A payload frame yields a
-- 'PayloadResponse' holding the decoded command, the decoded message metadata and
-- the payload, which is 'Nothing' when the payload bytes are empty. Any framing or
-- protobuf decoding failure is reported as 'Left' with the error message.
decodeBaseCommand :: CL.ByteString -> Either String Response
decodeBaseCommand bytes = decodeFrame bytes >>= \case
  SimpleFrame s ->
    SimpleResponse <$> PL.decodeMessage (CL.toStrict (frameMessage s))
  PayloadFrame s (PayloadCommand _ _ md pl) -> do
    cmd  <- PL.decodeMessage (CL.toStrict (frameMessage s))
    meta <- PL.decodeMessage (CL.toStrict md)
    return (PayloadResponse cmd meta (toPayload pl))
 where
  -- An empty payload is represented as the absence of a payload.
  toPayload p
    | CL.null p = Nothing
    | otherwise = Just (Payload p)