--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Operation.Volatile
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Operation.Volatile (volatile) where

--------------------------------------------------------------------------------
import Data.ProtocolBuffers
import Data.UUID

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | Starts a volatile subscription on a stream.
--
-- Builds a 'subscribeToStreamCmd' package from the raw stream name and the
-- given flag, publishes it on @exec@ together with a fresh mailbox, and spawns
-- a background loop that turns every package read from that mailbox into a
-- 'SubAction' written to the returned channel:
--
-- * a mailbox read error yields @Dropped SubAborted@ and ends the loop;
-- * 'subscriptionDroppedCmd' yields @Dropped reason@ and ends the loop, the
--   reason defaulting to 'D_Unsubscribed' when the server sent none;
-- * 'subscriptionConfirmationCmd' yields @Confirmed details@;
-- * 'streamEventAppearedCmd' yields @Submit event@;
-- * any other command is ignored.
--
-- A package whose payload cannot be decoded is handled like a mailbox read
-- error (@Dropped SubAborted@, loop ends). Both failures are reported as an
-- 'OperationError', and this keeps the failure on the reader side instead of
-- leaving a failing pattern match inside the value handed to the consumer.
--
-- Returns the correlation id of the subscription request, which is also used
-- as the subscription id in the confirmation details, and the channel on
-- which the actions are delivered.
volatile
  :: Exec
  -> StreamId t
  -> Bool -- ^ Forwarded unchanged to 'subscribeToStream'; presumably the
          --   \"resolve link tos\" flag (TODO confirm).
  -> Maybe Credentials
  -> IO (UUID, Chan SubAction)
volatile exec streamId tos cred = do
  m    <- mailboxNew
  subM <- newChan
  pkg  <- createPkg subscribeToStreamCmd cred (subscribeToStream stream tos)

  let theSubId = packageCorrelation pkg

      -- Tells the consumer the subscription is over and stops the loop.
      stopWith reason = Break () <$ writeChan subM (Dropped reason)

      -- Hands an action to the consumer and keeps reading the mailbox.
      continueWith action = Loop <$ writeChan subM action

      -- Dispatches on the command carried by a package read from the mailbox.
      onPackage respPkg
        | cmd == subscriptionDroppedCmd =
            case decodePkg respPkg of
              Left _ -> stopWith SubAborted
              Right resp ->
                let reason =
                      fromMaybe D_Unsubscribed (getField $ dropReason resp)
                 in stopWith (toSubDropReason reason)
        | cmd == subscriptionConfirmationCmd =
            case decodePkg respPkg of
              Left _ -> stopWith SubAborted
              Right resp ->
                let details =
                      SubDetails
                        { subId           = theSubId
                        , subCommitPos    =
                            getField $ subscribeLastCommitPos resp
                        , subLastEventNum =
                            getField $ subscribeLastEventNumber resp
                        , subSubId        = Nothing
                        }
                 in continueWith (Confirmed details)
        | cmd == streamEventAppearedCmd =
            case decodePkg respPkg of
              Left _ -> stopWith SubAborted
              Right resp ->
                let evt =
                      newResolvedEventFromBuf $
                        getField $ streamResolvedEvent resp
                 in continueWith (Submit evt)
        | otherwise = pure Loop
        where
          cmd = packageCmd respPkg

  -- NOTE(review): 'KeepAlive subscriptionDroppedCmd' presumably keeps the
  -- mailbox registered until a subscription-dropped package arrives — confirm
  -- against the 'Lifetime' definition.
  publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) pkg)

  _ <- async $ keepLooping $ do
    outcome <- mailboxRead m
    case outcome of
      Left _        -> stopWith SubAborted
      Right respPkg -> onPackage respPkg

  pure (theSubId, subM)
  where
    stream = streamIdRaw streamId