{-# LANGUAGE CPP               #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE NamedFieldPuns    #-}

{-|
Module      : AWS.Lambda.Events.Kafka
Description : Data types for consuming Kafka events.
License     : BSD3
Stability   : stable

It is possible to subscribe Lambda functions to Kafka topics. You can
subscribe to topics from Amazon Managed Streaming for Kafka (MSK) as
well as self-managed Kafka clusters.

Lambda considers Amazon Managed Streaming for Kafka (MSK) to be a
different event source type from a self-managed Apache Kafka cluster,
but their payloads are very similar. The types in this module are
derived from inspecting invocation payloads, and from reading the
following links:

  * https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
  * https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
  * https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/
  * https://aws.amazon.com/blogs/compute/using-self-hosted-apache-kafka-as-an-event-source-for-aws-lambda/

-}

module AWS.Lambda.Events.Kafka (
  KafkaEvent(..),
  EventSource(..),
  Record,
  Record'(..),
  Header (..),
  Timestamp(..),
  -- * Internal
  parseTimestamp,
  unparseTimestamp,
  int64ToUTCTime,
  utcTimeToInt64
) where

import           Data.Aeson
import           Data.Aeson.Encoding    (text)
import           Data.Aeson.Types
import           Data.ByteString        (ByteString)
import qualified Data.ByteString        as B
import qualified Data.ByteString.Base64 as B64
import           Data.List.NonEmpty     (NonEmpty)
import qualified Data.List.NonEmpty     as NE
import           Data.Function          ((&))
import           Data.Int               (Int32, Int64)
import           Data.Map               (Map)
import           Data.Maybe             (catMaybes, maybeToList)
import           Data.Scientific        (toBoundedInteger)
#if !MIN_VERSION_base(4,11,0)
import           Data.Semigroup         ((<>))
#endif
import           Data.Text              (Text)
import qualified Data.Text              as T
import qualified Data.Text.Encoding     as TE
import           Data.Time              (UTCTime)
import           Data.Time.Clock.POSIX  (posixSecondsToUTCTime,
                                         utcTimeToPOSIXSeconds)
import           GHC.Generics           (Generic)

#if MIN_VERSION_aeson(2,0,0)
import qualified Data.Aeson.Key         as Key
import qualified Data.Aeson.KeyMap      as KM

keyToText :: Key -> Text
keyToText :: Key -> Text
keyToText = Key -> Text
Key.toText

keyFromText :: Text -> Key
keyFromText :: Text -> Key
keyFromText = Text -> Key
Key.fromText
#else
import qualified Data.HashMap.Strict    as KM
type Key = Text

keyToText :: Key -> Text
keyToText = id

keyFromText :: Text -> Key
keyFromText = id
#endif

-- | Represents an event from either Amazon MSK or a self-managed
-- Apache Kafka cluster, as the payloads are very similar.
--
-- The 'ToJSON' and 'FromJSON' instances on 'Record' perform base64
-- conversion for you.
--
-- See the <https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html AWS documentation>
-- for a sample payload.
data KafkaEvent = KafkaEvent {
  KafkaEvent -> EventSource
eventSource      :: !EventSource,
  -- | Only present when using the "Amazon MSK" event source mapping.
  KafkaEvent -> Maybe Text
eventSourceArn   :: !(Maybe Text),
  KafkaEvent -> NonEmpty Text
bootstrapServers :: !(NonEmpty Text),
  -- | The map's keys look like @"${topicName}-${partitionNumber}"@
  KafkaEvent -> Map Text [Record]
records          :: !(Map Text [Record])
} deriving (KafkaEvent -> KafkaEvent -> Bool
(KafkaEvent -> KafkaEvent -> Bool)
-> (KafkaEvent -> KafkaEvent -> Bool) -> Eq KafkaEvent
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: KafkaEvent -> KafkaEvent -> Bool
$c/= :: KafkaEvent -> KafkaEvent -> Bool
== :: KafkaEvent -> KafkaEvent -> Bool
$c== :: KafkaEvent -> KafkaEvent -> Bool
Eq, Int -> KafkaEvent -> ShowS
[KafkaEvent] -> ShowS
KafkaEvent -> String
(Int -> KafkaEvent -> ShowS)
-> (KafkaEvent -> String)
-> ([KafkaEvent] -> ShowS)
-> Show KafkaEvent
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [KafkaEvent] -> ShowS
$cshowList :: [KafkaEvent] -> ShowS
show :: KafkaEvent -> String
$cshow :: KafkaEvent -> String
showsPrec :: Int -> KafkaEvent -> ShowS
$cshowsPrec :: Int -> KafkaEvent -> ShowS
Show, (forall x. KafkaEvent -> Rep KafkaEvent x)
-> (forall x. Rep KafkaEvent x -> KafkaEvent) -> Generic KafkaEvent
forall x. Rep KafkaEvent x -> KafkaEvent
forall x. KafkaEvent -> Rep KafkaEvent x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep KafkaEvent x -> KafkaEvent
$cfrom :: forall x. KafkaEvent -> Rep KafkaEvent x
Generic)

instance FromJSON KafkaEvent where
  parseJSON :: Value -> Parser KafkaEvent
parseJSON = String
-> (Object -> Parser KafkaEvent) -> Value -> Parser KafkaEvent
forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"KafkaEvent" ((Object -> Parser KafkaEvent) -> Value -> Parser KafkaEvent)
-> (Object -> Parser KafkaEvent) -> Value -> Parser KafkaEvent
forall a b. (a -> b) -> a -> b
$ \Object
o ->
    let parseBootstrapServers :: Text -> Parser (NonEmpty Text)
parseBootstrapServers
          = Parser (NonEmpty Text)
-> (NonEmpty Text -> Parser (NonEmpty Text))
-> Maybe (NonEmpty Text)
-> Parser (NonEmpty Text)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (String -> Parser (NonEmpty Text)
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"bootstrapServers: empty string") NonEmpty Text -> Parser (NonEmpty Text)
forall (f :: * -> *) a. Applicative f => a -> f a
pure
          (Maybe (NonEmpty Text) -> Parser (NonEmpty Text))
-> (Text -> Maybe (NonEmpty Text))
-> Text
-> Parser (NonEmpty Text)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Text] -> Maybe (NonEmpty Text)
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty
          ([Text] -> Maybe (NonEmpty Text))
