module Aws.Kinesis.Client.Producer
(
KinesisProducer
, withKinesisProducer
, managedKinesisProducer
, writeProducer
, Message
, MonadProducer
, ProducerKit(..)
, pkKinesisKit
, pkStreamName
, pkBatchPolicy
, pkMessageQueueBounds
, pkMaxConcurrency
, ProducerError(..)
, _KinesisError
, _MessageNotEnqueued
, _InvalidConcurrentConsumerCount
, BatchPolicy
, defaultBatchPolicy
, bpBatchSize
, bpEndpoint
, RecordEndpoint(..)
) where
import qualified Aws.Kinesis as Kin
import qualified Aws.Kinesis.Commands.PutRecords as Kin
import Aws.Kinesis.Client.Common
import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Exception.Lifted
import Control.Lens
import Control.Monad
import Control.Monad.Codensity
import Control.Monad.Error.Class
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Trans.Either
import Data.Conduit
import Data.Conduit.TQueue
import qualified Data.Conduit.List as CL
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Traversable
import Data.Typeable
import Prelude.Unicode
import qualified System.Random as R
-- | Which Kinesis API operation the producer uses to submit messages.
data RecordEndpoint
  = PutRecordEndpoint
    -- ^ Send every message with its own @PutRecord@ request.
  | PutRecordsEndpoint
    -- ^ Send messages in batches with @PutRecords@ requests.
  deriving (Show, Eq)
-- | Controls how queued messages are grouped and which endpoint sends them.
data BatchPolicy
  = BatchPolicy
  { _bpBatchSize ∷ !Int
    -- ^ Number of records grouped into each @PutRecords@ request.
  , _bpEndpoint ∷ !RecordEndpoint
    -- ^ Kinesis operation used to send the records.
  } deriving (Show, Eq)

-- | Lens onto the batch size of a 'BatchPolicy'.
bpBatchSize ∷ Lens' BatchPolicy Int
bpBatchSize f policy =
  (\size → policy { _bpBatchSize = size }) <$> f (_bpBatchSize policy)

-- | Lens onto the endpoint of a 'BatchPolicy'.
bpEndpoint ∷ Lens' BatchPolicy RecordEndpoint
bpEndpoint f policy =
  (\endpoint → policy { _bpEndpoint = endpoint }) <$> f (_bpEndpoint policy)

-- | Batches of 200 records, sent with @PutRecords@.
defaultBatchPolicy ∷ BatchPolicy
defaultBatchPolicy = BatchPolicy 200 PutRecordsEndpoint
-- | A message payload. It is UTF-8 encoded when it is sent to Kinesis.
type Message = T.Text

-- | A message waiting in the producer's queue, together with the partition
-- key it will be sent under.
data MessageQueueItem
  = MessageQueueItem
  { _mqiMessage ∷ !Message
  , _mqiPartitionKey ∷ !Kin.PartitionKey
  } deriving (Show, Eq)

-- | Lens onto the payload of a queued item.
mqiMessage ∷ Lens' MessageQueueItem Message
mqiMessage f item =
  (\msg → item { _mqiMessage = msg }) <$> f (_mqiMessage item)

-- | Lens onto the partition key of a queued item.
mqiPartitionKey ∷ Lens' MessageQueueItem Kin.PartitionKey
mqiPartitionKey f item =
  (\key → item { _mqiPartitionKey = key }) <$> f (_mqiPartitionKey item)
-- | Everything needed to start a Kinesis producer.
data ProducerKit
  = ProducerKit
  { _pkKinesisKit ∷ !KinesisKit
    -- ^ Used to run the Kinesis commands.
  , _pkStreamName ∷ !Kin.StreamName
    -- ^ Stream that every record is written to.
  , _pkBatchPolicy ∷ !BatchPolicy
    -- ^ Batching and endpoint selection.
  , _pkMessageQueueBounds ∷ !Int
    -- ^ Capacity of the bounded in-memory message queue.
  , _pkMaxConcurrency ∷ !Int
    -- ^ Maximum number of concurrent send workers; must be at least 1.
  }

pkKinesisKit ∷ Lens' ProducerKit KinesisKit
pkKinesisKit f kit =
  (\kk → kit { _pkKinesisKit = kk }) <$> f (_pkKinesisKit kit)

pkStreamName ∷ Lens' ProducerKit Kin.StreamName
pkStreamName f kit =
  (\name → kit { _pkStreamName = name }) <$> f (_pkStreamName kit)

