module Database.EventStore
(
Connection
, ConnectionException(..)
, ServerConnectionError(..)
, Credentials
, Settings(..)
, Retry
, atMost
, keepRetrying
, credentials
, defaultSettings
, connect
, shutdown
, waitTillClosed
, Event
, EventData
, createEvent
, withJson
, withJsonAndMetadata
, StreamMetadataResult(..)
, readEvent
, readAllEventsBackward
, readAllEventsForward
, readStreamEventsBackward
, readStreamEventsForward
, getStreamMetadata
, StreamACL(..)
, StreamMetadata(..)
, getCustomPropertyValue
, getCustomProperty
, emptyStreamACL
, emptyStreamMetadata
, deleteStream
, sendEvent
, sendEvents
, setStreamMetadata
, Builder
, StreamACLBuilder
, buildStreamACL
, modifyStreamACL
, setReadRoles
, setReadRole
, setWriteRoles
, setWriteRole
, setDeleteRoles
, setDeleteRole
, setMetaReadRoles
, setMetaReadRole
, setMetaWriteRoles
, setMetaWriteRole
, StreamMetadataBuilder
, buildStreamMetadata
, modifyStreamMetadata
, setMaxCount
, setMaxAge
, setTruncateBefore
, setCacheControl
, setACL
, modifyACL
, setCustomProperty
, TimeSpan
, timeSpanTicks
, timeSpanHoursMinsSecs
, timeSpanDaysHoursMinsSecs
, timeSpanDaysHoursMinsSecsMillis
, timeSpanGetTicks
, timeSpanGetDays
, timeSpanGetHours
, timeSpanGetMinutes
, timeSpanGetSeconds
, timeSpanGetMillis
, timeSpanFromSeconds
, timeSpanFromMinutes
, timeSpanFromHours
, timeSpanFromDays
, timeSpanTotalMillis
, Transaction
, TransactionId
, startTransaction
, transactionId
, transactionCommit
, transactionRollback
, transactionWrite
, SubscriptionClosed(..)
, SubscriptionId
, Subscription
, S.Running(..)
, S.SubDropReason(..)
, S.Regular
, subscribe
, subscribeToAll
, getSubId
, getSubStream
, isSubscribedToAll
, unsubscribe
, nextEvent
, nextEventMaybe
, getSubResolveLinkTos
, getSubLastCommitPos
, getSubLastEventNumber
, S.Catchup
, subscribeFrom
, subscribeToAllFrom
, waitTillCatchup
, hasCaughtUp
, S.Persistent
, PersistentSubscriptionSettings(..)
, SystemConsumerStrategy(..)
, NakAction(..)
, S.PersistActionException(..)
, acknowledge
, acknowledgeEvents
, failed
, eventsFailed
, notifyEventsProcessed
, notifyEventsFailed
, defaultPersistentSubscriptionSettings
, createPersistentSubscription
, updatePersistentSubscription
, deletePersistentSubscription
, connectToPersistentSubscription
, Slice(..)
, AllSlice
, Op.DeleteResult(..)
, WriteResult(..)
, ReadResult(..)
, RecordedEvent(..)
, Op.ReadEvent(..)
, StreamType(..)
, StreamSlice
, Position(..)
, ReadDirection(..)
, ResolvedEvent(..)
, OperationError(..)
, StreamName(..)
, isEventResolvedLink
, resolvedEventOriginal
, resolvedEventDataAsJson
, resolvedEventOriginalStreamId
, resolvedEventOriginalId
, recordedEventDataAsJson
, positionStart
, positionEnd
, DropReason(..)
, ExpectedVersion
, anyVersion
, noStreamVersion
, emptyStreamVersion
, exactEventVersion
, module Control.Concurrent.Async
, (<>)
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad (when)
import Data.Int
import Data.Maybe
import Data.Monoid ((<>))
import Data.Typeable
import Control.Concurrent.Async
import Data.Text hiding (group)
import Data.UUID
import Database.EventStore.Internal.Connection hiding (Connection)
import qualified Database.EventStore.Internal.Manager.Subscription as S
import Database.EventStore.Internal.Manager.Subscription.Message
import Database.EventStore.Internal.Operation (OperationError(..))
import qualified Database.EventStore.Internal.Operations as Op
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.Write.Common
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.TimeSpan
import Database.EventStore.Internal.Types
import Database.EventStore.Internal.Execution.Production
data Connection
= Connection
{ _prod :: Production
, _settings :: Settings
}
connect :: Settings
-> String
-> Int
-> IO Connection
connect settings host port = do
prod <- newExecutionModel settings host port
return $ Connection prod settings
waitTillClosed :: Connection -> IO ()
waitTillClosed Connection{..} = prodWaitTillClosed _prod
shutdown :: Connection -> IO ()
shutdown Connection{..} = shutdownExecutionModel _prod
sendEvent :: Connection
-> Text
-> ExpectedVersion
-> Event
-> IO (Async WriteResult)
sendEvent mgr evt_stream exp_ver evt =
sendEvents mgr evt_stream exp_ver [evt]
newtype SubscriptionId = SubId UUID deriving (Eq, Ord, Show)
getSubResolveLinkTos :: Subscription S.Regular -> Bool
getSubResolveLinkTos = S._subTos . _subInner
hasCaughtUp :: Subscription S.Catchup -> IO Bool
hasCaughtUp sub = atomically $ _hasCaughtUp sub
waitTillCatchup :: Subscription S.Catchup -> IO ()
waitTillCatchup sub = atomically $ do
caughtUp <- _hasCaughtUp sub
when (not caughtUp) retry
_hasCaughtUp :: Subscription S.Catchup -> STM Bool
_hasCaughtUp Subscription{..} = do
SubState sm _ <- readTVar _subVar
return $ S.hasCaughtUp sm
data SubState a = SubState (S.Subscription a) (Maybe S.SubDropReason)
data Subscription a =
Subscription
{ _subVar :: TVar (SubState a)
, _subRun :: TMVar S.Running
, _subStream :: Text
, _subProd :: Production
, _subInner :: a
}
getSubId :: Subscription a -> IO SubscriptionId
getSubId Subscription{..} = atomically $ do
run <- readTMVar _subRun
return $ SubId $ S.runningUUID run
getSubStream :: Subscription a -> Text
getSubStream = _subStream
unsubscribe :: Subscription a -> IO ()
unsubscribe Subscription{..} = do
run <- atomically $ readTMVar _subRun
pushUnsubscribe _subProd run
isSubscribedToAll :: Subscription a -> Bool
isSubscribedToAll = (== "") . getSubStream
getSubLastCommitPos :: Subscription a -> IO Int64
getSubLastCommitPos Subscription{..} = atomically $ do
run <- readTMVar _subRun
return $ S.runningLastCommitPosition run
getSubLastEventNumber :: Subscription a -> IO (Maybe Int32)
getSubLastEventNumber Subscription{..} = atomically $ do
run <- readTMVar _subRun
return $ S.runningLastEventNumber run
nextEvent :: Subscription a -> IO ResolvedEvent
nextEvent sub = atomically $ do
m <- _nextEventMaybe sub
case m of
Nothing -> retry
Just e -> return e
nextEventMaybe :: Subscription a -> IO (Maybe ResolvedEvent)
nextEventMaybe = atomically . _nextEventMaybe
_nextEventMaybe :: Subscription a -> STM (Maybe ResolvedEvent)
_nextEventMaybe Subscription{..} = do
SubState sub close <- readTVar _subVar
run <- readTMVar _subRun
let (res, nxt) = S.readNext sub
case res of
Nothing -> do
case close of
Nothing -> return Nothing
Just err -> throwSTM $ SubscriptionClosed run err
Just e -> do
writeTVar _subVar $ SubState nxt close
return $ Just e
notifyEventsProcessed :: Subscription S.Persistent -> [UUID] -> IO ()
notifyEventsProcessed Subscription{..} evts = do
run <- atomically $ readTMVar _subRun
pushAckPersist _subProd run evts
acknowledge :: Subscription S.Persistent -> ResolvedEvent -> IO ()
acknowledge sub e = notifyEventsProcessed sub [resolvedEventOriginalId e]
acknowledgeEvents :: Subscription S.Persistent -> [ResolvedEvent] -> IO ()
acknowledgeEvents sub = notifyEventsProcessed sub . fmap resolvedEventOriginalId
failed :: Subscription S.Persistent
-> ResolvedEvent
-> NakAction
-> Maybe Text
-> IO ()
failed sub e a r = notifyEventsFailed sub a r [resolvedEventOriginalId e]
eventsFailed :: Subscription S.Persistent
-> [ResolvedEvent]
-> NakAction
-> Maybe Text
-> IO ()
eventsFailed sub evts a r =
notifyEventsFailed sub a r $ fmap resolvedEventOriginalId evts
notifyEventsFailed :: Subscription S.Persistent
-> NakAction
-> Maybe Text
-> [UUID]
-> IO ()
notifyEventsFailed Subscription{..} act res evts = do
run <- atomically $ readTMVar _subRun
pushNakPersist _subProd run act res evts
modifySubSM :: (S.Subscription a -> S.Subscription a)
-> SubState a
-> SubState a
modifySubSM k (SubState sm r) = SubState (k sm) r
data SubscriptionClosed =
SubscriptionClosed S.Running S.SubDropReason
deriving (Show, Typeable)
instance Exception SubscriptionClosed
sendEvents :: Connection
-> Text
-> ExpectedVersion
-> [Event]
-> IO (Async WriteResult)
sendEvents Connection{..} evt_stream exp_ver evts = do
(k, as) <- createOpAsync
let op = Op.writeEvents _settings evt_stream exp_ver evts
pushOperation _prod k op
return as
deleteStream :: Connection
-> Text
-> ExpectedVersion
-> Maybe Bool
-> IO (Async Op.DeleteResult)
deleteStream Connection{..} evt_stream exp_ver hard_del = do
(k, as) <- createOpAsync
let op = Op.deleteStream _settings evt_stream exp_ver hard_del
pushOperation _prod k op
return as
data Transaction =
Transaction
{ _tStream :: Text
, _tTransId :: TransactionId
, _tExpVer :: ExpectedVersion
, _tConn :: Connection
}
newtype TransactionId =
TransactionId { _unTransId :: Int64 }
deriving (Eq, Ord, Show)
transactionId :: Transaction -> TransactionId
transactionId = _tTransId
startTransaction :: Connection
-> Text
-> ExpectedVersion
-> IO (Async Transaction)
startTransaction conn@Connection{..} evt_stream exp_ver = do
(k, as) <- createOpAsync
let op = Op.transactionStart _settings evt_stream exp_ver
pushOperation _prod k op
let _F trans_id =
Transaction
{ _tStream = evt_stream
, _tTransId = TransactionId trans_id
, _tExpVer = exp_ver
, _tConn = conn
}
return $ fmap _F as
transactionWrite :: Transaction -> [Event] -> IO (Async ())
transactionWrite Transaction{..} evts = do
(k, as) <- createOpAsync
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionWrite _settings _tStream _tExpVer raw_id evts
pushOperation _prod k op
return as
transactionCommit :: Transaction -> IO (Async WriteResult)
transactionCommit Transaction{..} = do
(k, as) <- createOpAsync
let Connection{..} = _tConn
raw_id = _unTransId _tTransId
op = Op.transactionCommit _settings _tStream _tExpVer raw_id
pushOperation _prod k op
return as
transactionRollback :: Transaction -> IO ()
transactionRollback _ = return ()
readEvent :: Connection
-> Text
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream Op.ReadEvent))
readEvent Connection{..} stream_id evt_num res_link_tos = do
(k, as) <- createOpAsync
let op = Op.readEvent _settings stream_id evt_num res_link_tos
pushOperation _prod k op
return as
readStreamEventsForward :: Connection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsForward mgr =
readStreamEventsCommon mgr Forward
readStreamEventsBackward :: Connection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsBackward mgr =
readStreamEventsCommon mgr Backward
readStreamEventsCommon :: Connection
-> ReadDirection
-> Text
-> Int32
-> Int32
-> Bool
-> IO (Async (ReadResult 'RegularStream StreamSlice))
readStreamEventsCommon Connection{..} dir stream_id start cnt res_link_tos = do
(k, as) <- createOpAsync
let op = Op.readStreamEvents _settings dir stream_id start cnt res_link_tos
pushOperation _prod k op
return as
readAllEventsForward :: Connection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsForward mgr =
readAllEventsCommon mgr Forward
readAllEventsBackward :: Connection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsBackward mgr =
readAllEventsCommon mgr Backward
readAllEventsCommon :: Connection
-> ReadDirection
-> Position
-> Int32
-> Bool
-> IO (Async AllSlice)
readAllEventsCommon Connection{..} dir pos max_c res_link_tos = do
(k, as) <- createOpAsync
let op = Op.readAllEvents _settings c_pos p_pos max_c res_link_tos dir
pushOperation _prod k op
return as
where
Position c_pos p_pos = pos
subscribe :: Connection
-> Text
-> Bool
-> IO (Subscription S.Regular)
subscribe Connection{..} stream_id res_lnk_tos = do
mvar <- newEmptyTMVarIO
var <- newTVarIO $ SubState S.regularSubscription Nothing
let mk r = putTMVar mvar r
recv = readTVar var
send = writeTVar var
dropped r = do
SubState sm _ <- readTVar var
writeTVar var $ SubState sm (Just r)
cb = createSubAsync mk recv send dropped
pushConnectStream _prod cb stream_id res_lnk_tos
return $ Subscription var mvar stream_id _prod (S.Regular res_lnk_tos)
subscribeToAll :: Connection
-> Bool
-> IO (Subscription S.Regular)
subscribeToAll conn res_lnk_tos = subscribe conn "" res_lnk_tos
subscribeFrom :: Connection
-> Text
-> Bool
-> Maybe Int32
-> Maybe Int32
-> IO (Subscription S.Catchup)
subscribeFrom conn stream_id res_lnk_tos last_chk_pt batch_m =
subscribeFromCommon conn stream_id res_lnk_tos batch_m tpe
where
tpe = Op.RegularCatchup stream_id (fromMaybe 0 last_chk_pt)
subscribeToAllFrom :: Connection
-> Bool
-> Maybe Position
-> Maybe Int32
-> IO (Subscription S.Catchup)
subscribeToAllFrom conn res_lnk_tos last_chk_pt batch_m =
subscribeFromCommon conn "" res_lnk_tos batch_m tpe
where
Position c_pos p_pos = fromMaybe positionStart last_chk_pt
tpe = Op.AllCatchup c_pos p_pos
subscribeFromCommon :: Connection
-> Text
-> Bool
-> Maybe Int32
-> Op.CatchupState
-> IO (Subscription S.Catchup)
subscribeFromCommon Connection{..} stream_id res_lnk_tos batch_m tpe = do
mvar <- newEmptyTMVarIO
var <- newTVarIO $ SubState S.catchupSubscription Nothing
let readFrom res =
case res of
Left _ -> return ()
Right (xs, eos, chk) -> atomically $ do
s <- readTVar var
let nxt_s = modifySubSM (S.batchRead xs eos chk) s
writeTVar var nxt_s
mk = putTMVar mvar
rcv = readTVar var
send = writeTVar var
dropped r = do
SubState sm _ <- readTVar var
writeTVar var $ SubState sm (Just r)
op = Op.catchup _settings tpe res_lnk_tos batch_m
cb = createSubAsync mk rcv send dropped
pushOperation _prod readFrom op
pushConnectStream _prod cb stream_id res_lnk_tos
return $ Subscription var mvar stream_id _prod S.Catchup
setStreamMetadata :: Connection
-> Text
-> ExpectedVersion
-> StreamMetadata
-> IO (Async WriteResult)
setStreamMetadata Connection{..} evt_stream exp_ver metadata = do
(k, as) <- createOpAsync
let op = Op.setMetaStream _settings evt_stream exp_ver metadata
pushOperation _prod k op
return as
getStreamMetadata :: Connection -> Text -> IO (Async StreamMetadataResult)
getStreamMetadata Connection{..} evt_stream = do
(k, as) <- createOpAsync
let op = Op.readMetaStream _settings evt_stream
pushOperation _prod k op
return as
createPersistentSubscription :: Connection
-> Text
-> Text
-> PersistentSubscriptionSettings
-> IO (Async (Maybe S.PersistActionException))
createPersistentSubscription Connection{..} group stream sett = do
mvar <- newEmptyTMVarIO
let _F res = atomically $
case res of
Left e -> putTMVar mvar (Just e)
_ -> putTMVar mvar Nothing
pushCreatePersist _prod _F group stream sett
async $ atomically $ readTMVar mvar
updatePersistentSubscription :: Connection
-> Text
-> Text
-> PersistentSubscriptionSettings
-> IO (Async (Maybe S.PersistActionException))
updatePersistentSubscription Connection{..} group stream sett = do
mvar <- newEmptyTMVarIO
let _F res = atomically $
case res of
Left e -> putTMVar mvar (Just e)
_ -> putTMVar mvar Nothing
pushUpdatePersist _prod _F group stream sett
async $ atomically $ readTMVar mvar
deletePersistentSubscription :: Connection
-> Text
-> Text
-> IO (Async (Maybe S.PersistActionException))
deletePersistentSubscription Connection{..} group stream = do
mvar <- newEmptyTMVarIO
let _F res = atomically $
case res of
Left e -> putTMVar mvar (Just e)
_ -> putTMVar mvar Nothing
pushDeletePersist _prod _F group stream
async $ atomically $ readTMVar mvar
connectToPersistentSubscription :: Connection
-> Text
-> Text
-> Int32
-> IO (Subscription S.Persistent)
connectToPersistentSubscription Connection{..} group stream bufSize = do
mvar <- newEmptyTMVarIO
var <- newTVarIO $ SubState S.persistentSubscription Nothing
let mk r = putTMVar mvar r
recv = readTVar var
send = writeTVar var
dropped r = do
SubState sm _ <- readTVar var
writeTVar var $ SubState sm (Just r)
cb = createSubAsync mk recv send dropped
pushConnectPersist _prod cb group stream bufSize
return $ Subscription var mvar stream _prod (S.Persistent group)
createOpAsync :: IO (Either OperationError a -> IO (), Async a)
createOpAsync = do
mvar <- newEmptyMVar
as <- async $ do
res <- readMVar mvar
either throwIO return res
return (putMVar mvar, as)
createSubAsync :: (S.Running -> STM ())
-> STM (SubState a)
-> (SubState a -> STM ())
-> (S.SubDropReason -> STM ())
-> (S.SubConnectEvent -> IO ())
createSubAsync mk rcv send quit = go
where
go (S.SubConfirmed run) = atomically $ mk run
go (S.EventAppeared e) = atomically $ do
SubState sm close <- rcv
let nxt = S.eventArrived e sm
send $ SubState nxt close
go (S.Dropped r) = atomically $ quit r