-- Copyright (c) 2013-2015 PivotCloud, Inc.
--
-- Aws.Kinesis.Client.Consumer.Internal
--
-- Please feel free to contact us at licensing@pivotmail.com with any
-- contributions, additions, or other feedback; we would love to hear from
-- you.
--
-- Licensed under the Apache License, Version 2.0 (the "License"); you may
-- not use this file except in compliance with the License. You may obtain a
-- copy of the License at http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-- License for the specific language governing permissions and limitations
-- under the License.

{-# LANGUAGE CPP #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE UnicodeSyntax #-}

-- |
-- Module: Aws.Kinesis.Client.Consumer.Internal
-- Copyright: Copyright © 2013-2015 PivotCloud, Inc.
-- License: Apache-2.0
-- Maintainer: Jon Sterling
-- Stability: experimental
--
module Aws.Kinesis.Client.Consumer.Internal
( -- * Types
  MessageQueueItem
, MessageQueue
, StreamState

  -- * Operations
, updateStreamState
, replenishMessages

  -- * Re-exports
, module Aws.Kinesis.Client.Consumer.Internal.Kit
, module Aws.Kinesis.Client.Consumer.Internal.ShardState
, module Aws.Kinesis.Client.Consumer.Internal.SavedStreamState
) where

import Aws.Kinesis
import Aws.Kinesis.Client.Common
import Aws.Kinesis.Client.Consumer.Internal.Kit
import Aws.Kinesis.Client.Consumer.Internal.ShardState
import Aws.Kinesis.Client.Consumer.Internal.SavedStreamState

import Control.Lens
import Control.Lens.Action
import Control.Concurrent.STM
import Control.Concurrent.STM.Queue
import Control.Monad
import Control.Monad.Trans
import qualified Data.Carousel as CR
import Data.Conduit
import qualified Data.Conduit.List as CondL
import Prelude.Unicode

#ifdef DEBUG
import Data.Monoid.Unicode
import System.IO
#endif

-- | An entry of the message queue: a record paired with the state of the
-- shard it was fetched from.
--
type MessageQueueItem = (ShardState, Record)

-- | The bounded STM queue that fetched records are written to.
--
type MessageQueue = TBQueue MessageQueueItem

-- | The shard states of a stream, kept in a carousel.
--
type StreamState = CR.Carousel ShardState

-- | Asks Kinesis for the open shards of the configured stream, obtains a
-- shard iterator for every shard whose id is not yet present in the given
-- carousel, and returns the carousel with the new shard states appended
-- (passed through 'CR.nub').
--
-- For a shard that has an entry in the saved stream state, the iterator is
-- requested as 'AfterSequenceNumber' with that saved sequence number;
-- otherwise the iterator type configured in the 'ConsumerKit' is used.
--
updateStreamState
  ∷ ConsumerKit
  → StreamState
  → IO StreamState
updateStreamState ConsumerKit{..} state = do
  newShards ← unseenShards $$ CondL.consume
  shardStates ← forM newShards openShard
  return ∘ CR.nub $ CR.append shardStates state
  where
    -- Ids of the shards that the carousel already tracks.
    knownShardIds =
      fmap (view ssShardId) (state ^. CR.clList)

    -- Open shards of the stream, minus the ones that are already tracked.
    unseenShards =
      mapOutputMaybe keepIfUnknown $
        streamOpenShardSource _ckKinesisKit _ckStreamName

    keepIfUnknown sh
      | shardShardId sh ∈ knownShardIds = Nothing
      | otherwise = Just sh

    -- Requests an iterator for one shard and wraps it in a fresh shard state.
    openShard Shard{..} = do
      let
        -- Sequence number recorded for this shard in the saved state, if any.
        resumeAfter =
          _ckSavedStreamState ^? _Just ∘ _SavedStreamState ∘ ix shardShardId
        iteratorType =
          case resumeAfter of
            Just _ → AfterSequenceNumber
            Nothing → _ckIteratorType

#ifdef DEBUG
      debugPrint stdout $ "Getting " ⊕ show iteratorType ⊕ " iterator for shard " ⊕ show shardShardId
#endif

      GetShardIteratorResponse it ← runKinesis _ckKinesisKit GetShardIterator
        { getShardIteratorShardId = shardShardId
        , getShardIteratorShardIteratorType = iteratorType
        , getShardIteratorStartingSequenceNumber = resumeAfter
        , getShardIteratorStreamName = _ckStreamName
        }

      -- Both variables are created in one transaction: first the iterator
      -- variable, then the sequence number variable.
      liftIO ∘ atomically $
        liftM2
          (makeShardState shardShardId)
          (newTVar (Just it))
          (newTVar resumeAfter)

-- | Waits for the message queue to be empty, fetches one batch of records
-- (at most the configured batch size) from the shard under the carousel
-- cursor, writes the records to the queue, and advances the cursor.
--
-- The result is the number of records that were fetched.
--
-- Picking the shard blocks (via 'retry') while the carousel has no cursor
-- element or while the iterator variable of the cursor shard holds
-- 'Nothing'.
--
replenishMessages
  ∷ ConsumerKit
  → MessageQueue
  → TVar StreamState
  → IO Int
replenishMessages ConsumerKit{..} messageQueue shardsVar = do
  liftIO ∘ atomically $ awaitQueueEmpty messageQueue
  (shard, iterator) ← liftIO $ atomically pickShard

  GetRecordsResponse{..} ← runKinesis _ckKinesisKit GetRecords
    { getRecordsLimit = Just $ fromIntegral _ckBatchSize
    , getRecordsShardIterator = iterator
    }

  let fetchedCount = length getRecordsResRecords

#ifdef DEBUG
  debugPrint stdout $ "Replenished shard " ⊕ show (shard ^. ssShardId) ⊕ " with " ⊕ show fetchedCount ⊕ " records"
#endif

  -- NOTE(review): the whole batch is enqueued in a single transaction; this
  -- presumably relies on the queue capacity being at least the batch size,
  -- since the transaction could not commit otherwise. Confirm at the place
  -- where the queue is created.
  liftIO ∘ atomically $ do
    -- The next iterator returned by Kinesis is stored as is, even if it is
    -- Nothing.
    writeTVar (shard ^. ssIterator) getRecordsResNextShardIterator
    forM_ getRecordsResRecords $ \record →
      writeTBQueue messageQueue (shard, record)
    modifyTVar shardsVar CR.moveRight

  return fetchedCount
  where
    -- The shard under the cursor together with its current iterator.
    pickShard = do
      shard ← maybe retry return =<< shardsVar ^!? act readTVar ∘ CR.cursor
      iterator ← maybe retry return =<< readTVar (shard ^. ssIterator)
      return (shard, iterator)

#ifdef DEBUG
-- | Writes a line, prefixed with a consumer tag, to the given handle.
--
debugPrint
  ∷ MonadIO m
  ⇒ Handle
  → String
  → m ()
debugPrint h message =
  liftIO $ hPutStrLn h ("[Kinesis Consumer] " ⊕ message)
#endif