module Network.Kafka.Producer where
import Control.Applicative
import Control.Lens
import Control.Monad.Trans (liftIO, lift)
import Control.Monad.Trans.Either
import Data.ByteString.Char8 (ByteString)
import Data.Monoid ((<>))
import System.IO
import qualified Data.Map as M
import System.Random (getStdRandom, randomR)
import Network.Kafka
import Network.Kafka.Protocol
-- | Issue a single produce request on an already-open broker handle.
--
-- The request is wrapped in the 'ProduceRequest' constructor, passed through
-- 'makeRequest', sent on the handle with @doRequest'@, and the reply is handed
-- to 'expectResponse' together with 'ExpectedProduce' and '_ProduceResponse'
-- to obtain the 'ProduceResponse'.
produce :: Handle -> ProduceRequest -> Kafka ProduceResponse
produce handle request = do
  outgoing <- makeRequest (ProduceRequest request)
  reply <- doRequest' handle outgoing
  expectResponse ExpectedProduce _ProduceResponse reply
-- | Build a 'ProduceRequest' from explicit settings and message sets.
--
-- The @(topic/partition, message set)@ pairs are regrouped by topic: each
-- topic appears once, carrying the list of @(partition, message set)@ pairs
-- that were given for it, in their original relative order.
produceRequest :: RequiredAcks -> Timeout -> [(TopicAndPartition, MessageSet)] -> ProduceRequest
produceRequest ra ti ts = ProduceReq (ra, ti, groupedByTopic)
  where
    -- One singleton map per input pair, merged by appending partition lists.
    groupedByTopic = M.toList (M.unionsWith (<>) (map singleTopic ts))
    singleTopic (TopicAndPartition topic part, msgs) = M.singleton topic [(part, msgs)]
-- | Send messages to the partitions chosen by 'partitionAndCollate'.
--
-- Messages are collated per leader and per topic/partition, each group is
-- turned into a 'MessageSet' with 'groupMessagesToSet', and one 'send' is
-- performed per leader. The responses are returned in the order of the
-- leaders in the collated map.
produceMessages :: [TopicAndMessage] -> Kafka [ProduceResponse]
produceMessages tams = do
  byLeader <- partitionAndCollate tams
  let batches =
        [ (leader, M.toList (fmap groupMessagesToSet byPartition))
        | (leader, byPartition) <- M.toList byLeader
        ]
  mapM (\(leader, batch) -> send leader batch) batches
-- | Create a protocol 'MessageSet' from a list of messages.
--
-- The payload of every 'TopicAndMessage' (its '_tamMessage') is wrapped in a
-- 'MessageSetMember'; the order of the input list is preserved.
groupMessagesToSet :: [TopicAndMessage] -> MessageSet
groupMessagesToSet xs = MessageSet $ msm <$> xs
  where
    -- The producer does not know the final log offset, so every member
    -- carries the conventional @-1@ placeholder rather than a value that
    -- could be mistaken for a real offset.
    -- NOTE(review): the source had @Offset (1)@, which looks like a lost
    -- minus sign - confirm against the project's history.
    msm = MessageSetMember (Offset (-1)) . _tamMessage
-- | Group messages by the leader, and then the topic/partition, they should
-- be sent to.
--
-- For each message, in order: the partitions of its topic are looked up with
-- 'brokerPartitionInfo', one is picked with 'getPartition', and the message is
-- appended under that partition's leader and topic/partition.
--
-- When 'getPartition' yields nothing, the message itself is not stored; only
-- an empty entry under @Leader Nothing@ is merged into the result.
partitionAndCollate :: [TopicAndMessage] -> Kafka (M.Map Leader (M.Map TopicAndPartition [TopicAndMessage]))
partitionAndCollate ks = go M.empty ks
  where
    go collated [] = return collated
    go collated (msg : rest) = do
      candidates <- brokerPartitionInfo (_tamTopic msg)
      chosen <- getPartition candidates
      go (M.unionWith (M.unionWith (<>)) collated (entryFor msg chosen)) rest

    -- Single-leader map holding this one message (or nothing at all when no
    -- partition was chosen).
    entryFor msg chosen = M.singleton leader perPartition
      where
        leader = maybe (Leader Nothing) _palLeader chosen
        target =
          TopicAndPartition
            <$> chosen ^? folded . palTopic
            <*> chosen ^? folded . palPartition
        perPartition = maybe M.empty (\tp -> M.singleton tp [msg]) target
