module Aws.Kinesis.Core
(
KinesisVersion(..)
, KinesisConfiguration(..)
, defaultKinesisConfiguration
, KinesisMetadata(..)
, KinesisErrorResponse(..)
, KinesisAction(..)
, kinesisActionToText
, parseKinesisAction
, kinesisServiceEndpoint
, KinesisQuery(..)
, kinesisSignQuery
, kinesisResponseConsumer
, jsonResponseConsumer
, KinesisError(..)
, KinesisCommonParameters(..)
, KinesisCommonError(..)
) where
import Aws.Core
import Aws.General
import Aws.SignatureV4
import qualified Blaze.ByteString.Builder as BB
import Control.Applicative
import Control.DeepSeq
import Control.Exception
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource (throwM)
import Data.Aeson
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import qualified Data.ByteString.Char8 as B8
import Data.Conduit (($$+-))
import Data.Conduit.Binary (sinkLbs)
import Data.IORef
import Data.Maybe
import Data.Monoid
import Data.String
import Data.Time.Clock
import Data.Typeable
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import GHC.Generics
import qualified Network.HTTP.Types as HTTP
import qualified Network.HTTP.Conduit as HTTP
import qualified Test.QuickCheck as Q
import qualified Text.Parser.Char as P
import Text.Parser.Combinators ((<?>))
-- Orphan instances for 'HTTP.Status', a type that is declared outside this
-- module. They are needed here because 'KinesisErrorResponse' stores an
-- 'HTTP.Status' and itself has an 'NFData' instance.
--
-- NOTE(review): the 'NFData' instance has no explicit 'rnf'; presumably it
-- relies on a 'Generic'-based default, which is why 'Generic' is derived
-- first -- confirm against the deepseq version in use.
deriving instance Generic HTTP.Status
instance NFData HTTP.Status
-- | API versions of the Kinesis service known to this module. There is
-- currently a single one.
data KinesisVersion = KinesisVersion_2013_12_02
    deriving (Eq, Ord, Show, Read, Typeable, Generic)

-- | Uses the class's default 'rnf'.
instance NFData KinesisVersion
-- | Version tag that 'kinesisTargetHeader' places in front of the action
-- name when it builds the @X-Amz-Target@ header value.
kinesisTargetVersion :: IsString a => a
kinesisTargetVersion = fromString "Kinesis_20131202"
-- | The Kinesis API operations that can be named in a request.
--
-- The textual (wire) name of each action is given by 'kinesisActionToText'.
-- The constructor order is significant for the derived 'Ord', 'Enum' and
-- 'Bounded' instances.
data KinesisAction
    = KinesisCreateStream
    | KinesisDeleteStream
    | KinesisDescribeStream
    | KinesisGetRecords
    | KinesisGetShardIterator
    | KinesisListStreams
    | KinesisMergeShards
    | KinesisPutRecord
    | KinesisPutRecords
    | KinesisSplitShard
    deriving (Eq, Ord, Enum, Bounded, Show, Read, Typeable, Generic)

-- | Uses the class's default 'rnf'.
instance NFData KinesisAction
-- | Render an action as its wire name, i.e. the constructor name without the
-- @Kinesis@ prefix.
kinesisActionToText :: IsString a => KinesisAction -> a
kinesisActionToText action = case action of
    KinesisCreateStream -> "CreateStream"
    KinesisDeleteStream -> "DeleteStream"
    KinesisDescribeStream -> "DescribeStream"
    KinesisGetRecords -> "GetRecords"
    KinesisGetShardIterator -> "GetShardIterator"
    KinesisListStreams -> "ListStreams"
    KinesisMergeShards -> "MergeShards"
    KinesisPutRecord -> "PutRecord"
    KinesisPutRecords -> "PutRecords"
    KinesisSplitShard -> "SplitShard"
-- | Parse the wire name of a Kinesis action, as produced by
-- 'kinesisActionToText'.
--
-- @PutRecord@ is a proper prefix of @PutRecords@. If the two names are tried
-- as independent alternatives with the shorter one first, the input
-- @PutRecords@ is accepted as 'KinesisPutRecord' with a stray @s@ left
-- unconsumed, and the 'KinesisPutRecords' alternative can never succeed.
-- The shared prefix is therefore matched once and an optional trailing @s@
-- selects between the two actions.
parseKinesisAction :: P.CharParsing m => m KinesisAction
parseKinesisAction =
    KinesisCreateStream <$ P.text "CreateStream"
    <|> KinesisDeleteStream <$ P.text "DeleteStream"
    <|> KinesisDescribeStream <$ P.text "DescribeStream"
    <|> KinesisGetRecords <$ P.text "GetRecords"
    <|> KinesisGetShardIterator <$ P.text "GetShardIterator"
    <|> KinesisListStreams <$ P.text "ListStreams"
    <|> KinesisMergeShards <$ P.text "MergeShards"
    <|> P.text "PutRecord" *> putRecordVariant
    <|> KinesisSplitShard <$ P.text "SplitShard"
    <?> "KinesisAction"
  where
    -- Runs after the common "PutRecord" prefix has been consumed.
    putRecordVariant =
        KinesisPutRecords <$ P.char 's'
        <|> pure KinesisPutRecord
