-- |
-- Module: Aws.Kinesis.Client.Consumer
--
-- A buffered consumer for a Kinesis stream. Background workers poll the
-- stream's shards and push the fetched records into a bounded queue, from
-- which they can be read one at a time or as a conduit 'Source'.
module Aws.Kinesis.Client.Consumer
( -- * The consumer
  KinesisConsumer
, managedKinesisConsumer
, withKinesisConsumer
  -- * Reading records
, consumerSource
, readConsumer
, tryReadConsumer
  -- * Configuration
, ConsumerKit(..)
, ckKinesisKit
, ckStreamName
, ckBatchSize
  -- Exported for parity with the other 'ConsumerKit' lenses; previously the
  -- iterator type could only be reached through the raw record field.
, ckIteratorType
  -- * Errors and constraints
, ConsumerError(..)
, MonadConsumer
) where
import qualified Aws.Kinesis as Kin
import Aws.Kinesis.Client.Common
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Concurrent.STM
import Control.Concurrent.STM.Queue
import Control.Exception
import Control.Lens
import Control.Monad.Codensity
import Control.Monad.Error.Class
import Control.Monad.Reader
import Control.Monad.Trans.Control
import Control.Monad.Unicode
import qualified Data.Carousel as CR
import Data.Conduit
import qualified Data.Conduit.List as CL
import Prelude.Unicode
-- | Bookkeeping the consumer keeps for a single shard of the stream.
data ShardState = ShardState
  { _ssIterator ∷ !(TVar (Maybe Kin.ShardIterator))
    -- ^ The iterator to use for the next @GetRecords@ call on this shard. It
    -- is overwritten with the \"next shard iterator\" of every response, which
    -- may be 'Nothing'.
  , _ssShardId ∷ !Kin.ShardId
    -- ^ Identifier of the shard; this is what shard states are compared by.
  }
-- | Read-only access to the mutable cell holding a shard's current iterator.
ssIterator ∷ Getter ShardState (TVar (Maybe Kin.ShardIterator))
ssIterator = to $ \shard → _ssIterator shard
-- | Lens onto the identifier of the shard a 'ShardState' belongs to.
ssShardId ∷ Lens' ShardState Kin.ShardId
ssShardId inj shard =
  inj (_ssShardId shard) <&> \sid → shard { _ssShardId = sid }
-- | Two shard states are equal when they refer to the same shard id; the
-- iterator cell does not take part in the comparison.
instance Eq ShardState where
  lhs == rhs = _ssShardId lhs ≡ _ssShardId rhs
-- | Errors that consumer operations can raise through 'MonadError'.
data ConsumerError
  = NoShards
    -- ^ Not raised by any code in this module's visible definitions.
  | KinesisError !SomeException
    -- ^ A failure of an underlying Kinesis request, wrapped as-is.
  deriving Show
-- | Everything needed to start a consumer.
data ConsumerKit = ConsumerKit
  { _ckKinesisKit ∷ !KinesisKit
    -- ^ Environment used to run the Kinesis requests.
  , _ckStreamName ∷ !Kin.StreamName
    -- ^ Name of the stream to consume from.
  , _ckBatchSize ∷ !Int
    -- ^ Limit passed to each @GetRecords@ request; the internal queue is
    -- created with ten times this capacity.
  , _ckIteratorType ∷ !Kin.ShardIteratorType
    -- ^ Iterator type requested for every shard the consumer discovers.
  }
-- | Lens onto the Kinesis environment of a 'ConsumerKit'.
ckKinesisKit ∷ Lens' ConsumerKit KinesisKit
ckKinesisKit inj kit =
  inj (_ckKinesisKit kit) <&> \kk → kit { _ckKinesisKit = kk }
-- | Lens onto the name of the stream a 'ConsumerKit' points at.
ckStreamName ∷ Lens' ConsumerKit Kin.StreamName
ckStreamName inj kit =
  inj (_ckStreamName kit) <&> \name → kit { _ckStreamName = name }
-- | Lens onto the per-request record limit of a 'ConsumerKit'.
ckBatchSize ∷ Lens' ConsumerKit Int
ckBatchSize inj kit =
  inj (_ckBatchSize kit) <&> \size → kit { _ckBatchSize = size }
-- | Lens onto the shard iterator type a 'ConsumerKit' requests for shards.
ckIteratorType ∷ Lens' ConsumerKit Kin.ShardIteratorType
ckIteratorType inj kit =
  inj (_ckIteratorType kit) <&> \ty → kit { _ckIteratorType = ty }