pkBatchPolicy ∷ Lens' ProducerKit BatchPolicy
pkBatchPolicy f kit =
  (\policy → kit { _pkBatchPolicy = policy }) <$> f (_pkBatchPolicy kit)

pkMessageQueueBounds ∷ Lens' ProducerKit Int
pkMessageQueueBounds f kit =
  (\bounds → kit { _pkMessageQueueBounds = bounds }) <$> f (_pkMessageQueueBounds kit)

pkMaxConcurrency ∷ Lens' ProducerKit Int
pkMaxConcurrency f kit =
  (\limit → kit { _pkMaxConcurrency = limit }) <$> f (_pkMaxConcurrency kit)
-- | Handle to a running producer. Messages are submitted with
-- 'writeProducer'.
newtype KinesisProducer
  = KinesisProducer
  { _kpMessageQueue ∷ TBMQueue MessageQueueItem
    -- ^ Bounded queue that 'writeProducer' writes messages into.
  }

-- | Read-only access to the producer's message queue.
kpMessageQueue ∷ Getter KinesisProducer (TBMQueue MessageQueueItem)
kpMessageQueue = to $ \(KinesisProducer queue) → queue
-- | Errors reported by the producer.
data ProducerError
  = KinesisError !SomeException
    -- ^ A Kinesis command run through 'liftKinesis' failed.
  | MessageNotEnqueued Message
    -- ^ 'writeProducer' could not place the message on the queue.
  | InvalidConcurrentConsumerCount
    -- ^ The configured maximum concurrency was less than 1.
  deriving (Show, Typeable)

instance Exception ProducerError

-- | Matches 'KinesisError', exposing the underlying exception.
_KinesisError ∷ Prism' ProducerError SomeException
_KinesisError =
  prism' KinesisError $ \case
    KinesisError e → Just e
    _ → Nothing

-- | Matches 'MessageNotEnqueued', exposing the rejected message.
_MessageNotEnqueued ∷ Prism' ProducerError Message
_MessageNotEnqueued =
  prism' MessageNotEnqueued $ \case
    MessageNotEnqueued m → Just m
    _ → Nothing

-- | Matches 'InvalidConcurrentConsumerCount'.
_InvalidConcurrentConsumerCount ∷ Prism' ProducerError ()
_InvalidConcurrentConsumerCount =
  prism' (const InvalidConcurrentConsumerCount) $ \case
    InvalidConcurrentConsumerCount → Just ()
    _ → Nothing
-- | Capabilities required by the public producer operations.
type MonadProducer m
  = ( MonadError ProducerError m
    , MonadBaseControl IO m
    , MonadIO m
    )

-- | 'MonadProducer' plus read access to the 'ProducerKit'. Used by the
-- internal sending sinks.
type MonadProducerInternal m
  = ( MonadReader ProducerKit m
    , MonadProducer m
    )
-- | Run a Kinesis action in the producer monad.
--
-- The action's 'SomeException' error is wrapped as 'KinesisError' (via
-- 'mapError'), and its 'KinesisKit' environment is supplied through
-- 'pkKinesisKit' (via 'mapEnvironment').
liftKinesis
  ∷ MonadProducerInternal m
  ⇒ EitherT SomeException (ReaderT KinesisKit m) α
  → m α
liftKinesis action =
  mapEnvironment pkKinesisKit $
    mapError KinesisError action
-- | Build a partition key from 25 random lowercase letters drawn from the
-- given generator.
--
-- A 'Left' from 'Kin.partitionKey' (presumably a validation failure, which
-- should not happen for such a key) is turned into a runtime 'error'.
generatePartitionKey
  ∷ R.RandomGen g
  ⇒ g
  → Kin.PartitionKey
generatePartitionKey gen =
  either (error ∘ T.unpack) id $ Kin.partitionKey randomName
  where
    randomName = T.pack ∘ take 25 $ R.randomRs ('a', 'z') gen
-- | Atomically read up to @n@ items from the front of a 'TBMQueue' without
-- retrying.
--
-- Reading stops early, possibly returning @[]@, as soon as the queue has no
-- item available. This happens when it is empty, or closed and drained. A
-- non-positive @n@ returns @[]@ without touching the queue.
takeTBMQueue
  ∷ Int
  → TBMQueue α
  → STM [α]
takeTBMQueue n q
  | n <= 0 = return []
  | otherwise = do
      res ← tryReadTBMQueue q
      case res of
        -- Count down the remaining budget (this was the ill-typed @n 1@).
        Just (Just x) → (x:) <$> takeTBMQueue (n - 1) q
        _ → return []
