{-# LANGUAGE CPP               #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE NamedFieldPuns    #-}

{-|
Module      : AWS.Lambda.Events.Kafka
Description : Data types for consuming Kafka events.
License     : BSD3
Stability   : stable

It is possible to subscribe Lambda functions to Kafka topics. You can
subscribe to topics from Amazon Managed Streaming for Kafka (MSK) as
well as self-managed Kafka clusters.

Lambda considers Amazon Managed Streaming for Kafka (MSK) to be a
different event source type from a self-managed Apache Kafka cluster,
but their payloads are very similar. The types in this module are
derived from inspecting invocation payloads, and from reading the
following links:

  * https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
  * https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
  * https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/
  * https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/

-}

module AWS.Lambda.Events.Kafka (
  KafkaEvent(..),
  EventSource(..),
  Record,
  Record'(..),
  Header (..),
  Timestamp(..),
  -- * Internal
  parseTimestamp,
  unparseTimestamp,
  int64ToUTCTime,
  utcTimeToInt64
) where

import           Data.Aeson
import           Data.Aeson.Encoding    (text)
import           Data.Aeson.Types
import           Data.ByteString        (ByteString)
import qualified Data.ByteString        as B
import qualified Data.ByteString.Base64 as B64
import           Data.List.NonEmpty     (NonEmpty)
import qualified Data.List.NonEmpty     as NE
import           Data.Function          ((&))
import           Data.Int               (Int32, Int64)
import           Data.Map               (Map)
import           Data.Maybe             (catMaybes, maybeToList)
import           Data.Scientific        (toBoundedInteger)
#if !MIN_VERSION_base(4,11,0)
import           Data.Semigroup         ((<>))
#endif
import           Data.Text              (Text)
import qualified Data.Text              as T
import qualified Data.Text.Encoding     as TE
import           Data.Time              (UTCTime)
import           Data.Time.Clock.POSIX  (posixSecondsToUTCTime,
                                         utcTimeToPOSIXSeconds)
import           GHC.Generics           (Generic)

#if MIN_VERSION_aeson(2,0,0)
import qualified Data.Aeson.Key         as Key
import qualified Data.Aeson.KeyMap      as KM

-- Compatibility shim: from aeson-2.0 onwards, object keys are the
-- opaque 'Key' type and objects are 'KM.KeyMap's.

-- | Convert an aeson object key into 'Text'.
keyToText :: Key -> Text
keyToText = Key.toText

-- | Convert 'Text' into an aeson object key.
keyFromText :: Text -> Key
keyFromText = Key.fromText
#else
import qualified Data.HashMap.Strict    as KM

-- Compatibility shim: before aeson-2.0, object keys are plain 'Text',
-- so the conversions below are identities.
type Key = Text

-- | Convert an aeson object key into 'Text'.
keyToText :: Key -> Text
keyToText = id

-- | Convert 'Text' into an aeson object key.
keyFromText :: Text -> Key
keyFromText = id
#endif

-- | Represents an event from either Amazon MSK or a self-managed
-- Apache Kafka cluster, as the payloads are very similar.
--
-- The 'ToJSON' and 'FromJSON' instances on 'Record' perform base64
-- conversion for you.
--
-- See the <https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html AWS documentation>
-- for a sample payload.
data KafkaEvent = KafkaEvent {
  -- | Which kind of Kafka event source produced this event.
  eventSource      :: !EventSource,
  -- | Only present when using the "Amazon MSK" event source mapping.
  eventSourceArn   :: !(Maybe Text),
  -- | Serialised in JSON as a single comma-separated string; the
  -- 'FromJSON' and 'ToJSON' instances split and join it.
  bootstrapServers :: !(NonEmpty Text),
  -- | The map's keys look like @"${topicName}-${partitionNumber}"@
  records          :: !(Map Text [Record])
} deriving (Eq, Show, Generic)

-- | Parses the event object. @"eventSourceArn"@ may be absent;
-- @"bootstrapServers"@ is a comma-separated string which is split into
-- a 'NonEmpty' list of servers.
instance FromJSON KafkaEvent where
  parseJSON = withObject "KafkaEvent" $ \o ->
    let parseBootstrapServers servers
          -- 'T.splitOn' never returns an empty list (it yields @[""]@
          -- for empty input), so the empty string has to be rejected
          -- explicitly for this failure to be reachable.
          | T.null servers = fail "bootstrapServers: empty string"
          | otherwise
              = maybe (fail "bootstrapServers: empty string") pure
              . NE.nonEmpty
              $ T.splitOn "," servers
    in KafkaEvent
         <$> o .: "eventSource"
         <*> o .:! "eventSourceArn"
         <*> (o .: "bootstrapServers" >>= parseBootstrapServers)
         <*> o .: "records"

-- | Serialises the event back into the shape accepted by the 'FromJSON'
-- instance: the bootstrap servers are joined with commas, and
-- @"eventSourceArn"@ is only emitted when it is present.
instance ToJSON KafkaEvent where
  toJSON e = object $
    [ "eventSource" .= eventSource e
    , "bootstrapServers" .= T.intercalate "," (NE.toList (bootstrapServers e))
    , "records" .= records e
    ] <> maybeToList (("eventSourceArn" .=) <$> eventSourceArn e)

  -- Same fields as 'toJSON', written directly as an 'Encoding'.
  toEncoding e = pairs . mconcat $
    [ "eventSource" .= eventSource e
    , "bootstrapServers" .= T.intercalate "," (NE.toList (bootstrapServers e))
    , "records" .= records e
    ] <> maybeToList (("eventSourceArn" .=) <$> eventSourceArn e)

-- | The kind of Kafka event source that produced an event, as carried
-- in the @"eventSource"@ field of the payload.
data EventSource
  = AwsKafka -- ^ @"aws:kafka"@
  | SelfManagedKafka -- ^ @"SelfManagedKafka"@
  deriving (Eq, Show, Bounded, Enum, Ord, Generic)

-- | Accepts exactly the strings @"aws:kafka"@ and
-- @"SelfManagedKafka"@; any other string fails the parse.
instance FromJSON EventSource where
  parseJSON = withText "EventSource" $ \case
    "aws:kafka" -> pure AwsKafka
    "SelfManagedKafka" -> pure SelfManagedKafka
    -- 'show' on 'Text' already surrounds the value with quotes, so no
    -- extra quote characters are added around it.
    t -> fail $ "unrecognised EventSource: " <> show t

-- | Serialises to the same strings the 'FromJSON' instance accepts.
instance ToJSON EventSource where
  toJSON = \case
    AwsKafka -> String "aws:kafka"
    SelfManagedKafka -> String "SelfManagedKafka"

  toEncoding = text . \case
    AwsKafka -> "aws:kafka"
    SelfManagedKafka -> "SelfManagedKafka"

-- | A 'Record'' whose 'value' is the raw 'ByteString' payload.
--
-- Convenience alias: most of the time you will parse the records
-- straight into some app-specific structure.
type Record = Record' ByteString

-- | Records from a Kafka event. This is 'Traversable', which means
-- you can do things like parse a JSON-encoded payload:
--
-- @
-- 'traverse' 'decodeStrict' :: 'FromJSON' a => Record -> Maybe (Record' a)
-- @
--
-- The type parameter is the type of the record's 'value'; the 'key'
-- is always a 'ByteString'.
data Record' a = Record {
  topic :: !Text,
  partition :: !Int32,
  offset :: !Int64,
  timestamp :: !Timestamp,
  -- | NOTE: there can be multiple headers for a given key.
  headers :: [Header],
  -- | 'Nothing' when the record has no @"key"@ field.
  key :: !(Maybe ByteString),
  -- | 'Nothing' when the record has no @"value"@ field.
  value :: !(Maybe a)
} deriving (Eq, Show, Generic, Functor, Foldable, Traversable)

-- | Decodes base64-encoded keys and values, where present.
--
-- Decoding uses 'B64.decodeLenient', so malformed base64 input does
-- not cause a parse failure. The timestamp fields are handled by
-- 'parseTimestamp'.
instance FromJSON (Record' ByteString) where
  parseJSON = withObject "Record" $ \o -> do
    topic <- o .: "topic"
    partition <- o .: "partition"
    offset <- o .: "offset"
    timestamp <- parseTimestamp o
    headers <- o .: "headers"
    -- Outer 'fmap' maps over the 'Maybe' produced by '.:?', '<$>' over
    -- the 'Parser'.
    key <- fmap (B64.decodeLenient . TE.encodeUtf8) <$> o .:? "key"
    value <- fmap (B64.decodeLenient . TE.encodeUtf8) <$> o .:? "value"
    pure Record { topic, partition, offset, timestamp, headers, key, value }

-- | Encodes keys and values into base64.
--
-- The @"key"@ and @"value"@ fields are omitted when they are 'Nothing';
-- the timestamp fields are produced by 'unparseTimestamp'.
instance ToJSON (Record' ByteString) where
  toJSON r = object $ concat
    [
      [ "offset" .= offset r
      , "partition" .= partition r
      , "topic" .= topic r
      , "headers" .= headers r
      ]
    , unparseTimestamp (timestamp r)
    , catMaybes
      [ ("key" .=) . TE.decodeUtf8 . B64.encode <$> key r
      , ("value" .=) . TE.decodeUtf8 . B64.encode <$> value r
      ]
    ]

  -- Same fields as 'toJSON', written directly as an 'Encoding'.
  toEncoding r = pairs . mconcat . concat $
    [
      [ "offset" .= offset r
      , "partition" .= partition r
      , "topic" .= topic r
      , "headers" .= headers r
      ]
    , unparseTimestamp (timestamp r)
    , catMaybes
      [ ("key" .=) . TE.decodeUtf8 . B64.encode <$> key r
      , ("value" .=) . TE.decodeUtf8 . B64.encode <$> value r
      ]
    ]

-- | AWS serialises record headers to JSON as an array of
-- objects. From their docs:
--
-- @
-- "headers":[{"headerKey":[104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
-- @
--
-- Note:
--
-- >>> map chr [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
-- "headerValue"
--
-- The first field is the header's key and the second is its value as
-- raw bytes.
data Header = Header !Text !ByteString deriving (Eq, Show, Generic)

-- | Parses a JSON object with exactly one key, whose value is an array
-- of byte values. Empty objects and objects with more than one key are
-- rejected.
instance FromJSON Header where
  parseJSON = withObject "header" $ \o ->
    case KM.toList o of
      [(key, value)] -> Header (keyToText key) . B.pack <$> parseJSON value
      [] -> fail "Unexpected empty object"
      _ -> fail "Unexpected additional keys in object"

-- | Serialises to a single-key JSON object mapping the header's key to
-- the array of its value's bytes.
instance ToJSON Header where
  toJSON (Header key value) = object [keyFromText key .= B.unpack value]
  toEncoding (Header key value) = pairs $ keyFromText key .= B.unpack value

-- | Kafka timestamp types, derived from the Java client's enum at:
-- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
--
-- In JSON these correspond to the @"timestampType"@ values
-- @"NO_TIMESTAMP_TYPE"@, @"CREATE_TIME"@ and @"LOG_APPEND_TIME"@; the
-- latter two also carry a @"timestamp"@ field.
data Timestamp = NoTimestampType | CreateTime !UTCTime | LogAppendTime !UTCTime
  deriving (Eq, Show, Generic)

-- | Parse a 'Timestamp' out of a record object.
--
-- Reads the @"timestampType"@ field and, for @"CREATE_TIME"@ and
-- @"LOG_APPEND_TIME"@, also the numeric @"timestamp"@ field of the same
-- object. Fails on an unknown timestamp type, or when the timestamp
-- cannot be represented as an 'Int64'.
parseTimestamp :: Object -> Parser Timestamp
parseTimestamp o = explicitParseField parseSubtype o "timestampType"
  where
    parseSubtype = withText "timestampType" $ \case
      "NO_TIMESTAMP_TYPE" -> pure NoTimestampType
      "CREATE_TIME" -> CreateTime <$> parseUTCTimestamp
      "LOG_APPEND_TIME" -> LogAppendTime <$> parseUTCTimestamp
      -- 'show' on 'Text' already surrounds the value with quotes, so no
      -- extra quote characters are added around it.
      t -> fail $ "unknown timestampType: " <> show t

    -- Only consulted for the timestamp types that carry a time.
    parseUTCTimestamp = explicitParseField convertToUTCTime o "timestamp"

    convertToUTCTime = withScientific "timestamp" $ \stamp ->
      int64ToUTCTime <$> (toBoundedInteger stamp &
        maybe (fail "timestamp out of range") pure :: Parser Int64)

-- | Produce the key/value pairs that represent a 'Timestamp' inside a
-- record object: always a @"timestampType"@ pair, plus a @"timestamp"@
-- pair (converted with 'utcTimeToInt64') for the constructors that
-- carry a time.
#if MIN_VERSION_aeson(2,2,0)
unparseTimestamp :: KeyValue e kv => Timestamp -> [kv]
#else
unparseTimestamp :: KeyValue kv => Timestamp -> [kv]
#endif
unparseTimestamp = \case
  NoTimestampType -> ["timestampType" .= String "NO_TIMESTAMP_TYPE"]
  CreateTime ts ->
    [ "timestampType" .= String "CREATE_TIME"
    , "timestamp" .= utcTimeToInt64 ts
    ]
  LogAppendTime ts ->
    [ "timestampType" .= String "LOG_APPEND_TIME"
    , "timestamp" .= utcTimeToInt64 ts
    ]

-- | Interpret an 'Int64' as a number of milliseconds relative to the
-- POSIX epoch and convert it to a 'UTCTime'.
int64ToUTCTime :: Int64 -> UTCTime
int64ToUTCTime int =
  let -- 'divMod' keeps @millis@ in the range 0..999, also for negative input.
      (seconds, millis) = int `divMod` 1000
      posixSeconds = fromIntegral seconds + fromIntegral millis / 1000
  in posixSecondsToUTCTime posixSeconds

-- | Convert a 'UTCTime' to a number of milliseconds relative to the
-- POSIX epoch. Any sub-millisecond part is discarded by 'truncate'
-- (i.e. rounded towards zero).
utcTimeToInt64 :: UTCTime -> Int64
utcTimeToInt64 utc = truncate $ utcTimeToPOSIXSeconds utc * 1000