module Network.Kafka.Producer where
import Prelude hiding ((!!))
import Control.Applicative
import Control.Lens
import Control.Monad.Trans (liftIO, lift)
import Control.Monad.Trans.Either
import Data.ByteString.Char8 (ByteString)
import qualified Data.Digest.Murmur32 as Murmur32
import Data.List.Safe ((!!))
import Data.Monoid ((<>))
import System.IO
import qualified Data.Map as M
import System.Random (getStdRandom, randomR)
import Network.Kafka
import Network.Kafka.Protocol
produce :: Handle -> ProduceRequest -> Kafka ProduceResponse
produce handle request =
makeRequest (ProduceRequest request) >>= doRequest' handle >>= expectResponse ExpectedProduce _ProduceResponse
produceRequest :: RequiredAcks -> Timeout -> [(TopicAndPartition, MessageSet)] -> ProduceRequest
produceRequest ra ti ts =
ProduceReq (ra, ti, M.toList . M.unionsWith (<>) $ fmap f ts)
where f (TopicAndPartition t p, i) = M.singleton t [(p, i)]
produceMessages :: [TopicAndMessage] -> Kafka [ProduceResponse]
produceMessages tams = do
m <- fmap (fmap groupMessagesToSet) <$> partitionAndCollate tams
mapM (uncurry send) $ fmap M.toList <$> M.toList m
groupMessagesToSet :: [TopicAndMessage] -> MessageSet
groupMessagesToSet xs = MessageSet $ msm <$> xs
where msm = MessageSetMember (Offset (1)) . _tamMessage
partitionAndCollate :: [TopicAndMessage] -> Kafka (M.Map Leader (M.Map TopicAndPartition [TopicAndMessage]))
partitionAndCollate ks = recurse ks M.empty
where recurse [] accum = return accum
recurse (x:xs) accum = do
topicPartitionsList <- brokerPartitionInfo $ _tamTopic x
let maybeKey = x ^. tamMessage . messageKey . keyBytes
pal <- case maybeKey of
Nothing -> getRandPartition topicPartitionsList
Just key -> return $ getPartitionByKey (_kafkaByteString key) topicPartitionsList
let leader = maybe (Leader Nothing) _palLeader pal
tp = TopicAndPartition <$> pal ^? folded . palTopic <*> pal ^? folded . palPartition
b = M.singleton leader $ maybe M.empty (`M.singleton` [x]) tp
accum' = M.unionWith (M.unionWith (<>)) accum b
recurse xs accum'
getPartitionByKey :: ByteString -> [PartitionAndLeader] -> Maybe PartitionAndLeader
getPartitionByKey key ps = let i = Murmur32.asWord32 $ Murmur32.hash32WithSeed 0x9747b28c key
in ps !! i
send :: Leader -> [(TopicAndPartition, MessageSet)] -> Kafka ProduceResponse
send l ts = do
let s = kafkaClientState . stateBrokers . at l
topicNames = map (_tapTopic . fst) ts
broker <- findMetadataOrElse topicNames s (KafkaInvalidBroker l)
requiredAcks <- use (kafkaClientState . stateRequiredAcks)
requestTimeout <- use (kafkaClientState . stateRequestTimeout)
withBrokerHandle broker $ \handle -> produce handle $ produceRequest requiredAcks requestTimeout ts
brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader]
brokerPartitionInfo t = do
let s = kafkaClientState . stateTopicMetadata . at t
tmd <- findMetadataOrElse [t] s KafkaFailedToFetchMetadata
return $ pal <$> tmd ^. partitionsMetadata
where pal d = PartitionAndLeader t (d ^. partitionId) (d ^. partitionMetadataLeader)
findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaClient (Maybe a) -> KafkaClientError -> Kafka a
findMetadataOrElse ts s err = do
maybeFound <- use s
case maybeFound of
Just x -> return x
Nothing -> do
updateMetadatas ts
maybeFound' <- use s
case maybeFound' of
Just x -> return x
Nothing -> lift $ left $ err
getRandPartition :: [PartitionAndLeader] -> Kafka (Maybe PartitionAndLeader)
getRandPartition ps =
liftIO $ (ps' ^?) . element <$> getStdRandom (randomR (0, length ps' 1))
where ps' = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just)
defaultMessageCrc :: Crc
defaultMessageCrc = 1
defaultMessageMagicByte :: MagicByte
defaultMessageMagicByte = 0
defaultMessageKey :: Key
defaultMessageKey = Key Nothing
defaultMessageAttributes :: Attributes
defaultMessageAttributes = 0
makeMessage :: ByteString -> Message
makeMessage m = Message (defaultMessageCrc, defaultMessageMagicByte, defaultMessageAttributes, defaultMessageKey, Value (Just (KBytes m)))