-------------------------------------------------------------------------------- -- | -- Module : Database.EventStore.Internal.Operation.Persist -- Copyright : (C) 2017 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -------------------------------------------------------------------------------- module Database.EventStore.Internal.Operation.Persist (persist) where -------------------------------------------------------------------------------- import Data.ProtocolBuffers import Data.UUID -------------------------------------------------------------------------------- import Database.EventStore.Internal.Command import Database.EventStore.Internal.Communication (Transmit(..)) import Database.EventStore.Internal.Control (publishWith) import Database.EventStore.Internal.Exec (Exec) import Database.EventStore.Internal.Operation import Database.EventStore.Internal.Prelude import Database.EventStore.Internal.Settings import Database.EventStore.Internal.Subscription.Message import Database.EventStore.Internal.Subscription.Types import Database.EventStore.Internal.Types -------------------------------------------------------------------------------- persist :: Exec -> Text -> Text -> Int32 -> Maybe Credentials -> IO (UUID, TVar (Maybe Text), Chan SubAction) persist exec grp stream bufSize cred = do m <- mailboxNew subM <- newChan var <- newTVarIO Nothing let req = _connectToPersistentSubscription grp stream bufSize pkg <- createPkg connectToPersistentSubscriptionCmd cred req let theSubId = packageCorrelation pkg publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) pkg) _ <- async $ keepLooping $ do outcome <- mailboxRead m case outcome of Left _ -> Break () <$ writeChan subM (Dropped SubAborted) Right respPkg | packageCmd respPkg == subscriptionDroppedCmd -> let Right resp = decodePkg respPkg reason = fromMaybe D_Unsubscribed (getField $ dropReason resp) subReason = toSubDropReason reason in Break () <$ writeChan subM (Dropped subReason) | packageCmd respPkg == persistentSubscriptionConfirmationCmd -> do let Right resp = decodePkg respPkg lcp = getField $ pscLastCommitPos resp subSubId = getField $ pscId resp len = getField $ pscLastEvtNumber resp details = SubDetails { subId = theSubId , subCommitPos = lcp , subLastEventNum = len , subSubId = Just subSubId } atomically $ writeTVar var (Just subSubId) Loop <$ writeChan subM (Confirmed details) | packageCmd respPkg == persistentSubscriptionStreamEventAppearedCmd -> let Right resp = decodePkg respPkg evt = newResolvedEvent $ getField $ psseaEvt resp in Loop <$ writeChan subM (Submit evt) | otherwise -> pure Loop pure (theSubId, var, subM)