-> (Text -> [Text]) -> Text -> Maybe (NonEmpty Text)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> Text -> [Text]
T.splitOn Text
","
    in EventSource
-> Maybe Text -> NonEmpty Text -> Map Text [Record] -> KafkaEvent
KafkaEvent
         (EventSource
 -> Maybe Text -> NonEmpty Text -> Map Text [Record] -> KafkaEvent)
-> Parser EventSource
-> Parser
     (Maybe Text -> NonEmpty Text -> Map Text [Record] -> KafkaEvent)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
o Object -> Key -> Parser EventSource
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"eventSource"
         Parser
  (Maybe Text -> NonEmpty Text -> Map Text [Record] -> KafkaEvent)
-> Parser (Maybe Text)
-> Parser (NonEmpty Text -> Map Text [Record] -> KafkaEvent)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
o Object -> Key -> Parser (Maybe Text)
forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:! Key
"eventSourceArn"
         Parser (NonEmpty Text -> Map Text [Record] -> KafkaEvent)
-> Parser (NonEmpty Text)
-> Parser (Map Text [Record] -> KafkaEvent)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (Object
o Object -> Key -> Parser Text
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"bootstrapServers" Parser Text
-> (Text -> Parser (NonEmpty Text)) -> Parser (NonEmpty Text)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Text -> Parser (NonEmpty Text)
parseBootstrapServers)
         Parser (Map Text [Record] -> KafkaEvent)
-> Parser (Map Text [Record]) -> Parser KafkaEvent
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
o Object -> Key -> Parser (Map Text [Record])
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"records"

instance ToJSON KafkaEvent where
  toJSON :: KafkaEvent -> Value
toJSON KafkaEvent
e = [Pair] -> Value
object ([Pair] -> Value) -> [Pair] -> Value
forall a b. (a -> b) -> a -> b
$
    [ Key
"eventSource" Key -> EventSource -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= KafkaEvent -> EventSource
eventSource KafkaEvent
e
    , Key
"bootstrapServers" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Text -> [Text] -> Text
T.intercalate Text
"," (NonEmpty Text -> [Text]
forall a. NonEmpty a -> [a]
NE.toList (KafkaEvent -> NonEmpty Text
bootstrapServers KafkaEvent
e))
    , Key
"records" Key -> Map Text [Record] -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= KafkaEvent -> Map Text [Record]
records KafkaEvent
e
    ] [Pair] -> [Pair] -> [Pair]
forall a. Semigroup a => a -> a -> a
<> Maybe Pair -> [Pair]
forall a. Maybe a -> [a]
maybeToList ((Key
"eventSourceArn" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.=) (Text -> Pair) -> Maybe Text -> Maybe Pair
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> KafkaEvent -> Maybe Text
eventSourceArn KafkaEvent
e)

  toEncoding :: KafkaEvent -> Encoding
toEncoding KafkaEvent
e = Series -> Encoding
pairs (Series -> Encoding)
-> ([Series] -> Series) -> [Series] -> Encoding
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Series] -> Series
forall a. Monoid a => [a] -> a
mconcat ([Series] -> Encoding) -> [Series] -> Encoding
forall a b. (a -> b) -> a -> b
$
    [ Key
"eventSource" Key -> EventSource -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= KafkaEvent -> EventSource
eventSource KafkaEvent
e
    , Key
"bootstrapServers" Key -> Text -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Text -> [Text] -> Text
T.intercalate Text
"," (NonEmpty Text -> [Text]
forall a. NonEmpty a -> [a]
NE.toList (KafkaEvent -> NonEmpty Text
bootstrapServers KafkaEvent
e))
    , Key
"records" Key -> Map Text [Record] -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= KafkaEvent -> Map Text [Record]
records KafkaEvent
e
    ] [Series] -> [Series] -> [Series]
forall a. Semigroup a => a -> a -> a
<> Maybe Series -> [Series]
forall a. Maybe a -> [a]
maybeToList ((Key
"eventSourceArn" Key -> Text -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.=) (Text -> Series) -> Maybe Text -> Maybe Series
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> KafkaEvent -> Maybe Text
eventSourceArn KafkaEvent
e)