-- | Actions are rendered with 'kinesisActionToText' and parsed with
-- 'parseKinesisAction'.
instance AwsType KinesisAction where
    parse = parseKinesisAction
    toText action = kinesisActionToText action
-- | Picks one of the constructors between 'minBound' and 'maxBound'.
instance Q.Arbitrary KinesisAction where
    arbitrary = Q.elements (enumFromTo minBound maxBound)
-- | Build the @X-Amz-Target@ header for an action. The value has the form
-- @\<target version\>.\<action name\>@.
kinesisTargetHeader :: KinesisAction -> HTTP.Header
kinesisTargetHeader action = ("X-Amz-Target", target)
  where
    target = mconcat [kinesisTargetVersion, ".", toText action]
-- | Host name of the Kinesis endpoint for a region.
--
-- For a 'CustomEndpoint' the UTF-8 encoded endpoint text is returned as is.
-- Any region that is not listed here is rejected by calling 'error', so this
-- function is partial.
kinesisServiceEndpoint :: Region -> B8.ByteString
kinesisServiceEndpoint region = case region of
    ApNortheast1 -> "kinesis.ap-northeast-1.amazonaws.com"
    ApSoutheast1 -> "kinesis.ap-southeast-1.amazonaws.com"
    ApSoutheast2 -> "kinesis.ap-southeast-2.amazonaws.com"
    EuWest1 -> "kinesis.eu-west-1.amazonaws.com"
    UsEast1 -> "kinesis.us-east-1.amazonaws.com"
    UsWest2 -> "kinesis.us-west-2.amazonaws.com"
    CustomEndpoint e _ -> T.encodeUtf8 e
    unsupported ->
        error $ "Aws.Kinesis.Core.kinesisServiceEndpoint: unsupported region " <> show unsupported
-- | Metadata taken from the headers of a Kinesis response.
data KinesisMetadata = KinesisMetadata
    { kinesisMAmzId2 :: Maybe T.Text
    -- ^ Value of the @x-amz-id-2@ header, if the response had one.
    , kinesisMRequestId :: Maybe T.Text
    -- ^ Value of the @x-amz-request-id@ header, if the response had one.
    }
    deriving (Generic, Show)

-- | Uses the class's default 'rnf'.
instance NFData KinesisMetadata
-- | One-line log rendering of the response metadata. Missing values are
-- shown as @\<none\>@.
--
-- The fields are read through their accessors rather than by position:
-- 'kinesisMAmzId2' is the first constructor field and 'kinesisMRequestId'
-- the second, so a positional pattern that names the first field as the
-- request ID prints the two values under each other's labels.
instance Loggable KinesisMetadata where
    toLogText meta =
        "Kinesis: request ID=" <> fromMaybe "<none>" (kinesisMRequestId meta)
        <> ", x-amz-id-2=" <> fromMaybe "<none>" (kinesisMAmzId2 meta)
-- | Field-wise, left-biased combination: for each field the first 'Just'
-- wins. The empty value has no fields set.
instance Monoid KinesisMetadata where
    mempty = KinesisMetadata Nothing Nothing
    mappend (KinesisMetadata amzIdA requestIdA) (KinesisMetadata amzIdB requestIdB) =
        KinesisMetadata (amzIdA <|> amzIdB) (requestIdA <|> requestIdB)
-- | Connection settings for the Kinesis service. The type parameter @qt@ is
-- a phantom: no field mentions it.
data KinesisConfiguration qt = KinesisConfiguration
    { kinesisConfRegion :: Region
    -- ^ Region (or custom endpoint) that requests are sent to.
    , kinesisConfProtocol :: Protocol
    -- ^ Protocol placed into the signed query.
    }
    deriving (Typeable, Show)
-- | Configuration for the given region that uses 'HTTPS'.
defaultKinesisConfiguration :: Region -> KinesisConfiguration qt
defaultKinesisConfiguration region =
    KinesisConfiguration
        { kinesisConfProtocol = HTTPS
        , kinesisConfRegion = region
        }
-- | A Kinesis request before signing: the action to invoke and an optional
-- request body. Both fields are strict.
data KinesisQuery = KinesisQuery
    { kinesisQueryAction :: !KinesisAction
    -- ^ Action named in the @X-Amz-Target@ header.
    , kinesisQueryBody :: !(Maybe B.ByteString)
    -- ^ Request payload; 'Nothing' is signed as the empty string.
    }
    deriving (Eq, Show, Typeable, Generic)

