{-# LANGUAGE GADTs #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# OPTIONS -Wno-orphans #-} -------------------------------------------------------------------------------- -- | -- Module : Database.EventStore.Internal.Subscription.Catchup -- Copyright : (C) 2017 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -------------------------------------------------------------------------------- module Database.EventStore.Internal.Subscription.Catchup where -------------------------------------------------------------------------------- import Safe (fromJustNote) -------------------------------------------------------------------------------- import Database.EventStore.Internal.Communication import Database.EventStore.Internal.Control import Database.EventStore.Internal.Exec import Database.EventStore.Internal.Operation.Catchup import Database.EventStore.Internal.Prelude import Database.EventStore.Internal.Stream import Database.EventStore.Internal.Subscription.Api import Database.EventStore.Internal.Subscription.Types import Database.EventStore.Internal.Subscription.Packages import Database.EventStore.Internal.Types -------------------------------------------------------------------------------- receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool receivedAlready StreamName{} old e = EventNumber (resolvedEventOriginalEventNumber e) < old receivedAlready All old e = let pos = fromJustNote "Position is always defined when reading events from $all stream" $ resolvedEventPosition e in pos < old -------------------------------------------------------------------------------- nextTarget :: StreamId t -> ResolvedEvent -> t nextTarget StreamName{} e = EventNumber (resolvedEventOriginalEventNumber e) nextTarget All e = fromJustNote "Position is always defined when reading events from $all stream" $ resolvedEventPosition e -------------------------------------------------------------------------------- -- | This kind of subscription specifies a starting point, in the form of an -- event number or transaction file position. The given function will be -- called for events from the starting point until the end of the stream, and -- then for subsequently written events. -- -- For example, if a starting point of 50 is specified when a stream has 100 -- events in it, the subscriber can expect to see events 51 through 100, and -- then any events subsequently written until such time as the subscription is -- dropped or closed. data CatchupSubscription t = CatchupSubscription { _catchupExec :: Exec , _catchupStream :: StreamId t , _catchupSub :: TVar (Maybe UUID) , _catchupChan :: Chan SubAction } -------------------------------------------------------------------------------- instance Subscription (CatchupSubscription s) where nextSubEvent s = readChan (_catchupChan s) unsubscribe s = do subId <- atomically $ do idMay <- readTVar (_catchupSub s) case idMay of Nothing -> retrySTM Just sid -> pure sid let pkg = createUnsubscribePackage subId publishWith (_catchupExec s) (SendPackage pkg) -------------------------------------------------------------------------------- instance SubscriptionStream (CatchupSubscription t) t where subscriptionStream = _catchupStream -------------------------------------------------------------------------------- newCatchupSubscription :: Exec -> Bool -> Maybe Int32 -> Maybe Credentials -> StreamId t -> t -> IO (CatchupSubscription t) newCatchupSubscription exec tos batch cred streamId seed = do (var, chan) <- catchup (execSettings exec) exec streamId seed tos batch cred let sub = CatchupSubscription { _catchupExec = exec , _catchupStream = streamId , _catchupSub = var , _catchupChan = chan } pure sub