{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Subscription.Catchup
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Subscription.Catchup where

--------------------------------------------------------------------------------
import Control.Monad.Fix

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Operation.Volatile
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | Lifecycle of a catch-up subscription.
data Phase
  = CatchingUp
    -- ^ Initial phase; the subscription has not been confirmed yet.
  | Running SubDetails
    -- ^ The subscription has been confirmed; carries its details.
  | Closed (Either SomeException SubDropReason)
    -- ^ The subscription ended, either because of an exception or because
    --   it was dropped for the given reason.

--------------------------------------------------------------------------------
-- | Checkpoint of the last event handed to the subscriber. It holds 'Nothing'
--   until a first event has been recorded.
data CatchupTrack
  = CatchupRegular !(Maybe Int64)
    -- ^ Regular stream: original event number of the last recorded event.
  | CatchupAll !(Maybe Position)
    -- ^ @$all@ stream: position of the last recorded event.

--------------------------------------------------------------------------------
-- | Builds the initial (empty) checkpoint matching the kind of catch-up
--   requested.
catchupTrack :: CatchupState -> CatchupTrack
catchupTrack RegularCatchup{} = CatchupRegular Nothing
catchupTrack AllCatchup{}     = CatchupAll Nothing

--------------------------------------------------------------------------------
-- | Tells whether an event is at or before the current checkpoint, meaning it
--   has been seen already. Always 'False' when no checkpoint has been recorded
--   yet or, for the @$all@ stream, when the event carries no position.
receivedAlready :: CatchupTrack -> ResolvedEvent -> Bool
receivedAlready (CatchupRegular old) e =
  maybe False (resolvedEventOriginalEventNumber e <=) old
receivedAlready (CatchupAll old) e =
  fromMaybe False ((<=) <$> resolvedEventPosition e <*> old)

--------------------------------------------------------------------------------
-- | Moves the checkpoint to the given event.
updateTrack :: ResolvedEvent -> CatchupTrack -> CatchupTrack
updateTrack e (CatchupRegular _) =
  CatchupRegular (Just $ resolvedEventOriginalEventNumber e)
updateTrack e (CatchupAll old) =
  -- Keep the previous checkpoint when the event carries no position:
  -- overwriting it with 'Nothing' would switch duplicate filtering off and
  -- make a reconnection restart from the initial catch-up state.
  case resolvedEventPosition e of
    Nothing -> CatchupAll old
    pos     -> CatchupAll pos

--------------------------------------------------------------------------------
-- | This kind of subscription specifies a starting point, in the form of an
--   event number or transaction file position. The given function will be
--   called for events from the starting point until the end of the stream, and
--   then for subsequently written events.
--
--   For example, if a starting point of 50 is specified when a stream has 100
--   events in it, the subscriber can expect to see events 51 through 100, and
--   then any events subsequently written until such time as the subscription is
--   dropped or closed.
data CatchupSubscription =
  CatchupSubscription
  { _catchupExec :: Exec
    -- ^ Execution handle used to publish operations and to unsubscribe.
  , _catchupStream :: StreamName
    -- ^ Stream this subscription is attached to.
  , _catchupPhase :: TVar Phase
    -- ^ Current lifecycle phase.
  , _catchupTrack :: TVar CatchupTrack
    -- ^ Checkpoint of the last event pushed to the subscriber.
  , _catchupNext :: STM (Maybe ResolvedEvent)
    -- ^ Non-blocking access to the next buffered event, if any.
  }

--------------------------------------------------------------------------------
instance Subscription CatchupSubscription where
  nextEventMaybeSTM = _catchupNext

  -- Retries until the subscription is confirmed; throws once it is closed.
  getSubscriptionDetailsSTM s = do
    p <- readTVar (_catchupPhase s)
    case p of
      Running details -> return details
      Closed r        -> throwClosed r
      CatchingUp      -> retrySTM

  subscriptionStream = _catchupStream

  unsubscribe s = subUnsubscribe (_catchupExec s) s

--------------------------------------------------------------------------------
-- | Name of the stream targeted by a catch-up state; anything that is not a
--   regular catch-up maps to @"$all"@.
streamName :: CatchupState -> StreamName
streamName (RegularCatchup stream _) = StreamName stream
streamName _                         = "$all"

--------------------------------------------------------------------------------
-- | Text carried by a 'StreamName' constructor; empty for any other case.
streamText :: StreamName -> Text
streamText (StreamName s) = s
streamText _              = ""

--------------------------------------------------------------------------------
-- | Creates a catch-up subscription and submits its catch-up operation.
--
--   The @tos@, @batch@ and @cred@ arguments are forwarded untouched to the
--   'catchup' (and, for @tos@ and @cred@, 'volatile') operations; @state@ is
--   the initial catch-up state.
newCatchupSubscription :: Exec
                       -> Bool
                       -> Maybe Int32
                       -> Maybe Credentials
                       -> CatchupState
                       -> IO CatchupSubscription
newCatchupSubscription exec tos batch cred state = do
  phaseVar <- newTVarIO CatchingUp
  queue    <- newTQueueIO
  track    <- newTVarIO $ catchupTrack state

  let stream = streamName state

      -- Buffered events are drained first; the close reason is only thrown
      -- once the queue is empty.
      sub = CatchupSubscription exec stream phaseVar track $ do
        p       <- readTVar phaseVar
        isEmpty <- isEmptyTQueue queue
        if isEmpty
          then case p of
                 Closed r -> throwClosed r
                 _        -> return Nothing
          else Just <$> readTQueue queue

      -- Marks the subscription as closed with the given outcome.
      closeWith outcome = atomically $ writeTVar phaseVar (Closed outcome)

      callback cb (Left e) =
        case fromException e of
          -- The stream does not exist: submit a volatile subscription on
          -- the same callback instead of closing.
          Just StreamNotFound{} -> do
            let op = volatile (streamText stream) tos cred
            publishWith exec (SubmitOperation cb op)
          -- Any other failure closes the subscription.
          _ -> closeWith (Left e)
      callback _ (Right action) =
        case action of
          Confirmed details ->
            atomically $ writeTVar phaseVar (Running details)
          Dropped r ->
            closeWith (Right r)
          Submit e ->
            atomically $ do
              tracker <- readTVar track
              -- Events at or before the checkpoint are silently skipped.
              unless (receivedAlready tracker e) $ do
                writeTVar track (updateTrack e tracker)
                writeTQueue queue e
          ConnectionReset -> do
            tpe <- readTVarIO track
            -- Resume from the checkpoint when there is one, otherwise from
            -- the initial state.
            let newState =
                  case tpe of
                    CatchupRegular (Just n) ->
                      RegularCatchup (streamText stream) n
                    CatchupAll (Just p) ->
                      AllCatchup p
                    _ ->
                      state
                newOp = catchup (execSettings exec) newState tos batch cred
            newCb <- mfix $ \self -> newCallback (callback self)
            publishWith exec (SubmitOperation newCb newOp)

  -- 'mfix' ties the knot: the callback needs a reference to itself so it can
  -- be reused when falling back to a volatile subscription.
  cb <- mfix $ \self -> newCallback (callback self)
  let op = catchup (execSettings exec) state tos batch cred
  publishWith exec (SubmitOperation cb op)
  return sub

--------------------------------------------------------------------------------
-- | Throws the reason a subscription was closed: the original exception, or
--   'SubscriptionClosed' carrying the drop reason.
throwClosed :: Either SomeException SubDropReason -> STM a
throwClosed (Left e)  = throwSTM e
throwClosed (Right r) = throwSTM (SubscriptionClosed $ Just r)

--------------------------------------------------------------------------------
-- | Non-blocking version of 'waitTillCatchup'.
hasCaughtUp :: CatchupSubscription -> IO Bool
hasCaughtUp sub = atomically $ hasCaughtUpSTM sub

--------------------------------------------------------------------------------
-- | Waits until the 'CatchupSubscription' has caught up with its stream.
--   Throws if the subscription has been closed.
waitTillCatchup :: CatchupSubscription -> IO ()
waitTillCatchup sub = atomically $ unlessM (hasCaughtUpSTM sub) retrySTM

--------------------------------------------------------------------------------
-- | Like 'hasCaughtUp' but lives in the 'STM' monad.
hasCaughtUpSTM :: CatchupSubscription -> STM Bool
hasCaughtUpSTM sub = do
  p <- readTVar (_catchupPhase sub)
  case p of
    CatchingUp -> return False
    Running{}  -> return True
    Closed r   -> throwClosed r