{-# LANGUAGE FlexibleInstances #-} -------------------------------------------------------------------------------- -- | -- Module : Database.EventStore.Internal.Subscription.Regular -- Copyright : (C) 2017 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -------------------------------------------------------------------------------- module Database.EventStore.Internal.Subscription.Regular where -------------------------------------------------------------------------------- import Database.EventStore.Internal.Callback import Database.EventStore.Internal.Communication import Database.EventStore.Internal.Control import Database.EventStore.Internal.Exec import Database.EventStore.Internal.Operation.Volatile import Database.EventStore.Internal.Prelude import Database.EventStore.Internal.Stream import Database.EventStore.Internal.Subscription.Api import Database.EventStore.Internal.Subscription.Types import Database.EventStore.Internal.Types -------------------------------------------------------------------------------- data Phase = Pending | Running SubDetails | Closed (Either SomeException SubDropReason) -------------------------------------------------------------------------------- -- | Also referred as volatile subscription. For example, if a stream has 100 -- events in it when a subscriber connects, the subscriber can expect to see -- event number 101 onwards until the time the subscription is closed or -- dropped. data RegularSubscription t = RegularSubscription { _regExec :: Exec , _regStream :: StreamId t , _regPhase :: TVar Phase , _regNext :: STM (Maybe ResolvedEvent) } -------------------------------------------------------------------------------- instance Subscription (RegularSubscription t) where nextEventMaybeSTM = _regNext getSubscriptionDetailsSTM s = do p <- readTVar (_regPhase s) case p of Pending -> retrySTM Running details -> return details Closed outcome -> case outcome of Right r -> throwSTM (SubscriptionClosed $ Just r) Left e -> throwSTM e unsubscribe s = subUnsubscribe (_regExec s) s -------------------------------------------------------------------------------- instance SubscriptionStream (RegularSubscription t) t where subscriptionStream = _regStream -------------------------------------------------------------------------------- newRegularSubscription :: Exec -> StreamId t -> Bool -> Maybe Credentials -> IO (RegularSubscription t) newRegularSubscription exec streamId tos cred = do phaseVar <- newTVarIO Pending queue <- newTQueueIO let sub = RegularSubscription exec streamId phaseVar $ do p <- readTVar phaseVar isEmpty <- isEmptyTQueue queue if isEmpty then case p of Closed outcome -> case outcome of Right r -> throwSTM (SubscriptionClosed $ Just r) Left e -> throwSTM e _ -> return Nothing else Just <$> readTQueue queue callback (Left e) = atomically $ writeTVar phaseVar (Closed $ Left e) callback (Right action) = case action of Confirmed details -> atomically $ writeTVar phaseVar (Running details) Dropped r -> atomically $ writeTVar phaseVar (Closed $ Right r) Submit e -> atomically $ do readTVar phaseVar >>= \case Running{} -> writeTQueue queue e _ -> return () ConnectionReset -> atomically $ writeTVar phaseVar (Closed $ Right SubAborted) cb <- newCallback callback publishWith exec (SubmitOperation cb (volatile streamId tos cred)) return sub