data EventSource
  = AwsKafka -- ^ @"aws:kafka"@
  | SelfManagedKafka -- ^ @"SelfManagedKafka"@
  deriving (EventSource -> EventSource -> Bool
(EventSource -> EventSource -> Bool)
-> (EventSource -> EventSource -> Bool) -> Eq EventSource
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: EventSource -> EventSource -> Bool
$c/= :: EventSource -> EventSource -> Bool
== :: EventSource -> EventSource -> Bool
$c== :: EventSource -> EventSource -> Bool
Eq, Int -> EventSource -> ShowS
[EventSource] -> ShowS
EventSource -> String
(Int -> EventSource -> ShowS)
-> (EventSource -> String)
-> ([EventSource] -> ShowS)
-> Show EventSource
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [EventSource] -> ShowS
$cshowList :: [EventSource] -> ShowS
show :: EventSource -> String
$cshow :: EventSource -> String
showsPrec :: Int -> EventSource -> ShowS
$cshowsPrec :: Int -> EventSource -> ShowS
Show, EventSource
EventSource -> EventSource -> Bounded EventSource
forall a. a -> a -> Bounded a
maxBound :: EventSource
$cmaxBound :: EventSource
minBound :: EventSource
$cminBound :: EventSource
Bounded, Int -> EventSource
EventSource -> Int
EventSource -> [EventSource]
EventSource -> EventSource
EventSource -> EventSource -> [EventSource]
EventSource -> EventSource -> EventSource -> [EventSource]
(EventSource -> EventSource)
-> (EventSource -> EventSource)
-> (Int -> EventSource)
-> (EventSource -> Int)
-> (EventSource -> [EventSource])
-> (EventSource -> EventSource -> [EventSource])
-> (EventSource -> EventSource -> [EventSource])
-> (EventSource -> EventSource -> EventSource -> [EventSource])
-> Enum EventSource
forall a.
(a -> a)
-> (a -> a)
-> (Int -> a)
-> (a -> Int)
-> (a -> [a])
-> (a -> a -> [a])
-> (a -> a -> [a])
-> (a -> a -> a -> [a])
-> Enum a
enumFromThenTo :: EventSource -> EventSource -> EventSource -> [EventSource]
$cenumFromThenTo :: EventSource -> EventSource -> EventSource -> [EventSource]
enumFromTo :: EventSource -> EventSource -> [EventSource]
$cenumFromTo :: EventSource -> EventSource -> [EventSource]
enumFromThen :: EventSource -> EventSource -> [EventSource]
$cenumFromThen :: EventSource -> EventSource -> [EventSource]
enumFrom :: EventSource -> [EventSource]
$cenumFrom :: EventSource -> [EventSource]
fromEnum :: EventSource -> Int
$cfromEnum :: EventSource -> Int
toEnum :: Int -> EventSource
$ctoEnum :: Int -> EventSource
pred :: EventSource -> EventSource
$cpred :: EventSource -> EventSource
succ :: EventSource -> EventSource
$csucc :: EventSource -> EventSource
Enum, Eq EventSource
Eq EventSource
-> (EventSource -> EventSource -> Ordering)
-> (EventSource -> EventSource -> Bool)
-> (EventSource -> EventSource -> Bool)
-> (EventSource -> EventSource -> Bool)
-> (EventSource -> EventSource -> Bool)
-> (EventSource -> EventSource -> EventSource)
-> (EventSource -> EventSource -> EventSource)
-> Ord EventSource
EventSource -> EventSource -> Bool
EventSource -> EventSource -> Ordering
EventSource -> EventSource -> EventSource
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: EventSource -> EventSource -> EventSource
$cmin :: EventSource -> EventSource -> EventSource
max :: EventSource -> EventSource -> EventSource
$cmax :: EventSource -> EventSource -> EventSource
>= :: EventSource -> EventSource -> Bool
$c>= :: EventSource -> EventSource -> Bool
> :: EventSource -> EventSource -> Bool
$c> :: EventSource -> EventSource -> Bool
<= :: EventSource -> EventSource -> Bool
$c<= :: EventSource -> EventSource -> Bool
< :: EventSource -> EventSource -> Bool
$c< :: EventSource -> EventSource -> Bool
compare :: EventSource -> EventSource -> Ordering
$ccompare :: EventSource -> EventSource -> Ordering
$cp1Ord :: Eq EventSource
Ord, (forall x. EventSource -> Rep EventSource x)
-> (forall x. Rep EventSource x -> EventSource)
-> Generic EventSource
forall x. Rep EventSource x -> EventSource
forall x. EventSource -> Rep EventSource x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep EventSource x -> EventSource
$cfrom :: forall x. EventSource -> Rep EventSource x
Generic)

instance FromJSON EventSource where
  parseJSON :: Value -> Parser EventSource
parseJSON = String
-> (Text -> Parser EventSource) -> Value -> Parser EventSource
forall a. String -> (Text -> Parser a) -> Value -> Parser a
withText String
"EventSource" ((Text -> Parser EventSource) -> Value -> Parser EventSource)
-> (Text -> Parser EventSource) -> Value -> Parser EventSource
forall a b. (a -> b) -> a -> b
$ \case
    Text
"aws:kafka" -> EventSource -> Parser EventSource
forall (f :: * -> *) a. Applicative f => a -> f a
pure EventSource
AwsKafka
    Text
"SelfManagedKafka" -> EventSource -> Parser EventSource
forall (f :: * -> *) a. Applicative f => a -> f a
pure EventSource
SelfManagedKafka
    Text
t -> String -> Parser EventSource
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Parser EventSource) -> String -> Parser EventSource
forall a b. (a -> b) -> a -> b
$ String
"unrecognised EventSource: \"" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show Text
t String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"\""

instance ToJSON EventSource where
  toJSON :: EventSource -> Value
toJSON = \case
    EventSource
AwsKafka -> Text -> Value
String Text
"aws:kafka"
    EventSource