-- | How 'chunkedSourceTBMQueue' groups queued items into chunks.
--
-- The fields are strict, like those of the other configuration records in
-- this module, so no thunks are retained inside a long-lived policy.
data ChunkingPolicy
  = ChunkingPolicy
  { _cpMaxChunkSize ∷ !Int
    -- ^ Largest number of items read into a single chunk.
  , _cpThrottlingDelay ∷ !Int
    -- ^ Delay passed to 'threadDelay' (microseconds) after a read that
    -- returned fewer than '_cpMaxChunkSize' items.
  }
-- | A 'Source' that repeatedly reads up to '_cpMaxChunkSize' items from a
-- 'TBMQueue' and yields every non-empty chunk downstream.
--
-- After a read that returned fewer than '_cpMaxChunkSize' items, the source
-- sleeps for '_cpThrottlingDelay' microseconds before polling again, so a
-- quiet queue is not busy-polled. The source finishes only once the queue is
-- closed /and/ every item still buffered in it has been yielded.
chunkedSourceTBMQueue
  ∷ ( MonadIO m
    , MonadBaseControl IO m
    )
  ⇒ ChunkingPolicy
  → TBMQueue α
  → Source m [α]
chunkedSourceTBMQueue cp@ChunkingPolicy{..} q = do
  -- A closed queue may still hold items; for example, 'sinkTBMQueue' closes
  -- it right after its last write. Checking 'isClosedTBMQueue' alone would
  -- silently drop those items, so also require the queue to be empty.
  finished ← liftIO ∘ atomically $
    (&&) <$> isClosedTBMQueue q <*> isEmptyTBMQueue q
  unless finished $ do
    items ← liftIO ∘ atomically $ takeTBMQueue _cpMaxChunkSize q
    unless (null items) $
      yield items
    when (length items < _cpMaxChunkSize) $
      threadDelay _cpThrottlingDelay
    chunkedSourceTBMQueue cp q
-- | Re-chunk an arbitrary 'Source' according to a 'ChunkingPolicy'.
--
-- The upstream source runs in a separate, linked 'async' thread. That thread
-- feeds a bounded 'TBMQueue' through @sinkTBMQueue buffer True@. The chunks
-- are then produced by 'chunkedSourceTBMQueue' reading from that queue. The
-- feeder thread is cancelled when this source's cleanup runs.
chunkSource
  ∷ ( MonadIO m
    , MonadBaseControl IO m
    )
  ⇒ ChunkingPolicy
  → Source m α
  → Source m [α]
chunkSource cp src = do
  buffer ← liftIO ∘ newTBMQueueIO $ _cpMaxChunkSize cp
  feeder ← lift ∘ async $ src $$+ sinkTBMQueue buffer True
  addCleanup (const $ cancel feeder) $ do
    lift $ link feeder
    chunkedSourceTBMQueue cp buffer
-- | Consume chunks of messages and send every message with its own
-- @PutRecord@ request through 'putRecordSink'.
--
-- Each chunk is processed with 'mapConcurrentlyN', limited to
-- 'pkMaxConcurrency' workers and with a delay step of 100.
concurrentPutRecordSink
  ∷ MonadProducerInternal m
  ⇒ Sink [MessageQueueItem] m ()
concurrentPutRecordSink = do
  workers ← view pkMaxConcurrency
  let sendOne item = CL.sourceList [item] $$ putRecordSink
  awaitForever $ lift ∘ mapConcurrentlyN workers 100 sendOne
-- | Send each incoming message with an individual @PutRecord@ request to the
-- configured stream.
--
-- When a request fails, the sink prints the error, sleeps for five seconds
-- and pushes the message back with 'leftover'. 'awaitForever' picks it up
-- again, so the message is retried with no retry limit.
putRecordSink
  ∷ MonadProducerInternal m
  ⇒ Sink MessageQueueItem m ()
putRecordSink = do
  streamName ← view pkStreamName
  let request item = Kin.PutRecord
        { Kin.putRecordData = item ^. mqiMessage ∘ to T.encodeUtf8
        , Kin.putRecordExplicitHashKey = Nothing
        , Kin.putRecordPartitionKey = item ^. mqiPartitionKey
        , Kin.putRecordSequenceNumberForOrdering = Nothing
        , Kin.putRecordStreamName = streamName
        }
      retryLater item err = do
        liftIO $ do
          putStrLn $ "Error: " ++ show err
          putStrLn "Will wait 5s"
          threadDelay 5000000
        leftover item
  awaitForever $ \item →
    handleError (retryLater item) $
      void ∘ lift ∘ liftKinesis $ runKinesis (request item)
