-- Copyright (c) 2013-2015 PivotCloud, Inc. -- -- Aws.Kinesis.Types -- -- Please feel free to contact us at licensing@pivotmail.com with any -- contributions, additions, or other feedback; we would love to hear from -- you. -- -- Licensed under the Apache License, Version 2.0 (the "License"); you may -- not use this file except in compliance with the License. You may obtain a -- copy of the License at http://www.apache.org/licenses/LICENSE-2.0 -- -- Unless required by applicable law or agreed to in writing, software -- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -- License for the specific language governing permissions and limitations -- under the License. -- | -- Module: Aws.Kinesis.Types -- Copyright: Copyright (c) 2013-2015 PivotCloud, Inc. -- license: Apache License, Version 2.0 -- Maintainer: Lars Kuhtz -- Stability: experimental -- -- /API Version: 2013-12-02/ -- -- The Amazon Kinesis Service API Reference API contains several data types -- that various actions use. -- -- {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE DeriveDataTypeable #-} module Aws.Kinesis.Types ( StreamName , streamName , streamNameText , ShardId , SequenceNumber , PartitionHash , partitionHash , partitionHashInteger , PartitionKey , partitionKey , partitionKeyText , ShardIterator , ShardIteratorType(..) , Record(..) , StreamDescription(..) , StreamStatus(..) , Shard(..) ) where import Aws.General import Control.Applicative import Data.Aeson import qualified Data.ByteString as B import qualified Data.ByteString.Base64 as B64 import Data.Monoid import Data.String import qualified Data.Text as T import qualified Data.Text.Encoding as T import qualified Data.Text.Read as T import Data.Typeable import Test.QuickCheck import Test.QuickCheck.Instances () -- -------------------------------------------------------------------------- -- -- Internal Utils sshow :: (Show a, IsString b) => a -> b sshow = fromString . show tryM :: Monad m => Either T.Text a -> m a tryM = either (fail . T.unpack) return -- -------------------------------------------------------------------------- -- -- | Stream Name -- -- The stream name is scoped to the AWS account used by the application that -- creates the stream. It is also scoped by region. That is, two streams in two -- different AWS accounts can have the same name, and two streams in the same -- AWS account, but in two different regions, can have the same name. -- -- Length constraints: Minimum length of 1. Maximum length of 128. -- -- Error responses from AWS Kinesis indicate that stream names must -- match the following regular expressions: -- -- > [a-zA-Z0-9_.-]+ -- newtype StreamName = StreamName { streamNameText :: T.Text } deriving (Show, Read, Eq, Ord, Typeable) -- | Smart Constructor for 'StreamName' that enforces size constraints. -- -- For static construction you may also use the 'IsString' instance -- that throws an 'error' on invalid stream names. -- streamName :: T.Text -> Either T.Text StreamName streamName t | T.length t < 1 = Left $ "Illegal StreamName " <> sshow t <> "; StreamName must be of length at least 1" | T.length t > 128 = Left $ "Illegal StreamName " <> sshow t <> "; StreamName must be of length at most 128" | otherwise = Right $ StreamName t -- | The 'fromString' function of this instance raises and asynchronous 'error' -- if the argument 'String' is not a legal 'StreamName'. -- instance IsString StreamName where fromString = either (error . T.unpack) id . streamName . T.pack instance ToJSON StreamName where toJSON = toJSON . streamNameText instance FromJSON StreamName where parseJSON = withText "StreamName" $ tryM . streamName instance Arbitrary StreamName where arbitrary = StreamName . T.pack <$> (resize 128 . listOf1 $ elements chars) where chars = ['a' .. 'z'] <> ['A' .. 'Z'] <> ['0' .. '9'] <> "_.-" -- -------------------------------------------------------------------------- -- -- | Identifier for a shard as returned by PutRecord. -- -- Length constraints: Minimum length of 1. Maximum length of 128. -- newtype ShardId = ShardId { shardIdText :: T.Text } deriving (Show, Read, Eq, Ord, Typeable) instance ToJSON ShardId where toJSON = toJSON . shardIdText instance FromJSON ShardId where parseJSON = withText "ShardId" $ return . ShardId instance Arbitrary ShardId where arbitrary = ShardId <$> (resize 128 arbitrary `suchThat` (not . T.null)) -- -------------------------------------------------------------------------- -- -- | Opaque sequence number as returned by PutRecord. -- newtype SequenceNumber = SequenceNumber { sequenceNumberText :: T.Text } deriving (Show, Read, Eq, Ord, Typeable) instance ToJSON SequenceNumber where toJSON = toJSON . sequenceNumberText instance FromJSON SequenceNumber where parseJSON = withText "SequenceNumber" $ return . SequenceNumber instance Arbitrary SequenceNumber where arbitrary = SequenceNumber . T.pack . show . getPositive <$> (arbitrary :: Gen (Positive Integer)) -- -------------------------------------------------------------------------- -- -- | An 128 bit interger that identifies the partition. The default -- is the MD5 hash of the partition key. -- -- FIXME the specification is rather vague about the precise encoding -- of this value. The default is to compute it as an MD5 hash of -- the partition key. The API reference describes it as an Int128. -- However, it is not clear how the result of the hash function is -- encoded (big-endian or small endian, word size?) and how it is -- serialized to text, which is the type in the JSON serialization. -- -- -- seems to indicate that the value is actually unsigned. -- newtype PartitionHash = PartitionHash { partitionHashInteger :: Integer {- FIXME -} } deriving (Show, Read, Eq, Ord, Typeable) -- | Smart Constructor for 'PartitionHash' that enforces size constraints. -- partitionHash :: Integer -> Either T.Text PartitionHash partitionHash i | i >= 2^(128 :: Int) = Left $ "partition hash " <> sshow i <> "is out of range; a partition hash must be a 128 bit unsigned integer" | i < 0 = Left $ "partition hash " <> sshow i <> "is out of range; a partition hash must be a 128 bit unsigned integer" | otherwise = Right $ PartitionHash i instance ToJSON PartitionHash where toJSON = toJSON . show . partitionHashInteger instance FromJSON PartitionHash where parseJSON = withText "PartitionHash" $ \t -> case T.decimal t of Right (a, "") -> tryM $ partitionHash a Right (_, x) -> fail $ "trailing text: " <> T.unpack x Left e -> fail e instance Arbitrary PartitionHash where arbitrary = PartitionHash . getPositive <$> resize (2^(128 :: Int) - 1) arbitrary -- -------------------------------------------------------------------------- -- -- | PartitionKey -- -- Identifies which shard in the stream the data record is assigned to. -- -- Partition keys are Unicode strings with a maximum length limit of 256 -- bytes. Amazon Kinesis uses the partition key as input to a hash function -- that maps the partition key and associated data to a specific shard. -- Specifically, an MD5 hash function is used to map partition keys to -- 128-bit integer values and to map associated data records to shards. As -- a result of this hashing mechanism, all data records with the same -- partition key will map to the same shard within the stream. -- -- Length constraints: Minimum length of 1. Maximum length of 256. -- newtype PartitionKey = PartitionKey { partitionKeyText :: T.Text } deriving (Show, Read, Eq, Ord, IsString, Typeable) partitionKey :: T.Text -> Either T.Text PartitionKey partitionKey t | T.length t < 1 = Left $ "Illegal PartitionKey " <> sshow t <> "; PartitionKey must be of length at least 1" | T.length t > 256 = Left $ "Illegal PartitionKey " <> sshow t <> "; PartitionKey must be of length at most 256" | otherwise = Right $ PartitionKey t instance ToJSON PartitionKey where toJSON = toJSON . partitionKeyText instance FromJSON PartitionKey where parseJSON = withText "PartitionKey" $ tryM . partitionKey instance Arbitrary PartitionKey where arbitrary = PartitionKey <$> (resize 256 arbitrary `suchThat` (not . T.null)) -- -------------------------------------------------------------------------- -- -- | Iterator for records within a shard. -- -- Length constraints: Minimum length of 1. Maximum length of 512. -- newtype ShardIterator = ShardIterator { shardIteratorText :: T.Text } deriving (Show, Read, Eq, Ord, Typeable) shardIterator :: T.Text -> Either T.Text ShardIterator shardIterator t | T.length t < 1 = Left $ "Illegal ShardIterator " <> sshow t <> "; ShardIterator must be of length at least 1" | T.length t > 512 = Left $ "Illegal ShardIterator " <> sshow t <> "; ShardIterator must be of length at most 512" | otherwise = Right $ ShardIterator t instance ToJSON ShardIterator where toJSON = toJSON . shardIteratorText instance FromJSON ShardIterator where parseJSON = withText "ShardIterator" $ tryM . shardIterator instance Arbitrary ShardIterator where arbitrary = ShardIterator <$> (resize 512 arbitrary `suchThat` (not . T.null)) -- -------------------------------------------------------------------------- -- -- | ShardIteratorType -- -- Determines how the shard iterator is used to start reading data records from -- the shard. -- data ShardIteratorType = AtSequenceNumber -- ^ Start reading exactly from the position denoted by a specific sequence -- number. | AfterSequenceNumber -- ^ Start reading right after the position denoted by a specific sequence -- number. | TrimHorizon -- ^ Start reading at the last untrimmed record in the shard in the system, -- which is the oldest data record in the shard. | Latest -- ^ Start reading just after the most recent record in the shard, so that -- you always read the most recent data in the shard. deriving (Show, Read, Eq, Ord, Enum, Bounded, Typeable) instance ToJSON ShardIteratorType where toJSON AtSequenceNumber = "AT_SEQUENCE_NUMBER" toJSON AfterSequenceNumber = "AFTER_SEQUENCE_NUMBER" toJSON TrimHorizon = "TRIM_HORIZON" toJSON Latest = "LATEST" instance FromJSON ShardIteratorType where parseJSON = withText "SharedIteratorType" $ \t -> case t of "AT_SEQUENCE_NUMBER" -> return AtSequenceNumber "AFTER_SEQUENCE_NUMBER" -> return AfterSequenceNumber "TRIM_HORIZON" -> return TrimHorizon "LATEST" -> return Latest e -> fail $ "unexpected value for SharedIteratorType: " <> T.unpack e instance Arbitrary ShardIteratorType where arbitrary = elements [minBound .. maxBound] -- -------------------------------------------------------------------------- -- -- | Record -- -- The unit of data of the Amazon Kinesis stream, which is composed of a -- sequence number, a partition key, and a data blob. -- -- -- data Record = Record { recordData :: !B.ByteString -- ^ The data blob. The data in the blob is both opaque and immutable to -- the Amazon Kinesis service, which does not inspect, interpret, or change -- the data in the blob in any way. The maximum size of the data blob (the -- payload after Base64-decoding) is 50 kilobytes (KB) -- -- Length constraints: Minimum length of 0. Maximum length of 51200. -- -- Note that the size constraint is not currently enforced in the code. , recordPartitionKey :: !PartitionKey -- ^ Identifies which shard in the stream the data record is assigned to. , recordSequenceNumber :: !SequenceNumber -- ^ The unique identifier for the record in the Amazon Kinesis stream. } deriving (Show, Read, Eq, Ord, Typeable) instance ToJSON Record where toJSON Record{..} = object [ "Data" .= T.decodeUtf8 (B64.encode recordData) , "PartitionKey" .= recordPartitionKey , "SequenceNumber" .= recordSequenceNumber ] instance FromJSON Record where parseJSON = withObject "Record" $ \o -> Record <$> (from64 =<< o .: "Data") <*> o .: "PartitionKey" <*> o .: "SequenceNumber" where from64 x = case B64.decode (T.encodeUtf8 x) of Left e -> fail $ "failed to decode base64 encoded data: " <> e Right a -> return a instance Arbitrary Record where arbitrary = Record <$> (arbitrary `suchThat` ((<= 51200) . B.length)) <*> arbitrary <*> arbitrary -- -------------------------------------------------------------------------- -- -- Stream Description -- -- Represents the output of a DescribeStream operation. -- -- -- data StreamDescription = StreamDescription { streamDescriptionHasMoreShards :: !Bool -- ^ If set to true there are more shards in the stream available to -- describe. , streamDescriptionShards :: ![Shard] -- ^ The shards that comprise the stream. , streamDescriptionStreamARN :: !Arn -- ^The Amazon Resource Name (ARN) for the stream being described. , streamDescriptionStreamName :: !StreamName -- ^ The name of the stream being described. , streamDescriptionStreamStatus :: !StreamStatus -- ^ The current status of the stream being described. } deriving (Show, Read, Eq, Ord, Typeable) instance ToJSON StreamDescription where toJSON StreamDescription{..} = object [ "HasMoreShards" .= streamDescriptionHasMoreShards , "Shards" .= streamDescriptionShards , "StreamARN" .= streamDescriptionStreamARN , "StreamName" .= streamDescriptionStreamName , "StreamStatus" .= streamDescriptionStreamStatus ] instance FromJSON StreamDescription where parseJSON = withObject "StreamDescription" $ \o -> StreamDescription <$> o .: "HasMoreShards" <*> o .: "Shards" <*> o .: "StreamARN" <*> o .: "StreamName" <*> o .: "StreamStatus" instance Arbitrary StreamDescription where arbitrary = StreamDescription <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary -- | Stream Status -- -- -- data StreamStatus = StreamStatusCreating -- ^ The stream is being created. Upon receiving a CreateStream request, -- Amazon Kinesis immediately returns and sets StreamStatus to CREATING. | StreamStatusDeleting -- ^ The stream is being deleted. After a DeleteStream request, the -- specified stream is in the DELETING state until Amazon Kinesis completes -- the deletion. | StreamStatusActive -- ^ The stream exists and is ready for read and write operations or -- deletion. You should perform read and write operations only on an ACTIVE -- stream. | StreamStatusUpdating -- ^ Shards in the stream are being merged or split. Read and write -- operations continue to work while the stream is in the UPDATING state. deriving (Show, Read, Eq, Ord, Enum, Bounded, Typeable) instance ToJSON StreamStatus where toJSON StreamStatusCreating = "CREATING" toJSON StreamStatusDeleting = "DELETING" toJSON StreamStatusActive = "ACTIVE" toJSON StreamStatusUpdating = "UPDATING" instance FromJSON StreamStatus where parseJSON = withText "StreamStatus" $ \t -> case t of "CREATING" -> return StreamStatusCreating "DELETING" -> return StreamStatusDeleting "ACTIVE" -> return StreamStatusActive "UPDATING" -> return StreamStatusUpdating e -> fail $ "unexpected value for StreamStatus: " <> T.unpack e instance Arbitrary StreamStatus where arbitrary = elements [minBound .. maxBound] -- | Shard -- -- -- data Shard = Shard { shardAdjacentParentShardId :: !(Maybe ShardId) -- ^ The shard Id of the shard adjacent to the shard's parent. , shardHashKeyRange :: !(PartitionHash, PartitionHash) -- ^ The (inclusive) range of possible hash key values for the shard, which is a set of -- ordered contiguous positive integers. -- -- , shardParentShardId :: !(Maybe ShardId) -- ^ The shard Id of the shard's parent. , shardSequenceNumberRange :: !(SequenceNumber, Maybe SequenceNumber) -- ^ The (inclusive) range of possible sequence numbers for the shard. -- -- Shards that are in the OPEN state have an ending sequence number of 'Nothing'. -- -- , shardShardId :: ShardId -- ^ The unique identifier of the shard within the Amazon Kinesis stream. } deriving (Show, Read, Eq, Ord, Typeable) instance ToJSON Shard where toJSON Shard{..} = object [ "AdjacentParentShardId" .= shardAdjacentParentShardId , "HashKeyRange" .= object [ "StartingHashKey" .= fst shardHashKeyRange , "EndingHashKey" .= snd shardHashKeyRange ] , "ParentShardId" .= shardParentShardId , "SequenceNumberRange" .= object [ "StartingSequenceNumber" .= fst shardSequenceNumberRange , "EndingSequenceNumber" .= snd shardSequenceNumberRange ] , "ShardId" .= shardShardId ] instance FromJSON Shard where parseJSON = withObject "Shard" $ \o -> Shard <$> o .:? "AdjacentParentShardId" .!= Nothing <*> (hashKeyRange =<< o .: "HashKeyRange") <*> o .:? "ParentShardId" .!= Nothing <*> (sequenceNumberRange =<< o .: "SequenceNumberRange") <*> o .: "ShardId" where hashKeyRange = withObject "HashKeyRange" $ \o -> (,) <$> o .: "StartingHashKey" <*> o .: "EndingHashKey" sequenceNumberRange = withObject "SequenceNumberRange" $ \o -> (,) <$> o .: "StartingSequenceNumber" <*> o .:? "EndingSequenceNumber" .!= Nothing instance Arbitrary Shard where arbitrary = Shard <$> arbitrary <*> do u <- arbitrary l <- resize (fromIntegral $ partitionHashInteger u) arbitrary return (l,u) <*> arbitrary <*> do u <- arbitrary l <- resize (maybe maxBound (read . T.unpack . sequenceNumberText) u) arbitrary return (l,u) <*> arbitrary