{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE RecordWildCards #-} -------------------------------------------------------------------------------- -- | -- Module : Database.EventStore.Internal.Subscription.Persistent -- Copyright : (C) 2017 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -------------------------------------------------------------------------------- module Database.EventStore.Internal.Subscription.Persistent where -------------------------------------------------------------------------------- import Data.UUID -------------------------------------------------------------------------------- import Database.EventStore.Internal.Callback import Database.EventStore.Internal.Communication import Database.EventStore.Internal.Control import Database.EventStore.Internal.Exec import Database.EventStore.Internal.Operation.Persist import Database.EventStore.Internal.Prelude import Database.EventStore.Internal.Stream import Database.EventStore.Internal.Subscription.Api import Database.EventStore.Internal.Subscription.Message import Database.EventStore.Internal.Subscription.Packages import Database.EventStore.Internal.Subscription.Types import Database.EventStore.Internal.Types -------------------------------------------------------------------------------- data Phase = Pending | Running SubDetails | Closed (Either SomeException SubDropReason) -------------------------------------------------------------------------------- -- | The server remembers the state of the subscription. This allows for many -- different modes of operations compared to a regular or catchup subscription -- where the client holds the subscription state. -- (Need EventStore >= v3.1.0). data PersistentSubscription = PersistentSubscription { _perExec :: Exec , _perStream :: StreamName , _perCred :: Maybe Credentials , _perPhase :: TVar Phase , _perNext :: STM (Maybe ResolvedEvent) } -------------------------------------------------------------------------------- instance Subscription PersistentSubscription where nextEventMaybeSTM = _perNext getSubscriptionDetailsSTM s = do p <- readTVar (_perPhase s) case p of Pending -> retrySTM Running details -> return details Closed outcome -> case outcome of Right r -> throwSTM (SubscriptionClosed $ Just r) Left e -> throwSTM e unsubscribe s = subUnsubscribe (_perExec s) s -------------------------------------------------------------------------------- instance SubscriptionStream PersistentSubscription EventNumber where subscriptionStream = _perStream -------------------------------------------------------------------------------- newPersistentSubscription :: Exec -> Text -> StreamName -> Int32 -> Maybe Credentials -> IO PersistentSubscription newPersistentSubscription exec grp stream bufSize cred = do phaseVar <- newTVarIO Pending queue <- newTQueueIO let name = streamIdRaw stream sub = PersistentSubscription exec stream cred phaseVar $ do p <- readTVar phaseVar isEmpty <- isEmptyTQueue queue if isEmpty then case p of Closed outcome -> case outcome of Right r -> throwSTM (SubscriptionClosed $ Just r) Left e -> throwSTM e _ -> return Nothing else Just <$> readTQueue queue callback (Left e) = atomically $ writeTVar phaseVar (Closed $ Left e) callback (Right action) = case action of Confirmed details -> atomically $ writeTVar phaseVar (Running details) Dropped r -> atomically $ writeTVar phaseVar (Closed $ Right r) Submit e -> atomically $ do readTVar phaseVar >>= \case Running{} -> writeTQueue queue e _ -> return () ConnectionReset -> atomically $ writeTVar phaseVar (Closed $ Right SubAborted) cb <- newCallback callback publishWith exec (SubmitOperation cb (persist grp name bufSize cred)) return sub -------------------------------------------------------------------------------- -- | Acknowledges those event ids have been successfully processed. notifyEventsProcessed :: PersistentSubscription -> [UUID] -> IO () notifyEventsProcessed PersistentSubscription{..} evts = do details <- atomically $ do p <- readTVar _perPhase case p of Closed outcome -> case outcome of Right r -> throwSTM (SubscriptionClosed $ Just r) Left e -> throwSTM e Pending -> retrySTM Running d -> return d let uuid = subId details Just sid = subSubId details pkg = createAckPackage _perCred uuid sid evts publishWith _perExec (SendPackage pkg) -------------------------------------------------------------------------------- -- | Acknowledges that 'ResolvedEvent' has been successfully processed. acknowledge :: PersistentSubscription -> ResolvedEvent -> IO () acknowledge sub e = notifyEventsProcessed sub [resolvedEventOriginalId e] -------------------------------------------------------------------------------- -- | Acknowledges those 'ResolvedEvent's have been successfully processed. acknowledgeEvents :: PersistentSubscription -> [ResolvedEvent] -> IO () acknowledgeEvents sub = notifyEventsProcessed sub . fmap resolvedEventOriginalId -------------------------------------------------------------------------------- -- | Mark a message that has failed processing. The server will take action -- based upon the action parameter. failed :: PersistentSubscription -> ResolvedEvent -> NakAction -> Maybe Text -> IO () failed sub e a r = notifyEventsFailed sub a r [resolvedEventOriginalId e] -------------------------------------------------------------------------------- -- | Mark messages that have failed processing. The server will take action -- based upon the action parameter. eventsFailed :: PersistentSubscription -> [ResolvedEvent] -> NakAction -> Maybe Text -> IO () eventsFailed sub evts a r = notifyEventsFailed sub a r $ fmap resolvedEventOriginalId evts -------------------------------------------------------------------------------- -- | Acknowledges those event ids have failed to be processed successfully. notifyEventsFailed :: PersistentSubscription -> NakAction -> Maybe Text -> [UUID] -> IO () notifyEventsFailed PersistentSubscription{..} act res evts = do details <- atomically $ do p <- readTVar _perPhase case p of Closed outcome -> case outcome of Right r -> throwSTM (SubscriptionClosed $ Just r) Left e -> throwSTM e Pending -> retrySTM Running d -> return d let uuid = subId details Just sid = subSubId details pkg = createNakPackage _perCred uuid sid act res evts publishWith _perExec (SendPackage pkg)