-- | Uses the class's default 'rnf'.
instance NFData KinesisQuery
-- | Turn a 'KinesisQuery' into a signed @POST@ request.
--
-- The request goes to the endpoint host of the configured region with an
-- empty path and an empty query string. The @host@ header and the
-- @X-Amz-Target@ header are handed to 'signPostRequest' together with the
-- body (or the empty string when there is none). From the headers that come
-- back, the @authorization@ entry becomes 'sqAuthorization' and the
-- remaining ones become 'sqAmzHeaders'.
--
-- A failure reported by 'signPostRequest' is raised with 'error'.
kinesisSignQuery :: KinesisQuery -> KinesisConfiguration qt -> SignatureData -> SignedQuery
kinesisSignQuery query conf sigData = SignedQuery
    { sqMethod = Post
    , sqProtocol = kinesisConfProtocol conf
    , sqHost = endpointHost
    , sqPort = endpointPort region
    , sqPath = BB.toByteString (HTTP.encodePathSegments path)
    , sqQuery = reqQuery
    , sqDate = Nothing
    , sqAuthorization = fmap return (lookup "authorization" signedHeaders)
    , sqContentType = Just "application/x-amz-json-1.1"
    , sqContentMd5 = Nothing
    , sqAmzHeaders = [ h | h@(name, _) <- signedHeaders, name /= "Authorization" ]
    , sqOtherHeaders = []
    , sqBody = fmap HTTP.RequestBodyBS payload
    , sqStringToSign = mempty
    }
  where
    region = kinesisConfRegion conf
    payload = kinesisQueryBody query

    -- Shared between the request itself and the signature input.
    path = []
    reqQuery = []

    endpointHost = kinesisServiceEndpoint region

    -- Only a custom endpoint carries its own port; every other region gets
    -- 443, whatever protocol is configured.
    endpointPort (CustomEndpoint _ p) = p
    endpointPort _ = 443

    headersToSign =
        [ ("host", endpointHost)
        , kinesisTargetHeader (kinesisQueryAction query)
        ]

    signedHeaders = case signingResult of
        Left failure -> error failure
        Right hs -> hs

    signingResult = signPostRequest
        (cred2cred (signatureCredentials sigData))
        region
        ServiceNamespaceKinesis
        (signatureTime sigData)
        "POST"
        path
        reqQuery
        headersToSign
        (fromMaybe "" payload)

    -- 'Credentials' has a fourth field from aws-0.9.2 on; older versions
    -- have three and 'Nothing' is passed in its place.
#if MIN_VERSION_aws(0,9,2)
    cred2cred (Credentials a b c d) = SignatureV4Credentials a b c d
#else
    cred2cred (Credentials a b c) = SignatureV4Credentials a b c Nothing
#endif
-- | Read the whole response body and decode it as JSON.
--
-- An empty body is decoded as if it were @{}@. A decoding failure is thrown
-- as 'KinesisResponseJsonError' carrying the decoder's message.
jsonResponseConsumer :: FromJSON a => HTTPResponseConsumer a
jsonResponseConsumer res = do
    raw <- HTTP.responseBody res $$+- sinkLbs
    let payload
            | LB.null raw = "{}"
            | otherwise = raw
    either (throwM . KinesisResponseJsonError . T.pack) return (eitherDecode payload)
-- | Response consumer for Kinesis requests.
--
-- It first records the @x-amz-id-2@ and @x-amz-request-id@ response headers
-- in the metadata reference. A response whose status is at least 400 is then
-- passed to 'errorResponseConsumer'; any other response is decoded with
-- 'jsonResponseConsumer'.
kinesisResponseConsumer
    :: FromJSON a
    => IORef KinesisMetadata
    -> HTTPResponseConsumer a
kinesisResponseConsumer metadata resp = do
    liftIO . tellMetadataRef metadata $ KinesisMetadata
        { kinesisMAmzId2 = headerText "x-amz-id-2"
        , kinesisMRequestId = headerText "x-amz-request-id"
        }
    let isError = HTTP.responseStatus resp >= HTTP.status400
    (if isError then errorResponseConsumer else jsonResponseConsumer) resp
  where
    -- Value of the named response header, decoded as UTF-8.
    headerText name = T.decodeUtf8 <$> lookup name (HTTP.responseHeaders resp)