-- | Split a list into consecutive chunks of at most @n@ elements. Only the
-- last chunk may be shorter.
--
-- >>> splitEvery 2 [1,2,3,4,5]
-- [[1,2],[3,4],[5]]
--
-- With a non-positive @n@, 'splitAt' never consumes anything, so the naive
-- recursion would produce an infinite list of empty chunks. In that case
-- the whole list is returned as a single chunk instead.
splitEvery
  ∷ Int
  → [α]
  → [[α]]
splitEvery _ [] = []
splitEvery n xs
  | n <= 0 = [xs]
  | otherwise = chunk : splitEvery n rest
  where
    (chunk, rest) = splitAt n xs
-- | Consume chunks of messages and split each chunk into batches of
-- 'bpBatchSize' records. The batches are sent as @PutRecords@ requests via
-- 'mapConcurrentlyN', limited to 'pkMaxConcurrency' workers.
--
-- For every batch with something to retry, that part is pushed back with
-- 'leftover':
--
-- * if the request fails outright, the error is printed and the whole batch
--   is retried;
-- * otherwise, the records whose response entry carries an error code are
--   retried.
putRecordsSink
  ∷ MonadProducerInternal m
  ⇒ Sink [MessageQueueItem] m ()
putRecordsSink = do
  streamName ← view pkStreamName
  batchSize ← view $ pkBatchPolicy ∘ bpBatchSize
  maxWorkerCount ← view pkMaxConcurrency
  let toEntry m = Kin.PutRecordsRequestEntry
        { Kin.putRecordsRequestEntryData = m ^. mqiMessage ∘ to T.encodeUtf8
        , Kin.putRecordsRequestEntryExplicitHashKey = Nothing
        , Kin.putRecordsRequestEntryPartitionKey = m ^. mqiPartitionKey
        }
      -- Returns the messages of this batch that must be retried.
      sendBatch batch =
        handleError (\e → batch <$ liftIO (print e)) $ do
          Kin.PutRecordsResponse{..} ← liftKinesis $ runKinesis Kin.PutRecords
            { Kin.putRecordsRecords = map toEntry batch
            , Kin.putRecordsStreamName = streamName
            }
          return
            [ m
            | (m, r) ← zip batch putRecordsResponseRecords
            , isJust (Kin.putRecordsResponseRecordErrorCode r)
            ]
  awaitForever $ \messages → do
    let batches = splitEvery batchSize messages
    failed ← lift $ mapConcurrentlyN maxWorkerCount 100 sendBatch batches
    forM_ (filter (not ∘ null) failed) leftover
-- | The sink that ships message chunks to Kinesis. It is selected by the
-- configured 'bpEndpoint'.
sendMessagesSink
  ∷ MonadProducerInternal m
  ⇒ Sink [MessageQueueItem] m ()
sendMessagesSink =
  view (pkBatchPolicy ∘ bpEndpoint) >>= \case
    PutRecordEndpoint → concurrentPutRecordSink
    PutRecordsEndpoint → putRecordsSink
-- | Enqueue a message for sending, tagged with a freshly generated random
-- partition key.
--
-- The message is forced (bang pattern) before anything else happens. The
-- write uses the non-retrying 'tryWriteTBMQueue'. Unless that reports
-- @Just True@, 'MessageNotEnqueued' is thrown via 'throwError'.
writeProducer
  ∷ MonadProducer m
  ⇒ KinesisProducer
  → Message
  → m ()
writeProducer producer !msg = do
  gen ← liftIO R.newStdGen
  let item = MessageQueueItem
        { _mqiMessage = msg
        , _mqiPartitionKey = generatePartitionKey gen
        }
  accepted ← liftIO ∘ atomically $
    tryWriteTBMQueue (producer ^. kpMessageQueue) item
  unless (accepted == Just True) $
    throwError $ MessageNotEnqueued msg
-- | Yield every item that can be read from a 'TBMQueue' without retrying.
-- The source stops at the first read that finds nothing available.
exhaustTBMQueue
  ∷ TBMQueue α
  → Source STM α
exhaustTBMQueue q = go
  where
    go = lift (tryReadTBMQueue q) >>= \case
      Just (Just x) → yield x >> go
      _ → return ()