-- | Execute a produce request against the broker registered for a leader,
-- using the settings held in the client state.
--
-- The broker is looked up under @stateBrokers@ via 'findMetadataOrElse',
-- which refreshes metadata for the topics in @ts@ once if the broker is not
-- known and fails with @KafkaInvalidBroker l@ if it is still missing. The
-- required acks and request timeout are read from the client state, and the
-- request is issued through 'withBrokerHandle'.
send :: Leader -> [(TopicAndPartition, MessageSet)] -> Kafka ProduceResponse
send l ts = do
  broker <- findMetadataOrElse topics brokerEntry (KafkaInvalidBroker l)
  acks <- use (kafkaClientState . stateRequiredAcks)
  timeLimit <- use (kafkaClientState . stateRequestTimeout)
  let request = produceRequest acks timeLimit ts
  withBrokerHandle broker (\handle -> produce handle request)
  where
    brokerEntry = kafkaClientState . stateBrokers . at l
    topics = [_tapTopic tap | (tap, _) <- ts]
-- | List the partitions of a topic together with their leaders.
--
-- The topic's metadata is read from @stateTopicMetadata@ through
-- 'findMetadataOrElse' (one metadata refresh is attempted if it is absent;
-- 'KafkaFailedToFetchMetadata' is raised if it is still absent). Each entry of
-- the topic's @partitionsMetadata@ becomes one 'PartitionAndLeader'.
brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader]
brokerPartitionInfo t = do
  tmd <-
    findMetadataOrElse
      [t]
      (kafkaClientState . stateTopicMetadata . at t)
      KafkaFailedToFetchMetadata
  return (map toPartitionAndLeader (tmd ^. partitionsMetadata))
  where
    toPartitionAndLeader meta =
      PartitionAndLeader t (meta ^. partitionId) (meta ^. partitionMetadataLeader)
-- | Read an optional value from the client state, refreshing metadata once
-- if it is missing.
--
-- If the getter @s@ yields a value, it is returned immediately. Otherwise
-- 'updateMetadatas' is run for the topics @ts@ and the getter is consulted a
-- second time; if the value is still missing the computation fails with
-- @err@.
findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaClient (Maybe a) -> KafkaClientError -> Kafka a
findMetadataOrElse ts s err = use s >>= maybe refreshAndRetry return
  where
    -- Second (and last) attempt, made after a metadata refresh.
    refreshAndRetry = do
      updateMetadatas ts
      use s >>= maybe (lift (left err)) return
-- | Pick a random partition from those that currently have a leader.
--
-- Only entries whose leader id is a @Just@ are candidates. Returns 'Nothing'
-- when there are no candidates; otherwise an index is drawn from the global
-- random generator over @[0, number of candidates - 1]@ and the candidate at
-- that index is returned.
getPartition :: [PartitionAndLeader] -> Kafka (Maybe PartitionAndLeader)
getPartition ps
  -- Nothing to choose from: answer directly instead of drawing an index from
  -- the inverted range (0, -1).
  | null ps' = return Nothing
  | otherwise =
      liftIO $ (ps' ^?) . element <$> getStdRandom (randomR (0, length ps' - 1))
  where
    ps' = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just)
-- | CRC value that 'makeMessage' puts into every message it builds: @1@.
--
-- NOTE(review): presumably a placeholder that is replaced by a real checksum
-- when the message is serialized - confirm in "Network.Kafka.Protocol".
defaultMessageCrc :: Crc
defaultMessageCrc = 1
-- | Magic byte that 'makeMessage' puts into every message it builds: @0@.
defaultMessageMagicByte :: MagicByte
defaultMessageMagicByte = 0
-- | Key that 'makeMessage' puts into every message it builds: @Key Nothing@,
-- i.e. no key bytes.
defaultMessageKey :: Key
defaultMessageKey = Key Nothing
-- | Attributes value that 'makeMessage' puts into every message it builds:
-- @0@.
--
-- NOTE(review): presumably means "no compression" - confirm against the
-- protocol definition.
defaultMessageAttributes :: Attributes
defaultMessageAttributes = 0
-- | Build a 'Message' whose value is the given bytes.
--
-- All other fields take the module defaults: 'defaultMessageCrc',
-- 'defaultMessageMagicByte', 'defaultMessageAttributes' and
-- 'defaultMessageKey'.
makeMessage :: ByteString -> Message
makeMessage m =
  Message
    ( defaultMessageCrc
    , defaultMessageMagicByte
    , defaultMessageAttributes
    , defaultMessageKey
    , payload
    )
  where
    payload = Value (Just (KBytes m))