module Aws.Kinesis.Client.Common
( KinesisKit(..)
, kkConfiguration
, kkKinesisConfiguration
, kkManager
, MonadKinesis
, runKinesis
, streamShardSource
, streamOpenShardSource
, shardIsOpen
, mapError
, handleError
, mapEnvironment
) where
import qualified Aws
import qualified Aws.Core as Aws
import qualified Aws.Kinesis as Kin
import Control.Exception
import Control.Lens
import Control.Error
import Control.Monad
import Control.Monad.Reader.Class
import Control.Monad.Error.Class
import Control.Monad.Trans
import Control.Monad.Trans.Reader
import Control.Monad.Trans.Resource
import Control.Monad.Unicode
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Network.HTTP.Conduit as HC
import Prelude.Unicode
-- | The values 'runKinesis' needs in order to issue a request: the general
-- AWS configuration, the Kinesis service configuration and an HTTP manager.
-- Every field is strict.
data KinesisKit = KinesisKit
  { _kkConfiguration ∷ !Aws.Configuration
    -- ^ General AWS configuration; passed to 'Aws.pureAws' by 'runKinesis'.
  , _kkKinesisConfiguration ∷ !(Kin.KinesisConfiguration Aws.NormalQuery)
    -- ^ Kinesis service configuration; passed to 'Aws.pureAws' by
    -- 'runKinesis'.
  , _kkManager ∷ !HC.Manager
    -- ^ http-conduit 'HC.Manager'; passed to 'Aws.pureAws' by 'runKinesis'.
  }
-- | Lens onto the '_kkConfiguration' field of a 'KinesisKit'.
kkConfiguration ∷ Lens' KinesisKit Aws.Configuration
kkConfiguration focus kit =
  fmap replace (focus (_kkConfiguration kit))
  where
    -- Rebuild the kit with only the AWS configuration swapped out.
    replace newCfg = kit { _kkConfiguration = newCfg }
-- | Lens onto the '_kkKinesisConfiguration' field of a 'KinesisKit'.
kkKinesisConfiguration ∷ Lens' KinesisKit (Kin.KinesisConfiguration Aws.NormalQuery)
kkKinesisConfiguration focus kit =
  fmap replace (focus (_kkKinesisConfiguration kit))
  where
    -- Rebuild the kit with only the Kinesis configuration swapped out.
    replace newCfg = kit { _kkKinesisConfiguration = newCfg }
-- | Lens onto the '_kkManager' field of a 'KinesisKit'.
kkManager ∷ Lens' KinesisKit HC.Manager
kkManager focus kit =
  fmap replace (focus (_kkManager kit))
  where
    -- Rebuild the kit with only the HTTP manager swapped out.
    replace newMgr = kit { _kkManager = newMgr }
-- | Constraint synonym bundling what the Kinesis helpers in this module
-- require of their monad: it can perform 'IO', it can read a 'KinesisKit'
-- from its environment, and it can raise errors of type 'SomeException'.
type MonadKinesis m =
  ( MonadIO m
  , MonadReader KinesisKit m
  , MonadError SomeException m
  )
-- | Execute one Kinesis request using the 'KinesisKit' found in the reader
-- environment.
--
-- The request is run with 'Aws.pureAws' inside 'runResourceT'. The 'IO'
-- action is wrapped with 'syncIO'; a 'Left' result from that wrapper is
-- re-raised in @m@ through 'throwError', a 'Right' result is returned.
runKinesis
  ∷ ( MonadKinesis m
    , Aws.ServiceConfiguration req ~ Kin.KinesisConfiguration
    , Aws.Transaction req resp
    )
  ⇒ req
  → m resp
runKinesis req = do
  -- Take the whole environment apart in one pattern match.
  KinesisKit
    { _kkConfiguration = awsConfig
    , _kkKinesisConfiguration = kinesisConfig
    , _kkManager = manager
    } ← view id
  eitherT throwError return $
    syncIO $
      runResourceT $
        Aws.pureAws awsConfig kinesisConfig manager req
-- | Decide whether a shard is open: it is when the second component of its
-- sequence number range is 'Nothing'.
shardIsOpen
  ∷ Kin.Shard
  → Bool
shardIsOpen shard =
  case Kin.shardSequenceNumberRange shard ^. _2 of
    Nothing → True
    Just _ → False
-- | Conduit from exclusive start shard ids to the shards of a stream.
--
-- For each incoming value a @DescribeStream@ request is issued with that
-- value as the exclusive start shard id, and every shard in the response is
-- yielded downstream. If 'Aws.nextIteratedRequest' produces a follow-up
-- request that carries a start shard id, that id is pushed back with
-- 'leftover', so the surrounding 'awaitForever' loop consumes it next.
fetchShardsConduit
  ∷ MonadKinesis m
  ⇒ Kin.StreamName
  → Conduit (Maybe Kin.ShardId) m Kin.Shard
fetchShardsConduit streamName = awaitForever fetchPage
  where
    fetchPage startShardId = do
      let req =
            Kin.DescribeStream
              { Kin.describeStreamExclusiveStartShardId = startShardId
              , Kin.describeStreamLimit = Nothing
              , Kin.describeStreamStreamName = streamName
              }
      resp@(Kin.DescribeStreamResponse
              Kin.StreamDescription { Kin.streamDescriptionShards = shards }) ←
        lift (runKinesis req)
      mapM_ yield shards
      -- Start shard id of the follow-up request, when there is one.
      let nextStart =
            Aws.nextIteratedRequest req resp
              ≫= Kin.describeStreamExclusiveStartShardId
      maybe (return ()) (leftover ∘ Just) nextStart
-- | Source of the shards of the named stream, produced by feeding a single
-- 'Nothing' into 'fetchShardsConduit'.
streamShardSource
  ∷ MonadKinesis m
  ⇒ Kin.StreamName
  → Source m Kin.Shard
streamShardSource streamName = seed $= fetchShardsConduit streamName
  where
    -- The lone 'Nothing' becomes the exclusive start shard id of the first
    -- request made by 'fetchShardsConduit'.
    seed = CL.sourceList [Nothing]
-- | Like 'streamShardSource', but only the shards for which 'shardIsOpen'
-- holds are emitted.
streamOpenShardSource
  ∷ MonadKinesis m
  ⇒ Kin.StreamName
  → Source m Kin.Shard
streamOpenShardSource streamName =
  mapOutputMaybe keepIfOpen (streamShardSource streamName)
  where
    -- 'Just' for shards to pass on, 'Nothing' for shards to drop.
    keepIfOpen shard
      | shardIsOpen shard = Just shard
      | otherwise = Nothing
-- | Collapse an 'EitherT' layer into the underlying monad.
--
-- A 'Left' value is converted with the supplied function and raised with
-- 'throwError'; a 'Right' value is returned unchanged.
mapError
  ∷ MonadError e' m
  ⇒ (e → e')
  → EitherT e m a
  → m a
mapError e = eitherT rethrow return
  where
    rethrow failure = throwError (e failure)
-- | 'catchError' with its arguments swapped: the handler comes first, the
-- action it protects comes second.
handleError
  ∷ MonadError e m
  ⇒ (e → m α)
  → m α
  → m α
handleError handler action = action `catchError` handler
-- | Run a 'ReaderT' computation inside a monad with a different reader
-- environment: the getter is applied to the outer environment with 'view',
-- and the result is supplied to 'runReaderT'.
mapEnvironment
  ∷ MonadReader r' m
  ⇒ Getter r' r
  → ReaderT r m a
  → m a
mapEnvironment l m = do
  innerEnv ← view l
  runReaderT m innerEnv