module Database.EventStore
(
Event
, EventData
, createEvent
, withJson
, withJsonAndMetadata
, Connection
, Credentials
, Settings(..)
, Retry
, atMost
, keepRetrying
, credentials
, defaultSettings
, connect
, shutdown
, readEvent
, readAllEventsBackward
, readAllEventsForward
, readStreamEventsBackward
, readStreamEventsForward
, deleteStream
, sendEvent
, sendEvents
, Transaction
, transactionStart
, transactionCommit
, transactionRollback
, transactionSendEvents
, DropReason(..)
, Subscription
, subscribe
, subscribeToAll
, subAwait
, subId
, subStream
, subResolveLinkTos
, subLastCommitPos
, subLastEventNumber
, subUnsubscribe
, Catchup
, CatchupError(..)
, subscribeFrom
, subscribeToAllFrom
, catchupAwait
, catchupStream
, catchupUnsubscribe
, AllEventsSlice(..)
, DeleteResult(..)
, WriteResult(..)
, ReadResult(..)
, RecordedEvent(..)
, StreamEventsSlice(..)
, Position(..)
, ReadDirection(..)
, ReadAllResult(..)
, ReadEventResult(..)
, ResolvedEvent(..)
, ReadStreamResult(..)
, OperationException(..)
, eventResolved
, resolvedEventOriginal
, resolvedEventOriginalStreamId
, positionStart
, positionEnd
, ExpectedVersion(..)
, module Control.Concurrent.Async
) where
import Control.Concurrent
import Control.Exception
import Data.Int
import Control.Concurrent.Async
import Data.Text
import Database.EventStore.Catchup
import Database.EventStore.Internal.Processor
import Database.EventStore.Internal.Types
import Database.EventStore.Internal.Operation.DeleteStreamOperation
import Database.EventStore.Internal.Operation.ReadAllEventsOperation
import Database.EventStore.Internal.Operation.ReadEventOperation
import Database.EventStore.Internal.Operation.ReadStreamEventsOperation
import Database.EventStore.Internal.Operation.TransactionStartOperation
import Database.EventStore.Internal.Operation.WriteEventsOperation
data Connection
= Connection
{ conProcessor :: Processor
, conSettings :: Settings
}
connect :: Settings
-> String
-> Int
-> IO Connection
connect settings host port = do
processor <- newProcessor settings
processorConnect processor host port
return $ Connection processor settings
shutdown :: Connection -> IO ()
shutdown Connection{..} = processorShutdown conProcessor
sendEvent :: Connection
-> Text
-> ExpectedVersion
-> Event
-> IO (Async WriteResult)
sendEvent mgr evt_stream exp_ver evt =
sendEvents mgr evt_stream exp_ver [evt]
sendEvents :: Connection
-> Text
-> ExpectedVersion
-> [Event]
-> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts = do
(as, mvar) <- createAsync
let op = writeEventsOperation conSettings mvar evt_stream exp_ver evts
processorNewOperation conProcessor op
return as
deleteStream :: Connection
-> Text
-> ExpectedVersion
-> Maybe Bool
-> IO (Async DeleteResult)
deleteStream Connection{..} evt_stream exp_ver hard_del = do
(as, mvar) <- createAsync
let op = deleteStreamOperation conSettings mvar evt_stream exp_ver hard_del
processorNewOperation conProcessor op
return as
transactionStart :: Connection
-> Text
-> ExpectedVersion
-> IO (Async Transaction)
transactionStart Connection{..} evt_stream exp_ver = do
(as, mvar) <- createAsync
let op = transactionStartOperation conSettings
conProcessor
mvar
evt_stream
exp_ver
processorNewOperation conProcessor op
return as
readEvent :: Connection
-> Text
-> Int32
-> Bool
-> IO (Async ReadResult)
readEvent Connection{..} stream_id evt_num res_link_tos = do
(as, mvar) <- createAsync
let op = readEventOperation conSettings mvar stream_id evt_num res_link_tos
processorNewOperation conProcessor op
return as
readStreamEventsForward :: Connection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async StreamEventsSlice)
readStreamEventsForward mgr =
readStreamEventsCommon mgr Forward
readStreamEventsBackward :: Connection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async StreamEventsSlice)
readStreamEventsBackward mgr =
readStreamEventsCommon mgr Backward
readStreamEventsCommon :: Connection
-> ReadDirection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async StreamEventsSlice)
readStreamEventsCommon Connection{..} dir stream_id start cnt res_link_tos = do
(as, mvar) <- createAsync
let op = readStreamEventsOperation conSettings
dir
mvar
stream_id
start
cnt
res_link_tos
processorNewOperation conProcessor op
return as
readAllEventsForward :: Connection
-> Position
-> Int32
-> Bool
-> IO (Async AllEventsSlice)
readAllEventsForward mgr =
readAllEventsCommon mgr Forward
readAllEventsBackward :: Connection
-> Position
-> Int32
-> Bool
-> IO (Async AllEventsSlice)
readAllEventsBackward mgr =
readAllEventsCommon mgr Backward
readAllEventsCommon :: Connection
-> ReadDirection
-> Position
-> Int32
-> Bool
-> IO (Async AllEventsSlice)
readAllEventsCommon Connection{..} dir pos max_c res_link_tos = do
(as, mvar) <- createAsync
let op = readAllEventsOperation conSettings
dir
mvar
c_pos
p_pos
max_c
res_link_tos
processorNewOperation conProcessor op
return as
where
Position c_pos p_pos = pos
subscribe :: Connection
-> Text
-> Bool
-> IO (Async Subscription)
subscribe Connection{..} stream_id res_lnk_tos = do
tmp <- newEmptyMVar
processorNewSubcription conProcessor
(putMVar tmp)
stream_id
res_lnk_tos
async $ readMVar tmp
subscribeToAll :: Connection
-> Bool
-> IO (Async Subscription)
subscribeToAll Connection{..} res_lnk_tos = do
tmp <- newEmptyMVar
processorNewSubcription conProcessor
(putMVar tmp)
""
res_lnk_tos
async $ readMVar tmp
subscribeFrom :: Connection
-> Text
-> Bool
-> Maybe Int32
-> Maybe Int32
-> IO Catchup
subscribeFrom conn stream_id res_lnk_tos last_chk_pt batch_m = do
catchupStart evts_fwd get_sub stream_id batch_m last_chk_pt
where
evts_fwd cur_num batch_size =
readStreamEventsForward conn stream_id cur_num batch_size res_lnk_tos
get_sub = subscribe conn stream_id res_lnk_tos
subscribeToAllFrom :: Connection
-> Bool
-> Maybe Position
-> Maybe Int32
-> IO Catchup
subscribeToAllFrom conn res_lnk_tos last_chk_pt batch_m = do
catchupAllStart evts_fwd get_sub last_chk_pt batch_m
where
evts_fwd pos batch_size =
readAllEventsForward conn pos batch_size res_lnk_tos
get_sub = subscribeToAll conn res_lnk_tos
createAsync :: IO (Async a, MVar (OperationExceptional a))
createAsync = do
mvar <- newEmptyMVar
as <- async $ do
res <- readMVar mvar
either throwIO return res
return (as, mvar)