-- | Consume the body of a failed response and throw the matching
-- 'KinesisErrorResponse'. This consumer never returns a value.
--
-- * Status 400: the body is decoded as a JSON 'KinesisErrorResponse' and
--   thrown. If it cannot be decoded, 'KinesisResponseJsonError' is thrown
--   with the decoder's message.
--
-- * Any other status: 'KinesisOtherError' is thrown with the status and the
--   body text.
errorResponseConsumer :: HTTPResponseConsumer a
errorResponseConsumer resp = do
    doc <- HTTP.responseBody resp $$+- sinkLbs
    if HTTP.responseStatus resp == HTTP.status400
        then kinesisError doc
        else throwM KinesisOtherError
            { kinesisOtherErrorStatus = HTTP.responseStatus resp
            , kinesisOtherErrorMessage = decodeBody (LB.toStrict doc)
            }
  where
    kinesisError doc = case eitherDecode doc of
        Left e -> throwM . KinesisResponseJsonError $ T.pack e
        Right a -> throwM (a :: KinesisErrorResponse)

    -- The message field is strict, so a body that is not valid UTF-8 must
    -- not make the decoder throw: that would replace the error being
    -- reported with a decoding exception. Such a body is kept by mapping
    -- every byte to the character with the same code.
    decodeBody bytes = case T.decodeUtf8' bytes of
        Right txt -> txt
        Left _ -> T.pack (B8.unpack bytes)
-- | An error that is either one of the common errors or an error of type
-- @a@ that is specific to a command.
data KinesisError a
    = KinesisErrorCommon KinesisCommonError
    | KinesisErrorCommand a
    deriving (Eq, Ord, Show, Read, Typeable, Generic)

-- | Uses the class's default 'rnf'.
instance NFData a => NFData (KinesisError a)
-- | Exceptions thrown by the response consumers of this module.
data KinesisErrorResponse
    -- | An error document returned by the service.
    = KinesisErrorResponse
        { kinesisErrorCode :: !T.Text
        , kinesisErrorMessage :: !T.Text
        }
    -- | A response body could not be decoded as JSON; carries the decoder's
    -- message.
    | KinesisResponseJsonError T.Text
    -- | A failed response that was not interpreted as an error document;
    -- carries the HTTP status and the body text.
    | KinesisOtherError
        { kinesisOtherErrorStatus :: !HTTP.Status
        , kinesisOtherErrorMessage :: !T.Text
        }
    deriving (Eq, Ord, Show, Typeable, Generic)

instance Exception KinesisErrorResponse

-- | Uses the class's default 'rnf'.
instance NFData KinesisErrorResponse
-- | Decodes an error document into the 'KinesisErrorResponse' constructor.
--
-- The code is read from the mandatory @__type@ key. The message is read
-- from @message@, or else from @Message@, and defaults to the empty text
-- when neither key can be read.
instance FromJSON KinesisErrorResponse where
    parseJSON = withObject "KinesisErrorResponse" $ \o -> do
        code <- o .: "__type"
        msg <- o .: "message" <|> o .: "Message" <|> pure ""
        return (KinesisErrorResponse code msg)
-- | Error conditions that are not specific to a single command.
--
-- NOTE(review): 'ErrorInvalidQueryParamter' is misspelt (\"Paramter\"), but
-- the constructor is exported, so the name is kept for compatibility.
data KinesisCommonError
    = ErrorIncompleteSignature
    | ErrorInternalFailure
    | ErrorInvalidAction
    | ErrorInvalidClientTokenId
    | ErrorInvalidParameterCombination
    | ErrorInvalidParameterValue
    | ErrorInvalidQueryParamter
    | ErrorMalformedQueryString
    | ErrorMissingAction
    | ErrorMissingAuthenticationToken
    | ErrorMissingParameter
    | ErrorOptInRequired
    | ErrorRequestExpired
    | ErrorServiceUnavailable
    | ErrorThrottling
    | ErrorValidationError
    deriving (Eq, Ord, Enum, Bounded, Show, Read, Typeable, Generic)

-- | Uses the class's default 'rnf'.
instance NFData KinesisCommonError
-- | Parameters that are common to Kinesis requests.
--
-- 'kinesisAuthParams' and 'kinesisSecurityToken' are unit-typed
-- placeholders. 'kinesisSecurityToken' and 'kinesisVersion' are the only
-- lazy fields.
data KinesisCommonParameters = KinesisCommonParameters
    { kinesisAction :: !KinesisAction
    , kinesisAuthParams :: !(Maybe ())
    , kinesisAWSAccessKeyId :: !B8.ByteString
    , kinesisExpires :: !UTCTime
    , kinesisTimestamp :: !UTCTime
    , kinesisSecurityToken :: ()
    , kinesisSignature :: !Signature
    , kinesisSignatureMethod :: !SignatureMethod
    , kinesisSignatureVersion :: !SignatureVersion
    , kinesisVersion :: KinesisVersion
    }
    deriving (Eq, Ord, Show, Read, Typeable, Generic)

-- | Uses the class's default 'rnf'.
instance NFData KinesisCommonParameters