module Database.EventStore.Internal.Types where
import Control.Applicative ((<|>))
import Control.Exception
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (fromStrict, toStrict)
import Data.Int
import Data.Maybe
import Data.Typeable
import Data.Word
import GHC.Generics (Generic)
import Control.Concurrent.Async hiding (link)
import qualified Data.Aeson as A
import Data.ProtocolBuffers
import Data.Text (Text)
import Data.Time
import Data.Time.Clock.POSIX
import Data.UUID (UUID, fromByteString, toByteString)
import System.Random
data InternalException
= ConnectionClosedByServer
| Stopped
deriving (Show, Typeable)
instance Exception InternalException
data OperationException
= WrongExpectedVersion Text ExpectedVersion
| StreamDeleted Text
| InvalidTransaction
| AccessDenied Text
| InvalidServerResponse Word8 Word8
| ProtobufDecodingError String
| ServerError (Maybe Text)
deriving (Show, Typeable)
instance Exception OperationException
type OperationExceptional a = Either OperationException a
data Event
= Event
{ eventType :: !Text
, eventData :: !EventData
}
createEvent :: Text
-> EventData
-> Event
createEvent = Event
data EventData
= Json A.Value (Maybe A.Value)
eventDataType :: EventData -> Int32
eventDataType (Json _ _) = 1
eventMetadataType :: EventData -> Int32
eventMetadataType _ = 0
withJson :: A.Value -> EventData
withJson value = Json value Nothing
withJsonAndMetadata :: A.Value -> A.Value -> EventData
withJsonAndMetadata value metadata = Json value (Just metadata)
eventDataBytes :: EventData -> ByteString
eventDataBytes (Json value _) = toStrict $ A.encode value
eventMetadataBytes :: EventData -> Maybe ByteString
eventMetadataBytes (Json _ meta_m) = fmap (toStrict . A.encode) meta_m
data ExpectedVersion
= Any
| NoStream
| EmptyStream
deriving Show
expVersionInt32 :: ExpectedVersion -> Int32
expVersionInt32 Any = 2
expVersionInt32 NoStream = 1
expVersionInt32 EmptyStream = 0
data OpResult
= OP_SUCCESS
| OP_PREPARE_TIMEOUT
| OP_COMMIT_TIMEOUT
| OP_FORWARD_TIMEOUT
| OP_WRONG_EXPECTED_VERSION
| OP_STREAM_DELETED
| OP_INVALID_TRANSACTION
| OP_ACCESS_DENIED
deriving (Eq, Enum, Show)
data NewEvent
= NewEvent
{ newEventId :: Required 1 (Value ByteString)
, newEventType :: Required 2 (Value Text)
, newEventDataType :: Required 3 (Value Int32)
, newEventMetadataType :: Required 4 (Value Int32)
, newEventData :: Required 5 (Value ByteString)
, newEventMetadata :: Optional 6 (Value ByteString)
}
deriving (Generic, Show)
instance Encode NewEvent
newEvent :: Text
-> Int32
-> Int32
-> ByteString
-> Maybe ByteString
-> IO NewEvent
newEvent evt_type data_type meta_type evt_data evt_meta = do
new_uuid <- randomIO
let uuid_bytes = toStrict $ toByteString new_uuid
new_evt = NewEvent
{ newEventId = putField uuid_bytes
, newEventType = putField evt_type
, newEventDataType = putField data_type
, newEventMetadataType = putField meta_type
, newEventData = putField evt_data
, newEventMetadata = putField evt_meta
}
return new_evt
data TransactionStart
= TransactionStart
{ transactionStartStreamId :: Required 1 (Value Text)
, transactionStartExpectedVersion :: Required 2 (Value Int32)
, transactionStartRequireMaster :: Required 3 (Value Bool)
}
deriving (Generic, Show)
newTransactionStart :: Text
-> Int32
-> Bool
-> TransactionStart
newTransactionStart stream_id exp_ver req_master =
TransactionStart
{ transactionStartStreamId = putField stream_id
, transactionStartExpectedVersion = putField exp_ver
, transactionStartRequireMaster = putField req_master
}
instance Encode TransactionStart
data TransactionStartCompleted
= TransactionStartCompleted
{ transactionSCId :: Required 1 (Value Int64)
, transactionSCResult :: Required 2 (Enumeration OpResult)
, transactionSCMessage :: Optional 3 (Value Text)
}
deriving (Generic, Show)
instance Decode TransactionStartCompleted
data TransactionWrite
= TransactionWrite
{ transactionWriteId :: Required 1 (Value Int64)
, transactionWriteEvents :: Repeated 2 (Message NewEvent)
, transactionWriteRequireMaster :: Required 3 (Value Bool)
}
deriving (Generic, Show)
instance Encode TransactionWrite
newTransactionWrite :: Int64 -> [NewEvent] -> Bool -> TransactionWrite
newTransactionWrite trans_id evts req_master =
TransactionWrite
{ transactionWriteId = putField trans_id
, transactionWriteEvents = putField evts
, transactionWriteRequireMaster = putField req_master
}
data TransactionWriteCompleted
= TransactionWriteCompleted
{ transactionWCId :: Required 1 (Value Int64)
, transactionWCResult :: Required 2 (Enumeration OpResult)
, transactionWCMessage :: Optional 3 (Value Text)
}
deriving (Generic, Show)
instance Decode TransactionWriteCompleted
data TransactionCommit
= TransactionCommit
{ transactionCommitId :: Required 1 (Value Int64)
, transactionCommitRequireMaster :: Required 2 (Value Bool)
}
deriving (Generic, Show)
instance Encode TransactionCommit
newTransactionCommit :: Int64 -> Bool -> TransactionCommit
newTransactionCommit trans_id req_master =
TransactionCommit
{ transactionCommitId = putField trans_id
, transactionCommitRequireMaster = putField req_master
}
data TransactionCommitCompleted
= TransactionCommitCompleted
{ transactionCCId :: Required 1 (Value Int64)
, transactionCCResult :: Required 2 (Enumeration OpResult)
, transactionCCMessage :: Optional 3 (Value Text)
, transactionCCFirstNumber :: Required 4 (Value Int32)
, transactionCCLastNumber :: Required 5 (Value Int32)
, transactionCCPreparePosition :: Optional 6 (Value Int64)
, transactionCCCommitPosition :: Optional 7 (Value Int64)
}
deriving (Generic, Show)
instance Decode TransactionCommitCompleted
data EventRecord
= EventRecord
{ eventRecordStreamId :: Required 1 (Value Text)
, eventRecordNumber :: Required 2 (Value Int32)
, eventRecordId :: Required 3 (Value ByteString)
, eventRecordType :: Required 4 (Value Text)
, eventRecordDataType :: Required 5 (Value Int32)
, eventRecordMetadataType :: Required 6 (Value Int32)
, eventRecordData :: Required 7 (Value ByteString)
, eventRecordMetadata :: Optional 8 (Value ByteString)
, eventRecordCreated :: Optional 9 (Value Int64)
, eventRecordCreatedEpoch :: Optional 10 (Value Int64)
}
deriving (Generic, Show)
instance Decode EventRecord
data ResolvedIndexedEvent
= ResolvedIndexedEvent
{ resolvedIndexedRecord :: Optional 1 (Message EventRecord)
, resolvedIndexedLink :: Optional 2 (Message EventRecord)
}
deriving (Generic, Show)
instance Decode ResolvedIndexedEvent
data ResolvedEventBuf
= ResolvedEventBuf
{ resolvedEventBufEvent :: Required 1 (Message EventRecord)
, resolvedEventBufLink :: Optional 2 (Message EventRecord)
, resolvedEventBufCommitPosition :: Required 3 (Value Int64)
, resolvedEventBufPreparePosition :: Required 4 (Value Int64)
}
deriving (Generic, Show)
instance Decode ResolvedEventBuf
data Position
= Position
{ positionCommit :: !Int64
, positionPrepare :: !Int64
}
deriving Show
positionStart :: Position
positionStart = Position 0 0
positionEnd :: Position
positionEnd = Position (1) (1)
data WriteResult
= WriteResult
{ writeNextExpectedVersion :: !Int32
, writePosition :: !Position
}
deriving Show
newtype DeleteResult
= DeleteResult { deleteStreamPosition :: Position }
deriving Show
data RecordedEvent
= RecordedEvent
{ recordedEventStreamId :: !Text
, recordedEventId :: !UUID
, recordedEventNumber :: !Int32
, recordedEventType :: !Text
, recordedEventData :: !ByteString
, recordedEventMetadata :: !(Maybe ByteString)
, recordedEventIsJson :: !Bool
, recordedEventCreated :: !(Maybe UTCTime)
, recordedEventCreatedEpoch :: !(Maybe Integer)
}
deriving Show
newRecordedEvent :: EventRecord -> RecordedEvent
newRecordedEvent er = re
where
evt_id = getField $ eventRecordId er
evt_uuid = fromJust $ fromByteString $ fromStrict evt_id
data_type = getField $ eventRecordDataType er
created = getField $ eventRecordCreated er
epoch = getField $ eventRecordCreatedEpoch er
utc_created = fmap (posixSecondsToUTCTime . fromInteger . toInteger) created
re = RecordedEvent
{ recordedEventStreamId = getField $ eventRecordStreamId er
, recordedEventNumber = getField $ eventRecordNumber er
, recordedEventId = evt_uuid
, recordedEventType = getField $ eventRecordType er
, recordedEventData = getField $ eventRecordData er
, recordedEventMetadata = getField $ eventRecordMetadata er
, recordedEventIsJson = data_type == 1
, recordedEventCreated = utc_created
, recordedEventCreatedEpoch = fmap toInteger epoch
}
data ResolvedEvent
= ResolvedEvent
{ resolvedEventRecord :: !(Maybe RecordedEvent)
, resolvedEventLink :: !(Maybe RecordedEvent)
}
deriving Show
newResolvedEvent :: ResolvedIndexedEvent -> ResolvedEvent
newResolvedEvent rie = re
where
record = getField $ resolvedIndexedRecord rie
link = getField $ resolvedIndexedLink rie
re = ResolvedEvent
{ resolvedEventRecord = fmap newRecordedEvent record
, resolvedEventLink = fmap newRecordedEvent link
}
newResolvedEventFromBuf :: ResolvedEventBuf -> ResolvedEvent
newResolvedEventFromBuf reb = re
where
record = Just $ newRecordedEvent $ getField $ resolvedEventBufEvent reb
link = getField $ resolvedEventBufLink reb
re = ResolvedEvent
{ resolvedEventRecord = record
, resolvedEventLink = fmap newRecordedEvent link
}
resolvedEventOriginal :: ResolvedEvent -> Maybe RecordedEvent
resolvedEventOriginal (ResolvedEvent record link) =
link <|> record
eventResolved :: ResolvedEvent -> Bool
eventResolved = isJust . resolvedEventOriginal
resolvedEventOriginalStreamId :: ResolvedEvent -> Maybe Text
resolvedEventOriginalStreamId =
fmap recordedEventStreamId . resolvedEventOriginal
data ReadDirection
= Forward
| Backward
deriving Show
data Transaction
= Transaction
{ transactionId :: Int64
, transactionStreamId :: Text
, transactionExpectedVersion :: ExpectedVersion
, transactionCommit :: IO (Async WriteResult)
, transactionSendEvents :: [Event] -> IO (Async ())
, transactionRollback :: IO ()
}
data Flag
= None
| Authenticated
deriving Show
flagWord8 :: Flag -> Word8
flagWord8 None = 0x00
flagWord8 Authenticated = 0x01
data Credentials
= Credentials
{ credLogin :: !ByteString
, credPassword :: !ByteString
}
deriving Show
credentials :: ByteString
-> ByteString
-> Credentials
credentials = Credentials
data Package
= Package
{ packageCmd :: !Word8
, packageCorrelation :: !UUID
, packageData :: !ByteString
, packageCred :: !(Maybe Credentials)
}
deriving Show
data Retry
= AtMost Int
| KeepRetrying
atMost :: Int -> Retry
atMost = AtMost
keepRetrying :: Retry
keepRetrying = KeepRetrying
data Settings
= Settings
{ s_heartbeatInterval :: NominalDiffTime
, s_heartbeatTimeout :: NominalDiffTime
, s_requireMaster :: Bool
, s_credentials :: Maybe Credentials
, s_retry :: Retry
, s_reconnect_delay_secs :: Int
}
defaultSettings :: Settings
defaultSettings = Settings
{ s_heartbeatInterval = msDiffTime 750
, s_heartbeatTimeout = msDiffTime 1500
, s_requireMaster = True
, s_credentials = Nothing
, s_retry = atMost 3
, s_reconnect_delay_secs = 3
}
msDiffTime :: Float -> NominalDiffTime
msDiffTime i = fromRational $ toRational (i / 1000)