{-# LANGUAGE GADTs #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal.Subscription.Catchup where
import Control.Monad.Fix
import Safe (fromJustNote)
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Operation.Volatile
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
data Phase
= CatchingUp
| Running SubDetails
| Closed (Either SomeException SubDropReason)
receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool
receivedAlready StreamName{} old e =
EventNumber (resolvedEventOriginalEventNumber e) < old
receivedAlready All old e =
let pos =
fromJustNote
"Position is always defined when reading events from $all stream"
$ resolvedEventPosition e in
pos < old
nextTarget :: StreamId t -> ResolvedEvent -> t
nextTarget StreamName{} e =
EventNumber (resolvedEventOriginalEventNumber e)
nextTarget All e =
fromJustNote
"Position is always defined when reading events from $all stream"
$ resolvedEventPosition e
data CatchupSubscription t =
CatchupSubscription { _catchupExec :: Exec
, _catchupStream :: StreamId t
, _catchupPhase :: TVar Phase
, _catchupTrack :: TVar t
, _catchupNext :: STM (Maybe ResolvedEvent)
}
instance Subscription (CatchupSubscription t) where
nextEventMaybeSTM = _catchupNext
getSubscriptionDetailsSTM s = do
p <- readTVar (_catchupPhase s)
case p of
Running details -> return details
Closed r -> throwClosed r
_ -> retrySTM
unsubscribe s = subUnsubscribe (_catchupExec s) s
instance SubscriptionStream (CatchupSubscription t) t where
subscriptionStream = _catchupStream
newCatchupSubscription :: Exec
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> StreamId t
-> t
-> IO (CatchupSubscription t)
newCatchupSubscription exec tos batch cred streamId seed = do
phaseVar <- newTVarIO CatchingUp
queue <- newTQueueIO
track <- newTVarIO seed
let stream = streamIdRaw streamId
sub = CatchupSubscription exec streamId phaseVar track $ do
p <- readTVar phaseVar
isEmpty <- isEmptyTQueue queue
if isEmpty
then
case p of
Closed r -> throwClosed r
_ -> return Nothing
else Just <$> readTQueue queue
callback cb (Left e) =
case fromException e of
Just opE ->
case opE of
StreamNotFound{} -> do
let op = volatile streamId tos cred
publishWith exec (SubmitOperation cb op)
_ -> atomically $ writeTVar phaseVar (Closed $ Left e)
_ -> atomically $ writeTVar phaseVar (Closed $ Left e)
callback _ (Right action) =
case action of
Confirmed details -> atomically $ writeTVar phaseVar (Running details)
Dropped r ->
atomically $ writeTVar phaseVar (Closed $ Right r)
Submit e -> atomically $ do
tracker <- readTVar track
unless (receivedAlready streamId tracker e) $ do
writeTVar track (nextTarget streamId e)
writeTQueue queue e
ConnectionReset -> do
chk <- readTVarIO track
let newOp = catchup (execSettings exec) streamId chk tos batch cred
newCb <- mfix $ \self -> newCallback (callback self)
publishWith exec (SubmitOperation newCb newOp)
cb <- mfix $ \self -> newCallback (callback self)
let op = catchup (execSettings exec) streamId seed tos batch cred
publishWith exec (SubmitOperation cb op)
return sub
throwClosed :: Either SomeException SubDropReason -> STM a
throwClosed (Left e) = throwSTM e
throwClosed (Right r) = throwSTM (SubscriptionClosed $ Just r)
hasCaughtUp :: CatchupSubscription t -> IO Bool
hasCaughtUp sub = atomically $ hasCaughtUpSTM sub
waitTillCatchup :: CatchupSubscription t -> IO ()
waitTillCatchup sub = atomically $ unlessM (hasCaughtUpSTM sub) retrySTM
hasCaughtUpSTM :: CatchupSubscription t -> STM Bool
hasCaughtUpSTM CatchupSubscription{..} = do
p <- readTVar _catchupPhase
case p of
CatchingUp -> return False
Running{} -> return True
Closed tpe ->
case tpe of
Left e -> throwSTM e
Right r -> throwSTM (SubscriptionClosed $ Just r)