-- | Handle to a running consumer: a wrapper around the bounded queue that the
-- background producer fills with records.
newtype KinesisConsumer = KinesisConsumer
  { _kcMessageQueue ∷ TBQueue Kin.Record
  }
-- | Read-only access to the record queue behind a 'KinesisConsumer'.
kcMessageQueue ∷ Getter KinesisConsumer (TBQueue Kin.Record)
kcMessageQueue = to $ \(KinesisConsumer queue) → queue
-- | Capabilities a monad needs in order to run the public consumer
-- operations: arbitrary 'IO', lifting of control operations over 'IO', and
-- the ability to raise a 'ConsumerError'.
type MonadConsumer m =
  ( MonadIO m
  , MonadBaseControl IO m
  , MonadError ConsumerError m
  )
-- | 'MonadConsumer' plus read access to the 'ConsumerKit'; used by the
-- module's internal workers.
type MonadConsumerInternal m =
  ( MonadConsumer m
  , MonadReader ConsumerKit m
  )
-- | 'withKinesisConsumer' packaged as a 'Codensity' action, so that the
-- consumer can be acquired inside a 'Codensity' computation instead of by
-- passing a continuation explicitly.
managedKinesisConsumer
  ∷ MonadConsumer m
  ⇒ ConsumerKit
  → Codensity m KinesisConsumer
managedKinesisConsumer kit =
  Codensity (withKinesisConsumer kit)
-- | Run an action with a live 'KinesisConsumer'.
--
-- A bounded queue with capacity @batchSize * 10@ is created, the initial
-- shard carousel is fetched, and two background workers are started with
-- 'withAsync' and linked to the calling thread:
--
-- * a resharding loop that refreshes the shard carousel, then sleeps 10 s
--   (3 s after a failed refresh);
-- * a producer loop that fetches records into the queue, sleeping 5 s after a
--   fetch that returned no records (2 s after a failed fetch).
--
-- Errors raised inside either loop body are discarded; the loop simply waits
-- and tries again.
withKinesisConsumer
  ∷ MonadConsumer m
  ⇒ ConsumerKit
  → (KinesisConsumer → m α)
  → m α
withKinesisConsumer kit inner =
  flip runReaderT kit $ do
    batchSize ← view ckBatchSize
    messageQueue ← liftIO $ newTBQueueIO (batchSize * 10)
    initialShards ← updateStreamState CR.empty
    state ← liftIO $ newTVarIO initialShards
    let
      -- NOTE(review): the carousel is snapshotted, refreshed over the network
      -- and then written back wholesale, so cursor moves made by the producer
      -- in between appear to be overwritten — confirm this is acceptable.
      reshardingLoop = forever $
        handleError (\_ → liftIO $ threadDelay 3000000) $ do
          known ← liftIO $ readTVarIO state
          refreshed ← updateStreamState known
          liftIO ∘ atomically $ writeTVar state refreshed
          liftIO $ threadDelay 10000000
      producerLoop = forever $
        handleError (\_ → liftIO $ threadDelay 2000000) $ do
          fetched ← replenishMessages messageQueue state
          when (fetched ≡ 0) ∘ liftIO $ threadDelay 5000000
    withAsync reshardingLoop $ \reshardingHandle → do
      link reshardingHandle
      withAsync producerLoop $ \producerHandle → do
        link producerHandle
        lift ∘ inner $ KinesisConsumer messageQueue
-- | Extend a shard carousel with the stream's open shards that it does not
-- know about yet.
--
-- Shards whose id already occurs in the carousel are filtered out of
-- 'streamOpenShardSource'. For each remaining shard a shard iterator of the
-- configured type is requested and stored in a fresh 'TVar'. The new states
-- are appended to the given carousel and the result is passed through
-- 'CR.nub'. Failures of the Kinesis requests are rethrown as 'KinesisError'.
updateStreamState
  ∷ MonadConsumerInternal m
  ⇒ CR.Carousel ShardState
  → m (CR.Carousel ShardState)
