{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE ScopedTypeVariables #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Operation.Catchup
-- Copyright : (C) 2015 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Operation.Catchup
    ( catchup ) where

--------------------------------------------------------------------------------
import Data.Int
import Data.Maybe
import Data.ProtocolBuffers

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import qualified Database.EventStore.Internal.Operation.ReadAllEvents.Message as ReadAll
import qualified Database.EventStore.Internal.Operation.ReadStreamEvents.Message as ReadStream
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | Number of events requested per read when the caller of 'catchup' does not
--   supply a batch size.
defaultBatchSize :: Int32
defaultBatchSize = 500

--------------------------------------------------------------------------------
-- | Phases of the loop driven by 'catchup'. @s@ is the type of the cursor
--   carried from one phase to the next (the value handed to 'catchup' as its
--   starting point, then whatever the read responses and live events advance
--   it to).
data State s
  = Init s
    -- ^ Nothing has been sent yet (or the connection dropped): subscribe and
    --   issue the first read from the carried cursor.
  | Catchup UUID UUID s
    -- ^ Reading history. Carries the subscription correlation id, the
    --   correlation id of the read currently in flight, and the cursor.
  | Live UUID s
    -- ^ History has been consumed; events now come from the subscription.
    --   Carries the subscription correlation id and the cursor.

--------------------------------------------------------------------------------
-- | Builds the forward-read request 'Package' matching the kind of stream:
--   a @ReadStream@ request for a named stream, a @ReadAll@ request for 'All'.
--   Both requests take their "require master" flag from the 'Settings'.
createReadPkg
  :: Settings
  -> StreamId t
  -> t                 -- ^ Where the read starts.
  -> Int32             -- ^ Batch size.
  -> Bool              -- ^ Resolve links.
  -> Maybe Credentials
  -> IO Package
createReadPkg setts (StreamName stream) evtNum batch tos cred
  = let
      req =
        ReadStream.newRequest
          stream
          (eventNumberToInt64 evtNum)
          batch
          tos
          (s_requireMaster setts) in
    createPkg readStreamEventsForwardCmd cred req
createReadPkg setts All pos batch tos cred
  = let
      req =
        ReadAll.newRequest
          (positionCommit pos)
          (positionPrepare pos)
          batch
          tos
          (s_requireMaster setts) in
    createPkg readAllEventsForwardCmd cred req

--------------------------------------------------------------------------------
-- | Starts a catch-up subscription and returns immediately.
--
--   A mailbox, an output channel and a 'TVar' are created, then a background
--   loop (spawned with 'async') walks through the 'State' phases:
--
--   * 'Init': sends a subscribe request and a first forward read, stores the
--     subscription correlation id in the 'TVar', and moves to 'Catchup'.
--
--   * 'Catchup': forwards every event returned by the reads as 'Submit' and
--     keeps issuing reads until the end is reached, then moves to 'Live'.
--
--   * 'Live': forwards every event pushed by the subscription as 'Submit'.
--
--   In both 'Catchup' and 'Live', a @ConnectionHasDropped@ mailbox error sends
--   the loop back to 'Init' with the current cursor; any other mailbox error,
--   an unexpected read result or a payload that cannot be decoded emits
--   @Dropped SubAborted@ and stops the loop. A server-side drop notification
--   emits 'Dropped' with the translated reason and stops the loop.
--
--   The returned 'TVar' holds 'Nothing' until the first subscribe request has
--   been published; the returned channel carries every 'SubAction' produced by
--   the loop.
catchup
  :: Settings
  -> Exec
  -> StreamId t
  -> t                 -- ^ Where to start reading from.
  -> Bool              -- ^ Resolve link tos.
  -> Maybe Int32       -- ^ Batch size; 'defaultBatchSize' when 'Nothing'.
  -> Maybe Credentials
  -> IO (TVar (Maybe UUID), Chan SubAction)
catchup setts exec streamId from tos batchSiz cred
  = do m <- mailboxNew
       subM <- newChan
       var <- newTVarIO Nothing

       -- Publishes a one-shot forward read starting at the given cursor and
       -- returns the correlation id its response will carry.
       let issueRead cursor
             = do pkg <- createReadPkg setts streamId cursor batch tos cred
                  publishWith exec (Transmit m OneTime pkg)
                  pure (packageCorrelation pkg)

       _ <- async $ keepLoopingS (Init from) $ \case
         Init pos
           -> do let subReq = subscribeToStream stream tos
                 subPkg <- createPkg subscribeToStreamCmd cred subReq
                 readPkg <- createReadPkg setts streamId pos batch tos cred

                 -- The subscription stays registered in the mailbox until a
                 -- drop notification arrives; the read is a one-shot exchange.
                 publishWith exec (Transmit m (KeepAlive subscriptionDroppedCmd) subPkg)
                 publishWith exec (Transmit m OneTime readPkg)
                 let theSubId = packageCorrelation subPkg

                 atomically $ writeTVar var (Just theSubId)

                 pure $ LoopS (Catchup theSubId (packageCorrelation readPkg) pos)

         unchanged@(Catchup theSubId readId pos)
           -> do outcome <- mailboxRead m
                 case outcome of
                   Left e
                     -> catchupMailboxError subM pos e

                   Right respPkg
                     | fromSub && cmd == subscriptionDroppedCmd
                     -> catchupDropped subM respPkg

                     | fromSub && cmd == subscriptionConfirmationCmd
                     -> catchupConfirmed subM theSubId unchanged respPkg

                     -- NOTE(review): any other package carrying the
                     -- subscription id (including live events) is discarded
                     -- while the reads are still in progress.
                     | fromSub
                     -> pure $ LoopS unchanged

                     | readId == corrId
                     -> case streamId of
                          StreamName _
                            -> case decodePkg respPkg of
                                 Left _ -> catchupAbort subM
                                 Right resp
                                   -> let evts = fmap newResolvedEvent (getField $ ReadStream._events resp)
                                          eos = getField $ ReadStream._endOfStream resp
                                          nxt = rawEventNumber (getField $ ReadStream._nextNumber resp) in
                                      case getField $ ReadStream._result resp of
                                        -- The stream does not exist yet: there
                                        -- is no history, go live right away.
                                        ReadStream.NO_STREAM
                                          -> pure $ LoopS (Live theSubId pos)

                                        ReadStream.SUCCESS
                                          -> catchupAdvance subM theSubId evts eos nxt issueRead

                                        _ -> catchupAbort subM

                          All
                            -> case decodePkg respPkg of
                                 Left _ -> catchupAbort subM
                                 Right resp
                                   -> let evts = fmap newResolvedEventFromBuf (getField $ ReadAll._Events resp)
                                          n_pos =
                                            Position
                                              (getField $ ReadAll._NextCommitPosition resp)
                                              (getField $ ReadAll._NextPreparePosition resp) in
                                      -- A missing result field is treated as
                                      -- success; an empty batch marks the end.
                                      case fromMaybe ReadAll.SUCCESS (getField $ ReadAll._Result resp) of
                                        ReadAll.SUCCESS
                                          -> catchupAdvance subM theSubId evts (null evts) n_pos issueRead

                                        _ -> catchupAbort subM

                     | otherwise
                     -> pure $ LoopS unchanged
                     where
                       corrId = packageCorrelation respPkg
                       cmd = packageCmd respPkg
                       fromSub = theSubId == corrId

         unchanged@(Live theSubId pos)
           -> do outcome <- mailboxRead m
                 case outcome of
                   Left e
                     -> catchupMailboxError subM pos e

                   Right respPkg
                     | fromSub && cmd == subscriptionDroppedCmd
                     -> catchupDropped subM respPkg

                     | fromSub && cmd == subscriptionConfirmationCmd
                     -> catchupConfirmed subM theSubId unchanged respPkg

                     | fromSub && cmd == streamEventAppearedCmd
                     -> case decodePkg respPkg of
                          Left _ -> catchupAbort subM
                          Right resp
                            -> let
                                 evt = newResolvedEventFromBuf $ getField $ streamResolvedEvent resp
                                 nextState =
                                   case streamId of
                                     StreamName _
                                       -> let nxt = resolvedEventOriginalEventNumber evt
                                          in Live theSubId (rawEventNumber nxt)
                                     -- Keep the previous cursor when the event
                                     -- carries no position.
                                     All
                                       -> Live theSubId (fromMaybe pos (resolvedEventPosition evt)) in
                               LoopS nextState <$ writeChan subM (Submit evt)

                     | otherwise
                     -> pure $ LoopS unchanged
                     where
                       cmd = packageCmd respPkg
                       fromSub = theSubId == packageCorrelation respPkg

       pure (var, subM)
  where
    batch = fromMaybe defaultBatchSize batchSiz
    stream = streamIdRaw streamId

--------------------------------------------------------------------------------
-- | Tells the consumer the subscription has been aborted and stops the loop.
catchupAbort :: Chan SubAction -> IO (LoopS s ())
catchupAbort subM = BreakS () <$ writeChan subM (Dropped SubAborted)

--------------------------------------------------------------------------------
-- | Reaction to a mailbox error: restart from 'Init' with the current cursor
--   when the connection has dropped, abort the subscription otherwise.
catchupMailboxError
  :: Chan SubAction
  -> s
  -> OperationError
  -> IO (LoopS (State s) ())
catchupMailboxError _ pos ConnectionHasDropped = pure $ LoopS (Init pos)
catchupMailboxError subM _ _ = catchupAbort subM

--------------------------------------------------------------------------------
-- | Handles a subscription-dropped package: reports the drop reason
--   (@D_Unsubscribed@ when the field is absent) and stops the loop. A payload
--   that cannot be decoded is reported as an aborted subscription.
catchupDropped :: Chan SubAction -> Package -> IO (LoopS s ())
catchupDropped subM respPkg
  = case decodePkg respPkg of
      Left _ -> catchupAbort subM
      Right resp
        -> let reason = fromMaybe D_Unsubscribed (getField $ dropReason resp)
               subReason = toSubDropReason reason in
           BreakS () <$ writeChan subM (Dropped subReason)

--------------------------------------------------------------------------------
-- | Handles a subscription-confirmation package: reports the confirmation
--   details and keeps the loop in the state it was given. A payload that
--   cannot be decoded is reported as an aborted subscription.
catchupConfirmed
  :: Chan SubAction
  -> UUID    -- ^ Subscription correlation id.
  -> s       -- ^ State to stay in.
  -> Package
  -> IO (LoopS s ())
catchupConfirmed subM theSubId unchanged respPkg
  = case decodePkg respPkg of
      Left _ -> catchupAbort subM
      Right resp
        -> let details =
                 SubDetails
                 { subId = theSubId
                 , subCommitPos = getField $ subscribeLastCommitPos resp
                 , subLastEventNum = getField $ subscribeLastEventNumber resp
                 , subSubId = Nothing
                 } in
           LoopS unchanged <$ writeChan subM (Confirmed details)

--------------------------------------------------------------------------------
-- | Forwards a batch of read events to the consumer, then either switches to
--   'Live' when the end has been reached or requests the next batch and stays
--   in 'Catchup'.
catchupAdvance
  :: Chan SubAction
  -> UUID             -- ^ Subscription correlation id.
  -> [ResolvedEvent]  -- ^ Events of the batch, in delivery order.
  -> Bool             -- ^ Whether the end has been reached.
  -> t                -- ^ Cursor following the batch.
  -> (t -> IO UUID)   -- ^ Sends the next read, returns its correlation id.
  -> IO (LoopS (State t) ())
catchupAdvance subM theSubId evts eos next readMore
  = do traverse_ (writeChan subM . Submit) evts
       if eos
         then pure $ LoopS (Live theSubId next)
         else do newReadId <- readMore next
                 pure $ LoopS (Catchup theSubId newReadId next)