module Database.EventStore
(
Connection
, ConnectionType(..)
, Credentials
, Settings(..)
, Retry
, atMost
, keepRetrying
, credentials
, defaultSettings
, defaultSSLSettings
, connect
, shutdown
, waitTillClosed
, connectionSettings
, ClusterSettings(..)
, DnsServer(..)
, GossipSeed
, gossipSeed
, gossipSeedWithHeader
, gossipSeedHost
, gossipSeedHeader
, gossipSeedPort
, gossipSeedClusterSettings
, dnsClusterSettings
, Event
, EventData
, EventType(..)
, createEvent
, withJson
, withJsonAndMetadata
, withBinary
, withBinaryAndMetadata
, OperationMaxAttemptReached(..)
, StreamMetadataResult(..)
, readEvent
, readAllEventsBackward
, readAllEventsForward
, readStreamEventsBackward
, readStreamEventsForward
, getStreamMetadata
, StreamACL(..)
, StreamMetadata(..)
, getCustomPropertyValue
, getCustomProperty
, emptyStreamACL
, emptyStreamMetadata
, deleteStream
, sendEvent
, sendEvents
, setStreamMetadata
, Builder
, StreamACLBuilder
, buildStreamACL
, modifyStreamACL
, setReadRoles
, setReadRole
, setWriteRoles
, setWriteRole
, setDeleteRoles
, setDeleteRole
, setMetaReadRoles
, setMetaReadRole
, setMetaWriteRoles
, setMetaWriteRole
, StreamMetadataBuilder
, buildStreamMetadata
, modifyStreamMetadata
, setMaxCount
, setMaxAge
, setTruncateBefore
, setCacheControl
, setACL
, modifyACL
, setCustomProperty
, Transaction
, TransactionId
, startTransaction
, transactionId
, transactionCommit
, transactionRollback
, transactionWrite
, SubscriptionClosed(..)
, SubscriptionId
, Subscription
, SubDropReason(..)
, SubDetails
, waitConfirmation
, unsubscribeConfirmed
, unsubscribeConfirmedSTM
, waitUnsubscribeConfirmed
, nextEventMaybeSTM
, getSubscriptionDetailsSTM
, unsubscribe
, subscriptionStream
, RegularSubscription
, subscribe
, subscribeToAll
, getSubscriptionId
, isSubscribedToAll
, nextEvent
, nextEventMaybe
, CatchupSubscription
, subscribeFrom
, subscribeToAllFrom
, waitTillCatchup
, hasCaughtUp
, hasCaughtUpSTM
, PersistentSubscription
, PersistentSubscriptionSettings(..)
, SystemConsumerStrategy(..)
, NakAction(..)
, PersistActionException(..)
, acknowledge
, acknowledgeEvents
, failed
, eventsFailed
, notifyEventsProcessed
, notifyEventsFailed
, defaultPersistentSubscriptionSettings
, createPersistentSubscription
, updatePersistentSubscription
, deletePersistentSubscription
, connectToPersistentSubscription
, Slice(..)
, AllSlice
, Op.DeleteResult(..)
, WriteResult(..)
, ReadResult(..)
, RecordedEvent(..)
, Op.ReadEvent(..)
, StreamType(..)
, StreamSlice
, Position(..)
, ReadDirection(..)
, ResolvedEvent(..)
, OperationError(..)
, StreamName(..)
, isEventResolvedLink
, resolvedEventOriginal
, resolvedEventDataAsJson
, resolvedEventOriginalStreamId
, resolvedEventOriginalId
, resolvedEventOriginalEventNumber
, recordedEventDataAsJson
, positionStart
, positionEnd
, LogLevel(..)
, LogType(..)
, LoggerFilter(..)
, Command
, DropReason(..)
, ExpectedVersion
, anyVersion
, noStreamVersion
, emptyStreamVersion
, exactEventVersion
, streamExists
, msDiffTime
, (<>)
, NonEmpty(..)
, nonEmpty
, TLSSettings
, NominalDiffTime
) where
import Prelude (String)
import Data.Int
import Data.Maybe
import Data.Time (NominalDiffTime)
import Data.List.NonEmpty(NonEmpty(..), nonEmpty)
import Network.Connection (TLSSettings)
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Connection (connectionBuilder)
import Database.EventStore.Internal.Control hiding (subscribe)
import Database.EventStore.Internal.Discovery
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Catchup
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Persistent
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Regular
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Manager.Operation.Registry
import Database.EventStore.Internal.Operation (OperationError(..))
import qualified Database.EventStore.Internal.Operations as Op
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Types
data ConnectionType
= Static String Int
| Cluster ClusterSettings
| Dns ByteString (Maybe DnsServer) Int
data Connection
= Connection
{ _exec :: Exec
, _settings :: Settings
, _type :: ConnectionType
}
connect :: Settings -> ConnectionType -> IO Connection
connect settings@Settings{..} tpe = do
disc <- case tpe of
Static host port -> return $ staticEndPointDiscovery host port
Cluster setts -> clusterDnsEndPointDiscovery setts
Dns dom srv port -> return $ simpleDnsEndPointDiscovery dom srv port
logRef <- newLoggerRef s_loggerType s_loggerFilter s_loggerDetailed
mainBus <- newBus logRef settings
builder <- connectionBuilder settings
exec <- newExec settings mainBus builder disc
return $ Connection exec settings tpe
waitTillClosed :: Connection -> IO ()
waitTillClosed Connection{..} = execWaitTillClosed _exec
connectionSettings :: Connection -> Settings
connectionSettings = _settings
shutdown :: Connection -> IO ()
shutdown Connection{..} = publishWith _exec SystemShutdown
sendEvent :: Connection
-> StreamName
-> ExpectedVersion
-> Event
-> IO (Async WriteResult)
sendEvent mgr evt_stream exp_ver evt =
sendEvents mgr evt_stream exp_ver [evt]
sendEvents :: Connection
-> StreamName
-> ExpectedVersion
-> [Event]
-> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts = do
p <- newPromise
let op = Op.writeEvents _settings (streamNameRaw evt_stream) exp_ver evts
publishWith _exec (SubmitOperation p op)
async (retrieve p)
deleteStream :: Connection
-> StreamName
-> ExpectedVersion
-> Maybe Bool
-> IO (Async Op.DeleteResult)
deleteStream Connection{..} evt_stream exp_ver hard_del = do
p <- newPromise
let op = Op.deleteStream _settings (streamNameRaw evt_stream) exp_ver hard_del
publishWith _exec (SubmitOperation p op)
async (retrieve p)
data Transaction =
Transaction
{ _tStream :: Text
, _tTransId :: TransactionId
, _tExpVer :: ExpectedVersion
, _tConn :: Connection
}
newtype TransactionId =
TransactionId { _unTransId :: Int64 }
deriving (Eq, Ord, Show)
transactionId :: Transaction -> TransactionId
transactionId = _tTransId
startTransaction :: Connection
-> StreamName
-> ExpectedVersion
-> IO (Async Transaction)
startTransaction conn@Connection{..} evt_stream exp_ver = do
p <- newPromise
let op = Op.transactionStart _settings (streamNameRaw evt_stream) exp_ver
publishWith _exec (SubmitOperation p op)
async $ do
tid <- retrieve p
return Transaction
{ _tStream = streamNameRaw evt_stream
, _tTransId = TransactionId tid
, _tExpVer = exp_ver
, _tConn = conn
}
transactionWrite :: Transaction -> [Event] -> IO (Async ())
transactionWrite Transaction{..} evts = do
p <- newPromise
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionWrite _settings _tStream _tExpVer raw_id evts
publishWith _exec (SubmitOperation p op)
async (retrieve p)
transactionCommit :: Transaction -> IO (Async WriteResult)
transactionCommit Transaction{..} = do
p <- newPromise
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionCommit _settings _tStream _tExpVer raw_id
publishWith _exec (SubmitOperation p op)
async (retrieve p)
transactionRollback :: Transaction -> IO ()
transactionRollback _ = return ()
readEvent :: Connection
-> StreamName
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream Op.ReadEvent))
readEvent Connection{..} stream_id evt_num res_link_tos = do
p <- newPromise
let op = Op.readEvent _settings (streamNameRaw stream_id) evt_num res_link_tos
publishWith _exec (SubmitOperation p op)
async (retrieve p)
readStreamEventsForward :: Connection
-> StreamName
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsForward mgr =
readStreamEventsCommon mgr Forward
readStreamEventsBackward :: Connection
-> StreamName
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsBackward mgr =
readStreamEventsCommon mgr Backward
readStreamEventsCommon :: Connection
-> ReadDirection
-> StreamName
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsCommon Connection{..} dir stream_id start cnt res_link_tos = do
p <- newPromise
let name = streamNameRaw stream_id
op = Op.readStreamEvents _settings dir name start cnt res_link_tos
publishWith _exec (SubmitOperation p op)
async (retrieve p)
readAllEventsForward :: Connection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsForward mgr =
readAllEventsCommon mgr Forward
readAllEventsBackward :: Connection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsBackward mgr =
readAllEventsCommon mgr Backward
readAllEventsCommon :: Connection
-> ReadDirection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsCommon Connection{..} dir pos max_c res_link_tos = do
p <- newPromise
let op = Op.readAllEvents _settings c_pos p_pos max_c res_link_tos dir
publishWith _exec (SubmitOperation p op)
async (retrieve p)
where
Position c_pos p_pos = pos
subscribe :: Connection
-> StreamName
-> Bool
-> IO RegularSubscription
subscribe Connection{..} stream resLnkTos =
newRegularSubscription _exec stream resLnkTos
subscribeToAll :: Connection
-> Bool
-> IO RegularSubscription
subscribeToAll conn = subscribe conn AllStream
subscribeFrom :: Connection
-> StreamName
-> Bool
-> Maybe Int32
-> Maybe Int32
-> IO CatchupSubscription
subscribeFrom conn streamId resLnkTos lastChkPt batch =
subscribeFromCommon conn resLnkTos batch tpe
where
tpe = Op.RegularCatchup (streamNameRaw streamId) (fromMaybe 0 lastChkPt)
subscribeToAllFrom :: Connection
-> Bool
-> Maybe Position
-> Maybe Int32
-> IO CatchupSubscription
subscribeToAllFrom conn resLnkTos lastChkPt batch =
subscribeFromCommon conn resLnkTos batch tpe
where
Position cPos pPos = fromMaybe positionStart lastChkPt
tpe = Op.AllCatchup (Position cPos pPos)
subscribeFromCommon :: Connection
-> Bool
-> Maybe Int32
-> Op.CatchupState
-> IO CatchupSubscription
subscribeFromCommon Connection{..} resLnkTos batch tpe =
newCatchupSubscription _exec resLnkTos batch tpe
setStreamMetadata :: Connection
-> StreamName
-> ExpectedVersion
-> StreamMetadata
-> IO (Async WriteResult)
setStreamMetadata Connection{..} evt_stream exp_ver metadata = do
p <- newPromise
let name = streamNameRaw evt_stream
op = Op.setMetaStream _settings name exp_ver metadata
publishWith _exec (SubmitOperation p op)
async (retrieve p)
getStreamMetadata :: Connection -> StreamName -> IO (Async StreamMetadataResult)
getStreamMetadata Connection{..} evt_stream = do
p <- newPromise
let op = Op.readMetaStream _settings (streamNameRaw evt_stream)
publishWith _exec (SubmitOperation p op)
async (retrieve p)
createPersistentSubscription :: Connection
-> Text
-> StreamName
-> PersistentSubscriptionSettings
-> IO (Async (Maybe PersistActionException))
createPersistentSubscription Connection{..} grp stream sett = do
p <- newPromise
let op = Op.createPersist grp (streamNameRaw stream) sett
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
updatePersistentSubscription :: Connection
-> Text
-> StreamName
-> PersistentSubscriptionSettings
-> IO (Async (Maybe PersistActionException))
updatePersistentSubscription Connection{..} grp stream sett = do
p <- newPromise
let op = Op.updatePersist grp (streamNameRaw stream) sett
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
deletePersistentSubscription :: Connection
-> Text
-> StreamName
-> IO (Async (Maybe PersistActionException))
deletePersistentSubscription Connection{..} grp stream = do
p <- newPromise
let op = Op.deletePersist grp (streamNameRaw stream)
publishWith _exec (SubmitOperation p op)
async (persistAsync p)
connectToPersistentSubscription :: Connection
-> Text
-> StreamName
-> Int32
-> IO PersistentSubscription
connectToPersistentSubscription Connection{..} group stream bufSize =
newPersistentSubscription _exec group stream bufSize
persistAsync :: Callback (Maybe PersistActionException)
-> IO (Maybe PersistActionException)
persistAsync = either throw return <=< tryRetrieve