updateStreamState state = do
  streamName ← view ckStreamName
  iteratorType ← view ckIteratorType
  mapError KinesisError ∘ mapEnvironment ckKinesisKit $ do
    let
      knownShardIds = state ^. CR.clList <&> view ssShardId
      keepIfNew sh@Kin.Shard{..}
        | shardShardId `elem` knownShardIds = Nothing
        | otherwise = Just sh
    newShards ← mapOutputMaybe keepIfNew (streamOpenShardSource streamName) $$ CL.consume
    freshStates ← forM newShards $ \Kin.Shard{..} → do
      Kin.GetShardIteratorResponse iterator ← runKinesis Kin.GetShardIterator
        { Kin.getShardIteratorShardId = shardShardId
        , Kin.getShardIteratorShardIteratorType = iteratorType
        , Kin.getShardIteratorStartingSequenceNumber = Nothing
        , Kin.getShardIteratorStreamName = streamName
        }
      iteratorVar ← liftIO $ newTVarIO (Just iterator)
      return $ ShardState iteratorVar shardShardId
    return $ CR.nub (CR.append freshStates state)
-- | Fetch one batch of records from the shard under the carousel cursor and
-- push it onto the message queue.
--
-- The call first blocks until the queue has been drained, then picks a shard,
-- issues a single @GetRecords@ request limited to the configured batch size,
-- stores the returned next iterator for that shard, enqueues the records and
-- advances the cursor. Failures of the request are rethrown as
-- 'KinesisError'.
--
-- Returns the number of records that were enqueued.
--
-- A shard whose stored iterator is 'Nothing' (the last response carried no
-- next iterator) is skipped. Previously the transaction retried on such a
-- shard; since the cursor only advances after a successful fetch, the
-- producer then stayed parked on it and no other shard was read again.
replenishMessages
  ∷ MonadConsumerInternal m
  ⇒ TBQueue Kin.Record
  → TVar (CR.Carousel ShardState)
  → m Int
replenishMessages messageQueue shardsVar = do
  bufferSize ← view ckBatchSize
  liftIO ∘ atomically ∘ awaitQueueEmpty $ messageQueue
  (shard, iterator) ← liftIO ∘ atomically $ do
    carousel ← readTVar shardsVar
    -- Look at each carousel entry at most once per transaction.
    nextLiveShard ∘ length $ carousel ^. CR.clList
  Kin.GetRecordsResponse{..} ← mapError KinesisError ∘ mapEnvironment ckKinesisKit $ runKinesis Kin.GetRecords
    { getRecordsLimit = Just bufferSize
    , getRecordsShardIterator = iterator
    }
  liftIO ∘ atomically $ do
    writeTVar (shard ^. ssIterator) getRecordsResNextShardIterator
    forM_ getRecordsResRecords $ writeTBQueue messageQueue
    -- Strict update so the carousel does not accumulate unevaluated moves.
    modifyTVar' shardsVar CR.moveRight
  return $ length getRecordsResRecords
  where
    -- Starting at the cursor, find the first shard that still has an
    -- iterator, moving the cursor past exhausted ones. When the carousel is
    -- empty or every shard is exhausted the transaction retries, which undoes
    -- the cursor moves and blocks until one of the variables read changes.
    nextLiveShard ∷ Int → STM (ShardState, Kin.ShardIterator)
    nextLiveShard remaining
      | remaining ≤ 0 = retry
      | otherwise = do
          mshard ← shardsVar ^!? act readTVar ∘ CR.cursor
          shard ← maybe retry return mshard
          miterator ← shard ^! ssIterator ∘ act readTVar
          case miterator of
            Just iterator → return (shard, iterator)
            Nothing → do
              modifyTVar' shardsVar CR.moveRight
              nextLiveShard (remaining - 1)
-- | Take the next record from the consumer's queue, blocking until one is
-- available.
readConsumer
  ∷ MonadConsumer m
  ⇒ KinesisConsumer
  → m Kin.Record
readConsumer consumer =
  liftIO ∘ atomically ∘ readTBQueue $ consumer ^. kcMessageQueue
-- | Take the next record from the consumer's queue without blocking; yields
-- 'Nothing' when the queue is currently empty.
tryReadConsumer
  ∷ MonadConsumer m
  ⇒ KinesisConsumer
  → m (Maybe Kin.Record)
tryReadConsumer consumer =
  liftIO ∘ atomically ∘ tryReadTBQueue $ consumer ^. kcMessageQueue
-- | An endless conduit 'Source' of records: each step performs a blocking
-- 'readConsumer' and yields the record downstream.
consumerSource
  ∷ MonadConsumer m
  ⇒ KinesisConsumer
  → Source m Kin.Record
consumerSource consumer =
  forever $ do
    record ← lift $ readConsumer consumer
    yield record