-- | Start a Kinesis producer as a 'Codensity'-managed resource.
--
-- This validates the configuration and creates the bounded message queue
-- (capacity 'pkMessageQueueBounds'). It then starts a background consumer
-- that chunks the queue's contents and sends them with 'sendMessagesSink'.
-- When the resource is released, the queue is closed, any messages still in
-- it are sent, and the consumer thread is cancelled.
--
-- Throws 'InvalidConcurrentConsumerCount' (via 'throwError') when
-- 'pkMaxConcurrency' is less than 1.
managedKinesisProducer
  ∷ ∀ m
  . ( MonadIO m
    , MonadBaseControl IO m
    , MonadError ProducerError m
    )
  ⇒ ProducerKit
  → Codensity m KinesisProducer
managedKinesisProducer kit = do
  when (kit ^. pkMaxConcurrency < 1) ∘ lift $
    throwError InvalidConcurrentConsumerCount

  messageQueue ← liftIO ∘ newTBMQueueIO $ kit ^. pkMessageQueueBounds

  -- Chunk size is batch size × max concurrency. The second field is the
  -- throttling delay handed to threadDelay: 5000000 µs = 5 s.
  let chunkingPolicy = ChunkingPolicy ((kit ^. pkBatchPolicy ∘ bpBatchSize) * (kit ^. pkMaxConcurrency)) 5000000
      -- 'forever' restarts the pipeline whenever it completes.
      consumerLoop ∷ m () = flip runReaderT kit ∘ forever $
        chunkSource chunkingPolicy (sourceTBMQueue messageQueue)
          $$ sendMessagesSink

  -- Release action: close the queue so later writes are rejected, send
  -- whatever can still be read from it, then cancel the consumer thread.
  let cleanupConsumer consumerHandle = do
        liftIO ∘ atomically $ closeTBMQueue messageQueue
        flip runReaderT kit $ do
          leftovers ← liftIO ∘ atomically $
            exhaustTBMQueue messageQueue
              $$ CL.consume
          chunkSource chunkingPolicy (CL.sourceList leftovers)
            $$ sendMessagesSink
        cancel consumerHandle

  consumerHandle ← managedBracket (async consumerLoop) cleanupConsumer

  Codensity $ \inner → do
    -- Propagate consumer failures to this thread.
    link consumerHandle
    res ← inner $ KinesisProducer messageQueue
    -- NOTE(review): consumerLoop runs under 'forever', and the queue is only
    -- closed by cleanupConsumer, which the bracket runs after this
    -- continuation returns. This wait therefore appears to return only if
    -- the consumer fails, so a normal exit from the user's action would
    -- block here indefinitely. Confirm, and consider dropping it, since the
    -- bracket release already flushes the queue and cancels the consumer.
    () ← wait consumerHandle
    return res
-- | Run an action with a producer started by 'managedKinesisProducer'. That
-- function's setup and teardown are performed around the action.
withKinesisProducer
  ∷ ( MonadIO m
    , MonadBaseControl IO m
    , MonadError ProducerError m
    )
  ⇒ ProducerKit
  → (KinesisProducer → m α)
  → m α
withKinesisProducer kit =
  runCodensity (managedKinesisProducer kit)
-- | 'bracket' as a 'Codensity' resource. @acquire@ runs before the
-- continuation, and @release@ runs after it, including when it throws.
managedBracket
  ∷ MonadBaseControl IO m
  ⇒ m α
  → (α → m β)
  → Codensity m α
managedBracket acquire release =
  Codensity (bracket acquire release)
-- | Like 'mapConcurrently', but with two limits:
--
-- * at most @n@ applications of @f@ run at the same time (guarded by a
--   'QSem');
-- * the element at 0-based position @i@ waits @i * delay@ milliseconds
--   before it competes for a slot.
--
-- The stagger sleep happens /before/ the semaphore is acquired, so a
-- sleeping element never occupies one of the @n@ slots. If it slept while
-- holding a slot, the sleeps would add up per slot. With @n = 1@ and @k@
-- elements, that totals @delay * k * (k - 1) / 2@ ms rather than a stagger
-- of @delay@ ms per element.
mapConcurrentlyN
  ∷ ( MonadIO m
    , MonadBaseControl IO m
    , Traversable t
    )
  ⇒ Int
  → Int
  → (a → m b)
  → t a
  → m (t b)
mapConcurrentlyN n delay f t = do
  sem ← liftIO $ newQSem n
  mapConcurrently (run sem) indexed
  where
    -- Pair every element with its position so its stagger can be computed.
    (_, indexed) = mapAccumL (\i v → (succ i, (i, v))) 0 t
    run sem (i, a) = do
      liftIO ∘ threadDelay $ 1000 * delay * i
      liftBaseOp_ (bracket_ (waitQSem sem) (signalQSem sem)) $ f a