{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
module Kafka.Consumer.Types
( KafkaConsumer(..)
, ConsumerGroupId(..)
, Offset(..)
, OffsetReset(..)
, RebalanceEvent(..)
, PartitionOffset(..)
, SubscribedPartitions(..)
, Timestamp(..)
, OffsetCommit(..)
, OffsetStoreSync(..)
, OffsetStoreMethod(..)
, TopicPartition(..)
, ConsumerRecord(..)
, crMapKey
, crMapValue
, crMapKV
, sequenceFirst
, traverseFirst
, traverseFirstM
, traverseM
, bitraverseM
)
where
import Data.Bifoldable (Bifoldable (..))
import Data.Bifunctor (Bifunctor (..))
import Data.Bitraversable (Bitraversable (..), bimapM, bisequenceA)
import Data.Int (Int64)
import Data.Text (Text)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import Kafka.Internal.Setup (HasKafka (..), HasKafkaConf (..), Kafka (..), KafkaConf (..))
import Kafka.Types (Millis (..), PartitionId (..), TopicName (..))
-- | Consumer handle: pairs a 'Kafka' value with a 'KafkaConf'.
--
-- Both fields are strict, so a 'KafkaConsumer' in weak head normal form has
-- both components evaluated as well.
data KafkaConsumer =
  KafkaConsumer
    { kcKafkaPtr  :: !Kafka
      -- ^ The 'Kafka' value held by this consumer.
    , kcKafkaConf :: !KafkaConf
      -- ^ The 'KafkaConf' held by this consumer.
    }
-- | A consumer exposes the 'Kafka' value stored in its 'kcKafkaPtr' field.
instance HasKafka KafkaConsumer where
  getKafka (KafkaConsumer kafka _) = kafka
  {-# INLINE getKafka #-}
-- | A consumer exposes the 'KafkaConf' stored in its 'kcKafkaConf' field.
instance HasKafkaConf KafkaConsumer where
  getKafkaConf (KafkaConsumer _ conf) = conf
  {-# INLINE getKafkaConf #-}
-- | Consumer group identifier: a 'Text' wrapped in a newtype so that it
-- cannot be mixed up with other textual values.
newtype ConsumerGroupId = ConsumerGroupId
  { unConsumerGroupId :: Text
    -- ^ The wrapped 'Text'.
  }
  deriving (Eq, Ord, Show, Generic)
-- | A 64-bit offset value wrapped in a newtype.
--
-- NOTE(review): presumably the position of a message within a partition;
-- confirm against the code that produces these values.
newtype Offset = Offset
  { unOffset :: Int64
    -- ^ The wrapped 'Int64'.
  }
  deriving (Eq, Ord, Show, Read, Generic)
-- | Offset reset policy with two alternatives.
--
-- NOTE(review): presumably selects where consumption starts when no offset
-- is available; confirm against the code that interprets this type.
data OffsetReset
  = Earliest
  | Latest
  deriving (Eq, Show, Generic)
-- | Events describing a rebalance. Every constructor carries the list of
-- topic\/partition pairs the event refers to.
--
-- NOTE(review): the order in which these events are emitted is decided by
-- code outside this module; confirm it there.
data RebalanceEvent
  = RebalanceBeforeAssign [(TopicName, PartitionId)]
    -- ^ Partitions about to be assigned.
  | RebalanceAssign [(TopicName, PartitionId)]
    -- ^ Partitions that have been assigned.
  | RebalanceBeforeRevoke [(TopicName, PartitionId)]
    -- ^ Partitions about to be revoked.
  | RebalanceRevoke [(TopicName, PartitionId)]
    -- ^ Partitions that have been revoked.
  deriving (Show, Eq, Generic)
-- | A position in a partition: either one of several symbolic positions or
-- an explicit 64-bit offset.
--
-- NOTE(review): the precise meaning of the symbolic positions is defined by
-- the code that translates them; confirm it there.
data PartitionOffset
  = PartitionOffsetBeginning
    -- ^ Symbolic position: the beginning.
  | PartitionOffsetEnd
    -- ^ Symbolic position: the end.
  | PartitionOffset Int64
    -- ^ An explicit offset.
  | PartitionOffsetStored
    -- ^ Symbolic position: the stored offset.
  | PartitionOffsetInvalid
    -- ^ Marker for an invalid offset.
  deriving (Show, Eq, Generic)
-- | A selection of partitions: either an explicit list or all of them.
data SubscribedPartitions
  = SubscribedPartitions [PartitionId]
    -- ^ Exactly the listed partitions.
  | SubscribedPartitionsAll
    -- ^ Every partition.
  deriving (Eq, Show, Generic)
-- | Timestamp attached to a record. The 'Millis' payloads are strict.
--
-- NOTE(review): constructor names suggest Kafka's create-time and
-- log-append-time timestamp kinds; confirm against the decoding code.
data Timestamp
  = CreateTime !Millis
    -- ^ A create-time timestamp.
  | LogAppendTime !Millis
    -- ^ A log-append-time timestamp.
  | NoTimestamp
    -- ^ No timestamp is available.
  deriving (Eq, Show, Read, Generic)
-- | How an offset commit should be carried out.
--
-- NOTE(review): 'OffsetCommit' is presumably the synchronous (blocking)
-- counterpart of 'OffsetCommitAsync'; confirm against the commit code.
data OffsetCommit
  = OffsetCommit
    -- ^ The non-async commit mode.
  | OffsetCommitAsync
    -- ^ The async commit mode.
  deriving (Eq, Show, Generic)
-- | Sync policy for an offset store.
data OffsetStoreSync
  = OffsetSyncDisable
    -- ^ Syncing is disabled.
  | OffsetSyncImmediate
    -- ^ Sync immediately.
  | OffsetSyncInterval Int
    -- ^ Sync at the given interval.
    -- NOTE(review): the unit is not visible here (presumably milliseconds);
    -- confirm against the code that consumes this value.
  deriving (Eq, Show, Generic)
-- | Where offsets are stored.
data OffsetStoreMethod
  = OffsetStoreBroker
    -- ^ Store offsets on the broker.
  | OffsetStoreFile FilePath OffsetStoreSync
    -- ^ Store offsets in the file at the given path, using the given
    -- 'OffsetStoreSync' policy.
  deriving (Eq, Show, Generic)
-- | A topic name, a partition id and a 'PartitionOffset' bundled together.
--
-- Unlike 'ConsumerRecord', the fields here are lazy.
data TopicPartition =
  TopicPartition
    { tpTopicName :: TopicName
      -- ^ Name of the topic.
    , tpPartition :: PartitionId
      -- ^ Partition within the topic.
    , tpOffset    :: PartitionOffset
      -- ^ Offset associated with that partition.
    }
  deriving (Eq, Show, Generic)
-- | A consumed record, polymorphic in its key type @k@ and value type @v@.
--
-- Every field is strict, including the key and the value.
data ConsumerRecord k v =
  ConsumerRecord
    { crTopic     :: !TopicName
      -- ^ Topic of the record.
    , crPartition :: !PartitionId
      -- ^ Partition of the record.
    , crOffset    :: !Offset
      -- ^ Offset of the record.
    , crTimestamp :: !Timestamp
      -- ^ Timestamp of the record.
    , crKey       :: !k
      -- ^ Record key.
    , crValue     :: !v
      -- ^ Record value.
    }
  deriving (Show, Read, Eq, Generic, Typeable)
-- | Maps over the key (first argument) and the value (second argument);
-- all other fields are carried over unchanged.
instance Bifunctor ConsumerRecord where
  bimap onKey onValue record =
    -- Type-changing record update: 'crKey' and 'crValue' are the only
    -- fields mentioning @k@ and @v@, so both may change type together.
    record
      { crKey   = onKey (crKey record)
      , crValue = onValue (crValue record)
      }
  {-# INLINE bimap #-}
-- | Maps over the record value only; the key is left untouched.
instance Functor (ConsumerRecord k) where
  fmap onValue = bimap id onValue
  {-# INLINE fmap #-}
-- | Folds over the single record value; the key does not take part.
instance Foldable (ConsumerRecord k) where
  foldMap toMonoid = toMonoid . crValue
  {-# INLINE foldMap #-}
-- | Runs an effect on the record value and puts the result back into the
-- same record; the key and all other fields are kept.
instance Traversable (ConsumerRecord k) where
  traverse action record =
    fmap (\newValue -> record { crValue = newValue }) (action (crValue record))
  {-# INLINE traverse #-}
-- | Folds the key first and the value second, combining with 'mappend'.
instance Bifoldable ConsumerRecord where
  bifoldMap onKey onValue record = mappend keyPart valuePart
    where
      keyPart   = onKey (crKey record)
      valuePart = onValue (crValue record)
  {-# INLINE bifoldMap #-}
-- | Runs the key effect, then the value effect, and rebuilds the record
-- from both results; all other fields are kept.
instance Bitraversable ConsumerRecord where
  bitraverse onKey onValue record =
    rebuild <$> onKey (crKey record) <*> onValue (crValue record)
    where
      -- Put the freshly produced key and value back into the record.
      rebuild newKey newValue = record { crKey = newKey, crValue = newValue }
  {-# INLINE bitraverse #-}
-- | Apply a function to the key of a 'ConsumerRecord', leaving the value
-- untouched.
crMapKey :: (k -> k') -> ConsumerRecord k v -> ConsumerRecord k' v
crMapKey onKey = bimap onKey id
{-# INLINE crMapKey #-}
-- | Apply a function to the value of a 'ConsumerRecord', leaving the key
-- untouched.
crMapValue :: (v -> v') -> ConsumerRecord k v -> ConsumerRecord k v'
crMapValue onValue = bimap id onValue
{-# INLINE crMapValue #-}
-- | Apply one function to the key and another to the value of a
-- 'ConsumerRecord' in a single pass.
crMapKV :: (k -> k') -> (v -> v') -> ConsumerRecord k v -> ConsumerRecord k' v'
crMapKV onKey onValue = bimap onKey onValue
{-# INLINE crMapKV #-}
{-# DEPRECATED sequenceFirst "Isn't concern of this library. Use 'bitraverse id pure'" #-}
-- | Pull an applicative effect out of the first component of a
-- 'Bitraversable' structure; the second component is passed through as is.
sequenceFirst :: (Bitraversable t, Applicative f) => t (f k) v -> f (t k v)
sequenceFirst container = bitraverse id pure container
{-# INLINE sequenceFirst #-}
{-# DEPRECATED traverseFirst "Isn't concern of this library. Use 'bitraverse f pure'" #-}
-- | Run an effectful function over the first component of a
-- 'Bitraversable' structure; the second component is passed through as is.
traverseFirst
  :: (Bitraversable t, Applicative f)
  => (k -> f k')  -- ^ effect to run on the first component
  -> t k v        -- ^ structure to traverse
  -> f (t k' v)
traverseFirst onFirst container = bitraverse onFirst pure container
{-# INLINE traverseFirst #-}
{-# DEPRECATED traverseFirstM "Isn't concern of this library. Use 'bitraverse id pure <$> bitraverse f pure r'" #-}
-- | Run an effectful function over the first component of a
-- 'Bitraversable' structure when the function's result is itself wrapped in
-- a second applicative @f@. The inner @f@ layer is then hoisted out of the
-- structure, giving @m (f (t k' v))@.
--
-- The outer effect @m@ only needs to be 'Applicative': the implementation
-- uses nothing beyond 'bitraverse' and '<$>'. Every 'Monad' is an
-- 'Applicative', so existing monadic callers keep working.
traverseFirstM
  :: (Bitraversable t, Applicative f, Applicative m)
  => (k -> m (f k'))  -- ^ effect to run on the first component
  -> t k v            -- ^ structure to traverse
  -> m (f (t k' v))
traverseFirstM f r = bitraverse id pure <$> bitraverse f pure r
{-# INLINE traverseFirstM #-}
{-# DEPRECATED traverseM "Isn't concern of this library. Use 'sequenceA <$> traverse f r'" #-}
-- | Traverse a structure with a function whose result is wrapped in two
-- layers, an outer effect @m@ and an inner applicative @f@, and hoist the
-- inner @f@ layer out of the structure, giving @m (f (t v'))@.
--
-- The outer effect @m@ only needs to be 'Applicative': the implementation
-- uses nothing beyond 'traverse' and '<$>'. Every 'Monad' is an
-- 'Applicative', so existing monadic callers keep working.
traverseM
  :: (Traversable t, Applicative f, Applicative m)
  => (v -> m (f v'))  -- ^ effect to run on every element
  -> t v              -- ^ structure to traverse
  -> m (f (t v'))
traverseM f r = sequenceA <$> traverse f r
{-# INLINE traverseM #-}
{-# DEPRECATED bitraverseM "Isn't concern of this library. Use 'bisequenceA <$> bimapM f g r'" #-}
-- | Traverse both components of a 'Bitraversable' structure with functions
-- whose results are wrapped in two layers, an outer effect @m@ and an inner
-- applicative @f@, and hoist the inner @f@ layer out of the structure,
-- giving @m (f (t k' v'))@.
--
-- The outer effect @m@ only needs to be 'Applicative': 'bimapM' is
-- 'bitraverse', and the only other operation used is '<$>'. Every 'Monad'
-- is an 'Applicative', so existing monadic callers keep working.
bitraverseM
  :: (Bitraversable t, Applicative f, Applicative m)
  => (k -> m (f k'))  -- ^ effect to run on the first component
  -> (v -> m (f v'))  -- ^ effect to run on the second component
  -> t k v            -- ^ structure to traverse
  -> m (f (t k' v'))
bitraverseM f g r = bisequenceA <$> bimapM f g r
{-# INLINE bitraverseM #-}