SelfManagedKafka -> Text -> Value
String Text
"SelfManagedKafka"

  toEncoding :: EventSource -> Encoding
toEncoding = Text -> Encoding
forall a. Text -> Encoding' a
text (Text -> Encoding)
-> (EventSource -> Text) -> EventSource -> Encoding
forall b c a. (b -> c) -> (a -> b) -> a -> c
. \case
    EventSource
AwsKafka -> Text
"aws:kafka"
    EventSource
SelfManagedKafka -> Text
"SelfManagedKafka"

-- | Convenience alias: most of the time you will parse the records
-- straight into some app-specific structure.
type Record = Record' ByteString

-- | Records from a Kafka event. This is 'Traversable', which means
-- you can do things like parse a JSON-encoded payload:
--
-- @
-- 'traverse' 'decodeStrict' :: 'FromJSON' a => Record -> Maybe (Record' a)
-- @
data Record' a = Record {
  Record' a -> Text
topic :: !Text,
  Record' a -> Int32
partition :: !Int32,
  Record' a -> Int64
offset :: !Int64,
  Record' a -> Timestamp
timestamp :: !Timestamp,
  -- | NOTE: there can be multiple headers for a given key.
  Record' a -> [Header]
headers :: [Header],
  Record' a -> Maybe ByteString
key :: !(Maybe ByteString),
  Record' a -> Maybe a
value :: !(Maybe a)
} deriving (Record' a -> Record' a -> Bool
(Record' a -> Record' a -> Bool)
-> (Record' a -> Record' a -> Bool) -> Eq (Record' a)
forall a. Eq a => Record' a -> Record' a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Record' a -> Record' a -> Bool
$c/= :: forall a. Eq a => Record' a -> Record' a -> Bool
== :: Record' a -> Record' a -> Bool
$c== :: forall a. Eq a => Record' a -> Record' a -> Bool
Eq, Int -> Record' a -> ShowS
[Record' a] -> ShowS
Record' a -> String
(Int -> Record' a -> ShowS)
-> (Record' a -> String)
-> ([Record' a] -> ShowS)
-> Show (Record' a)
forall a. Show a => Int -> Record' a -> ShowS
forall a. Show a => [Record' a] -> ShowS
forall a. Show a => Record' a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Record' a] -> ShowS
$cshowList :: forall a. Show a => [Record' a] -> ShowS
show :: Record' a -> String
$cshow :: forall a. Show a => Record' a -> String
showsPrec :: Int -> Record' a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> Record' a -> ShowS
Show, (forall x. Record' a -> Rep (Record' a) x)
-> (forall x. Rep (Record' a) x -> Record' a)
-> Generic (Record' a)
forall x. Rep (Record' a) x -> Record' a
forall x. Record' a -> Rep (Record' a) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall a x. Rep (Record' a) x -> Record' a
forall a x. Record' a -> Rep (Record' a) x
$cto :: forall a x. Rep (Record' a) x -> Record' a
$cfrom :: forall a x. Record' a -> Rep (Record' a) x
Generic, a -> Record' b -> Record' a
(a -> b) -> Record' a -> Record' b
(forall a b. (a -> b) -> Record' a -> Record' b)
-> (forall a b. a -> Record' b -> Record' a) -> Functor Record'
forall a b. a -> Record' b -> Record' a
forall a b. (a -> b) -> Record' a -> Record' b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> Record' b -> Record' a
$c<$ :: forall a b. a -> Record' b -> Record' a
fmap :: (a -> b) -> Record' a -> Record' b
$cfmap :: forall a b. (a -> b) -> Record' a -> Record' b
Functor, Record' a -> Bool
(a -> m) -> Record' a -> m
(a -> b -> b) -> b -> Record' a -> b
(forall m. Monoid m => Record' m -> m)
-> (forall m a. Monoid m => (a -> m) -> Record' a -> m)
-> (forall m a. Monoid m => (a -> m) -> Record' a -> m)
-> (forall a b. (a -> b -> b) -> b -> Record' a -> b)
-> (forall a b. (a -> b -> b) -> b -> Record' a -> b)
-> (forall b a. (b -> a -> b) -> b -> Record' a -> b)
-> (forall b a. (b -> a -> b) -> b -> Record' a -> b)
-> (forall a. (a -> a -> a) -> Record' a -> a)
-> (forall a. (a -> a -> a) -> Record' a -> a)
-> (forall a. Record' a -> [a])
-> (forall a. Record' a -> Bool)
-> (forall a. Record' a -> Int)
-> (forall a. Eq a => a -> Record' a -> Bool)
-> (forall a. Ord a => Record' a -> a)
-> (forall a. Ord a => Record' a -> a)
-> (forall a. Num a => Record' a -> a)
-> (forall a. Num a => Record' a -> a)
-> Foldable Record'
forall a. Eq a => a -> Record' a -> Bool
forall a. Num a => Record' a -> a
forall a. Ord a => Record' a -> a
forall m. Monoid m => Record' m -> m
forall a. Record' a -> Bool
forall a. Record' a -> Int
forall a. Record' a -> [a]
forall a. (a -> a -> a) -> Record' a -> a
forall m a. Monoid m => (a -> m) -> Record' a -> m
forall b a. (b -> a -> b) -> b -> Record' a -> b
forall a b. (a -> b -> b) -> b -> Record' a -> b
forall (t :: * -> *).
(forall m. Monoid m => t m -> m)
-> (forall m a. Monoid m => (a -> m) -> t a -> m)
-> (forall m a. Monoid m => (a -> m) -> t a -> m)
-> (forall a b. (a -> b -> b) -> b -> t a -> b)
-> (forall a b. (a -> b -> b) -> b -> t a -> b)
-> (forall b a. (b -> a -> b) -> b -> t a -> b)
-> (forall b a. (b -> a -> b) -> b -> t a -> b)
-> (forall a. (a -> a -> a) -> t a -> a)
-> (forall a. (a -> a -> a) -> t a -> a)
-> (forall a. t a -> [a])
-> (forall a. t a -> Bool)
-> (forall a. t a -> Int)
-> (forall a. Eq a => a -> t a -> Bool)
-> (forall a. Ord a => t a -> a)
-> (forall a. Ord a => t a -> a)
-> (forall a. Num a => t a -> a)
-> (forall a. Num a => t a -> a)
-> Foldable t
product :: Record' a -> a
$cproduct :: forall a. Num a => Record' a -> a
sum :: Record' a -> a
$csum :: forall a. Num a => Record' a -> a
minimum :: Record' a -> a
$cminimum :: forall a. Ord a => Record' a -> a
maximum :: Record' a -> a
$cmaximum :: forall a. Ord a => Record' a -> a
elem :: a -> Record' a -> Bool
$celem :: forall a. Eq a => a -> Record' a -> Bool
length :: Record' a -> Int
$clength :: forall a. Record' a -> Int
null :: Record' a -> Bool
$cnull :: forall a. Record' a -> Bool
toList :: Record' a -> [a]
$ctoList :: forall a. Record' a -> [a]
foldl1 :: (a -> a -> a) -> Record' a -> a
$cfoldl1 :: forall a. (a -> a -> a) -> Record' a -> a
foldr1 :: (a -> a -> a) -> Record' a -> a
$cfoldr1 :: forall a. (a -> a -> a) -> Record' a -> a
foldl' :: (b -> a -> b) -> b -> Record' a -> b
$cfoldl' :: forall b a. (b -> a -> b) -> b -> Record' a -> b
foldl :: (b -> a -> b) -> b -> Record' a -> b
$cfoldl :: forall b a. (b -> a -> b) -> b -> Record' a -> b
foldr' :: (a -> b -> b) -> b -> Record' a -> b
$cfoldr' :: forall a b. (a -> b -> b) -> b -> Record' a -> b
foldr :: (a -> b -> b) -> b -> Record' a -> b
$cfoldr :: forall a b. (a -> b -> b) -> b -> Record' a -> b
foldMap' :: (a -> m) -> Record' a -> m
$cfoldMap' :: forall m a. Monoid m => (a -> m) -> Record' a -> m
foldMap :: (a -> m) -> Record' a -> m
$cfoldMap :: forall m a. Monoid m => (a -> m) -> Record' a -> m
fold :: Record' m -> m
$cfold :: forall m. Monoid m => Record' m -> m
Foldable, Functor Record'
Foldable Record'
Functor Record'
-> Foldable Record'
-> (forall (f :: * -> *) a b.
    Applicative f =>
    (a -> f b) -> Record' a -> f (Record' b))
-> (forall (f :: * -> *) a.
    Applicative f =>
    Record' (f a) -> f (Record' a))
-> (forall (m :: * -> *) a b.
    Monad m =>
    (a -> m b) -> Record' a -> m (Record' b))
-> (forall (m :: * -> *) a.
    Monad m =>
    Record' (m a) -> m (Record' a))
-> Traversable Record'
(a -> f b) -> Record' a -> f (Record' b)
forall (t :: * -> *).
Functor t
-> Foldable t
-> (forall (f :: * -> *) a b.
    Applicative f =>
    (a -> f b) -> t a -> f (t b))
-> (forall (f :: * -> *) a. Applicative f => t (f a) -> f (t a))
-> (forall (m :: * -> *) a b.
    Monad m =>
    (a -> m b) -> t a -> m (t b))
-> (forall (m :: * -> *) a. Monad m => t (m a) -> m (t a))
-> Traversable t
forall (m :: * -> *) a. Monad m => Record' (m a) -> m (Record' a)
forall (f :: * -> *) a.
Applicative f =>
Record' (f a) -> f (Record' a)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Record' a -> m (Record' b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Record' a -> f (Record' b)
sequence :: Record' (m a) -> m (Record' a)
$csequence :: forall (m :: * -> *) a. Monad m => Record' (m a) -> m (Record' a)
mapM :: (a -> m b) -> Record' a -> m (Record' b)
$cmapM :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Record' a -> m (Record' b)
sequenceA :: Record' (f a) -> f (Record' a)
$csequenceA :: forall (f :: * -> *) a.
Applicative f =>
Record' (f a) -> f (Record' a)
traverse :: (a -> f b) -> Record' a -> f (Record' b)
$ctraverse :: forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Record' a -> f (Record' b)
$cp2Traversable :: Foldable Record'
$cp1Traversable :: Functor Record'
Traversable)

-- | Decodes base64-encoded keys and values, where present.
instance FromJSON (Record' ByteString) where
  parseJSON :: Value -> Parser Record
parseJSON = String -> (Object -> Parser Record) -> Value -> Parser Record
forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"Record" ((Object -> Parser Record) -> Value -> Parser Record)
-> (Object -> Parser Record) -> Value -> Parser Record
forall a b. (a -> b) -> a -> b
$ \Object
o -> do
    Text
topic <- Object
o Object -> Key -> Parser Text
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"topic"
    Int32
partition <- Object
o Object -> Key -> Parser Int32
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"partition"
    Int64
offset <- Object
o Object -> Key -> Parser Int64
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"offset"
    Timestamp
timestamp <- Object -> Parser Timestamp
parseTimestamp Object
o
    [Header]
headers <- Object
o Object -> Key -> Parser [Header]
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"headers"
    Maybe ByteString
key <- (Text -> ByteString) -> Maybe Text -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ByteString -> ByteString
B64.decodeLenient (ByteString -> ByteString)
-> (Text -> ByteString) -> Text -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8) (Maybe Text -> Maybe ByteString)
-> Parser (Maybe Text) -> Parser (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
o Object -> Key -> Parser (Maybe Text)
forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"key"
    Maybe ByteString
value <- (Text -> ByteString) -> Maybe Text -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (ByteString -> ByteString
B64.decodeLenient (ByteString -> ByteString)
-> (Text -> ByteString) -> Text -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> ByteString
TE.encodeUtf8) (Maybe Text -> Maybe ByteString)
-> Parser (Maybe Text) -> Parser (Maybe ByteString)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
o Object -> Key -> Parser (Maybe Text)
forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"value"
    Record -> Parser Record
forall (f :: * -> *) a. Applicative f => a -> f a
pure Record :: forall a.
Text
-> Int32
-> Int64
-> Timestamp
-> [Header]
-> Maybe ByteString
-> Maybe a
-> Record' a
Record { Text
topic :: Text
$sel:topic:Record :: Text
topic, Int32
partition :: Int32
$sel:partition:Record :: Int32
partition, Int64
offset :: Int64
$sel:offset:Record :: Int64
offset, Timestamp
timestamp :: Timestamp
$sel:timestamp:Record :: Timestamp
timestamp, [Header]
headers :: [Header]
$sel:headers:Record :: [Header]
headers, Maybe ByteString
key :: Maybe ByteString
$sel:key:Record :: Maybe ByteString
key, Maybe ByteString
value :: Maybe ByteString
$sel:value:Record :: Maybe ByteString
value }

-- | Encodes keys and values into base64.
instance ToJSON (Record' ByteString) where
  toJSON :: Record -> Value
toJSON Record
r = [Pair] -> Value
object ([Pair] -> Value) -> [Pair] -> Value
forall a b. (a -> b) -> a -> b
$ [[Pair]] -> [Pair]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
    [
      [ Key
"offset" Key -> Int64 -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Record -> Int64
forall a. Record' a -> Int64
offset Record
r
      , Key
"partition" Key -> Int32 -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Record -> Int32
forall a. Record' a -> Int32
partition Record
r
      , Key
"topic" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Record -> Text
forall a. Record' a -> Text
topic Record
r
      , Key
"headers" Key -> [Header] -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Record -> [Header]
forall a. Record' a -> [Header]
headers Record
r
      ]
    , Timestamp -> [Pair]
forall kv. KeyValue kv => Timestamp -> [kv]
unparseTimestamp (Record -> Timestamp
forall a. Record' a -> Timestamp
timestamp Record
r)
    , [Maybe Pair] -> [Pair]
forall a. [Maybe a] -> [a]
catMaybes
      [ (Key
"key" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.=) (Text -> Pair) -> (ByteString -> Text) -> ByteString -> Pair
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
TE.decodeUtf8 (ByteString -> Text)
-> (ByteString -> ByteString) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
B64.encode (ByteString -> Pair) -> Maybe ByteString -> Maybe Pair
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Record -> Maybe ByteString
forall a. Record' a -> Maybe ByteString
key Record
r
      , (Key
"value" Key -> Text -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.=) (Text -> Pair) -> (ByteString -> Text) -> ByteString -> Pair
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
TE.decodeUtf8 (ByteString -> Text)
-> (ByteString -> ByteString) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
B64.encode (ByteString -> Pair) -> Maybe ByteString -> Maybe Pair
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Record -> Maybe ByteString
forall a. Record' a -> Maybe a
value Record
r
      ]
    ]

  toEncoding :: Record -> Encoding
toEncoding Record
r = Series -> Encoding
pairs (Series -> Encoding)
-> ([[Series]] -> Series) -> [[Series]] -> Encoding
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Series] -> Series
forall a. Monoid a => [a] -> a
mconcat ([Series] -> Series)
-> ([[Series]] -> [Series]) -> [[Series]] -> Series
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [[Series]] -> [Series]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat ([[Series]] -> Encoding) -> [[Series]] -> Encoding
forall a b. (a -> b) -> a -> b
$
    [
      [ Key
"offset" Key -> Int64 -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Record -> Int64
forall a. Record' a -> Int64
offset Record
r
      , Key
"partition" Key -> Int32 -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Record -> Int32
forall a. Record' a -> Int32
partition Record
r
      , Key
"topic" Key -> Text -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Record -> Text
forall a. Record' a -> Text
topic Record
r
      , Key
"headers" Key -> [Header] -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Record -> [Header]
forall a. Record' a -> [Header]
headers Record
r
      ]
    , Timestamp -> [Series]
forall kv. KeyValue kv => Timestamp -> [kv]
unparseTimestamp (Record -> Timestamp
forall a. Record' a -> Timestamp
timestamp Record
r)
    , [Maybe Series] -> [Series]
forall a. [Maybe a] -> [a]
catMaybes
      [ (Key
"key" Key -> Text -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.=) (Text -> Series) -> (ByteString -> Text) -> ByteString -> Series
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
TE.decodeUtf8 (ByteString -> Text)
-> (ByteString -> ByteString) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
B64.encode (ByteString -> Series) -> Maybe ByteString -> Maybe Series
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Record -> Maybe ByteString
forall a. Record' a -> Maybe ByteString
key Record
r
      , (Key
"value" Key -> Text -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.=) (Text -> Series) -> (ByteString -> Text) -> ByteString -> Series
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
TE.decodeUtf8 (ByteString -> Text)
-> (ByteString -> ByteString) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
B64.encode (ByteString -> Series) -> Maybe ByteString -> Maybe Series
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Record -> Maybe ByteString
forall a. Record' a -> Maybe a
value Record
r
      ]
    ]

-- | AWS serialises record headers to JSON as an array of
-- objects. From their docs:
--
-- @
-- "headers":[{"headerKey":[104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}]
-- @
--
-- Note:
--
-- >>> map chr [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
-- "headerValue"
data Header = Header !Text !ByteString deriving (Header -> Header -> Bool
(Header -> Header -> Bool)
-> (Header -> Header -> Bool) -> Eq Header
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Header -> Header -> Bool
$c/= :: Header -> Header -> Bool
== :: Header -> Header -> Bool
$c== :: Header -> Header -> Bool
Eq, Int -> Header -> ShowS
[Header] -> ShowS
Header -> String
(Int -> Header -> ShowS)
-> (Header -> String) -> ([Header] -> ShowS) -> Show Header
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Header] -> ShowS
$cshowList :: [Header] -> ShowS
show :: Header -> String
$cshow :: Header -> String
showsPrec :: Int -> Header -> ShowS
$cshowsPrec :: Int -> Header -> ShowS
Show, (forall x. Header -> Rep Header x)
-> (forall x. Rep Header x -> Header) -> Generic Header
forall x. Rep Header x -> Header
forall x. Header -> Rep Header x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Header x -> Header
$cfrom :: forall x. Header -> Rep Header x
Generic)

instance FromJSON Header where
  parseJSON :: Value -> Parser Header
parseJSON = String -> (Object -> Parser Header) -> Value -> Parser Header
forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"header" ((Object -> Parser Header) -> Value -> Parser Header)
-> (Object -> Parser Header) -> Value -> Parser Header
forall a b. (a -> b) -> a -> b
$ \Object
o ->
    case Object -> [Pair]
forall v. KeyMap v -> [(Key, v)]
KM.toList Object
o of
      [(Key
key, Value
value)] -> Text -> ByteString -> Header
Header (Key -> Text
keyToText Key
key) (ByteString -> Header)
-> ([Word8] -> ByteString) -> [Word8] -> Header
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Word8] -> ByteString
B.pack ([Word8] -> Header) -> Parser [Word8] -> Parser Header
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Value -> Parser [Word8]
forall a. FromJSON a => Value -> Parser a
parseJSON Value
value
      [] -> String -> Parser Header
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"Unexpected empty object"
      [Pair]
_ -> String -> Parser Header
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"Unexpected additional keys in object"

instance ToJSON Header where
  toJSON :: Header -> Value
toJSON (Header Text
key ByteString
value) = [Pair] -> Value
object [Text -> Key
keyFromText Text
key Key -> [Word8] -> Pair
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= ByteString -> [Word8]
B.unpack ByteString
value]
  toEncoding :: Header -> Encoding
toEncoding (Header Text
key ByteString
value) = Series -> Encoding
pairs (Series -> Encoding) -> Series -> Encoding
forall a b. (a -> b) -> a -> b
$ Text -> Key
keyFromText Text
key Key -> [Word8] -> Series
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= ByteString -> [Word8]
B.unpack ByteString
value

-- | Kafka timestamp types, derived from the Java client's enum at:
-- https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
data Timestamp = NoTimestampType | CreateTime !UTCTime | LogAppendTime !UTCTime
  deriving (Timestamp -> Timestamp -> Bool
(Timestamp -> Timestamp -> Bool)
-> (Timestamp -> Timestamp -> Bool) -> Eq Timestamp
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Timestamp -> Timestamp -> Bool
$c/= :: Timestamp -> Timestamp -> Bool
== :: Timestamp -> Timestamp -> Bool
$c== :: Timestamp -> Timestamp -> Bool
Eq, Int -> Timestamp -> ShowS
[Timestamp] -> ShowS
Timestamp -> String
(Int -> Timestamp -> ShowS)
-> (Timestamp -> String)
-> ([Timestamp] -> ShowS)
-> Show Timestamp
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Timestamp] -> ShowS
$cshowList :: [Timestamp] -> ShowS
show :: Timestamp -> String
$cshow :: Timestamp -> String
showsPrec :: Int -> Timestamp -> ShowS
$cshowsPrec :: Int -> Timestamp -> ShowS
Show, (forall x. Timestamp -> Rep Timestamp x)
-> (forall x. Rep Timestamp x -> Timestamp) -> Generic Timestamp
forall x. Rep Timestamp x -> Timestamp
forall x. Timestamp -> Rep Timestamp x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep Timestamp x -> Timestamp
$cfrom :: forall x. Timestamp -> Rep Timestamp x
Generic)

parseTimestamp :: Object -> Parser Timestamp
parseTimestamp :: Object -> Parser Timestamp
parseTimestamp Object
o = (Value -> Parser Timestamp) -> Object -> Key -> Parser Timestamp
forall a. (Value -> Parser a) -> Object -> Key -> Parser a
explicitParseField Value -> Parser Timestamp
parseSubtype Object
o Key
"timestampType"
  where
    parseSubtype :: Value -> Parser Timestamp
parseSubtype = String -> (Text -> Parser Timestamp) -> Value -> Parser Timestamp
forall a. String -> (Text -> Parser a) -> Value -> Parser a
withText String
"timestampType" ((Text -> Parser Timestamp) -> Value -> Parser Timestamp)
-> (Text -> Parser Timestamp) -> Value -> Parser Timestamp
forall a b. (a -> b) -> a -> b
$ \case
      Text
"NO_TIMESTAMP_TYPE" -> Timestamp -> Parser Timestamp
forall (f :: * -> *) a. Applicative f => a -> f a
pure Timestamp
NoTimestampType
      Text
"CREATE_TIME" -> UTCTime -> Timestamp
CreateTime (UTCTime -> Timestamp) -> Parser UTCTime -> Parser Timestamp
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser UTCTime
parseUTCTimestamp
      Text
"LOG_APPEND_TIME" -> UTCTime -> Timestamp
LogAppendTime (UTCTime -> Timestamp) -> Parser UTCTime -> Parser Timestamp
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser UTCTime
parseUTCTimestamp
      Text
t -> String -> Parser Timestamp
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Parser Timestamp) -> String -> Parser Timestamp
forall a b. (a -> b) -> a -> b
$ String
"unknown timestampType: \"" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show Text
t String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"\""

    parseUTCTimestamp :: Parser UTCTime
parseUTCTimestamp = (Value -> Parser UTCTime) -> Object -> Key -> Parser UTCTime
forall a. (Value -> Parser a) -> Object -> Key -> Parser a
explicitParseField Value -> Parser UTCTime
convertToUTCTime Object
o Key
"timestamp"

    convertToUTCTime :: Value -> Parser UTCTime
convertToUTCTime = String -> (Scientific -> Parser UTCTime) -> Value -> Parser UTCTime
forall a. String -> (Scientific -> Parser a) -> Value -> Parser a
withScientific String
"timestamp" ((Scientific -> Parser UTCTime) -> Value -> Parser UTCTime)
-> (Scientific -> Parser UTCTime) -> Value -> Parser UTCTime
forall a b. (a -> b) -> a -> b
$ \Scientific
stamp ->
      Int64 -> UTCTime
int64ToUTCTime (Int64 -> UTCTime) -> Parser Int64 -> Parser UTCTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Scientific -> Maybe Int64
forall i. (Integral i, Bounded i) => Scientific -> Maybe i
toBoundedInteger Scientific
stamp Maybe Int64 -> (Maybe Int64 -> Parser Int64) -> Parser Int64
forall a b. a -> (a -> b) -> b
&
        Parser Int64
-> (Int64 -> Parser Int64) -> Maybe Int64 -> Parser Int64
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (String -> Parser Int64
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"timestamp out of range") Int64 -> Parser Int64
forall (f :: * -> *) a. Applicative f => a -> f a
pure :: Parser Int64)

unparseTimestamp :: KeyValue kv => Timestamp -> [kv]
unparseTimestamp :: Timestamp -> [kv]
unparseTimestamp = \case
  Timestamp
NoTimestampType -> [Key
"timestampType" Key -> Value -> kv
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Text -> Value
String Text
"NO_TIMESTAMP_TYPE"]
  CreateTime UTCTime
ts ->
    [ Key
"timestampType" Key -> Value -> kv
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Text -> Value
String Text
"CREATE_TIME"
    , Key
"timestamp" Key -> Int64 -> kv
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= UTCTime -> Int64
utcTimeToInt64 UTCTime
ts
    ]
  LogAppendTime UTCTime
ts ->
    [ Key
"timestampType" Key -> Value -> kv
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= Text -> Value
String Text
"LOG_APPEND_TIME"
    , Key
"timestamp" Key -> Int64 -> kv
forall kv v. (KeyValue kv, ToJSON v) => Key -> v -> kv
.= UTCTime -> Int64
utcTimeToInt64 UTCTime
ts
    ]

int64ToUTCTime :: Int64 -> UTCTime
int64ToUTCTime :: Int64 -> UTCTime
int64ToUTCTime Int64
int =
  let (Int64
seconds, Int64
millis) = Int64
int Int64 -> Int64 -> (Int64, Int64)
forall a. Integral a => a -> a -> (a, a)
`divMod` Int64
1000
      posixSeconds :: POSIXTime
posixSeconds = Int64 -> POSIXTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
seconds POSIXTime -> POSIXTime -> POSIXTime
forall a. Num a => a -> a -> a
+ Int64 -> POSIXTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
millis POSIXTime -> POSIXTime -> POSIXTime
forall a. Fractional a => a -> a -> a
/ POSIXTime
1000
  in POSIXTime -> UTCTime
posixSecondsToUTCTime POSIXTime
posixSeconds

utcTimeToInt64 :: UTCTime -> Int64
utcTimeToInt64 :: UTCTime -> Int64
utcTimeToInt64 UTCTime
utc = POSIXTime -> Int64
forall a b. (RealFrac a, Integral b) => a -> b
truncate (POSIXTime -> Int64) -> POSIXTime -> Int64
forall a b. (a -> b) -> a -> b
$ UTCTime -> POSIXTime
utcTimeToPOSIXSeconds UTCTime
utc POSIXTime -> POSIXTime -> POSIXTime
forall a. Num a => a -> a -> a
* POSIXTime
1000