{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal.Operation.Catchup
( catchup ) where
import Data.Int
import Data.Maybe
import Data.ProtocolBuffers
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Communication (Transmit(..))
import Database.EventStore.Internal.Control (publishWith)
import Database.EventStore.Internal.Exec (Exec)
import Database.EventStore.Internal.Operation
import qualified Database.EventStore.Internal.Operation.ReadAllEvents.Message as ReadAll
import qualified Database.EventStore.Internal.Operation.ReadStreamEvents.Message as ReadStream
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Message
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
defaultBatchSize :: Int32
defaultBatchSize :: Int32
defaultBatchSize = Int32
500
data State s
= Init s
| Catchup UUID UUID s
| Live UUID s
createReadPkg
:: Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg :: Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg Settings
setts (StreamName Text
stream) t
evtNum Int32
batch Bool
tos Maybe Credentials
cred
= let
req :: Request
req =
Text -> Int64 -> Int32 -> Bool -> Bool -> Request
ReadStream.newRequest
Text
stream
(EventNumber -> Int64
eventNumberToInt64 t
EventNumber
evtNum)
Int32
batch
Bool
tos
(Settings -> Bool
s_requireMaster Settings
setts) in
Command -> Maybe Credentials -> Request -> IO Package
forall msg (m :: * -> *).
(Encode msg, MonadIO m) =>
Command -> Maybe Credentials -> msg -> m Package
createPkg Command
readStreamEventsForwardCmd Maybe Credentials
cred Request
req
createReadPkg Settings
setts StreamId t
All t
pos Int32
batch Bool
tos Maybe Credentials
cred
= let
req :: Request
req =
Int64 -> Int64 -> Int32 -> Bool -> Bool -> Request
ReadAll.newRequest
(Position -> Int64
positionCommit t
Position
pos)
(Position -> Int64
positionPrepare t
Position
pos)
Int32
batch
Bool
tos
(Settings -> Bool
s_requireMaster Settings
setts) in
Command -> Maybe Credentials -> Request -> IO Package
forall msg (m :: * -> *).
(Encode msg, MonadIO m) =>
Command -> Maybe Credentials -> msg -> m Package
createPkg Command
readAllEventsForwardCmd Maybe Credentials
cred Request
req
catchup
:: Settings
-> Exec
-> StreamId t
-> t
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> IO (TVar (Maybe UUID), Chan SubAction)
catchup :: Settings
-> Exec
-> StreamId t
-> t
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> IO (TVar (Maybe UUID), Chan SubAction)
catchup Settings
setts Exec
exec StreamId t
streamId t
from Bool
tos Maybe Int32
batchSiz Maybe Credentials
cred
= do Mailbox
m <- IO Mailbox
forall (m :: * -> *). MonadBase IO m => m Mailbox
mailboxNew
Chan SubAction
subM <- IO (Chan SubAction)
forall (m :: * -> *) a. MonadBase IO m => m (Chan a)
newChan
TVar (Maybe UUID)
var <- Maybe UUID -> IO (TVar (Maybe UUID))
forall a. a -> IO (TVar a)
newTVarIO Maybe UUID
forall a. Maybe a
Nothing
Async ()
_ <- IO () -> IO (Async (StM IO ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (IO () -> IO (Async (StM IO ())))
-> IO () -> IO (Async (StM IO ()))
forall a b. (a -> b) -> a -> b
$ State t -> (State t -> IO (LoopS (State t) ())) -> IO ()
forall (m :: * -> *) s a.
Monad m =>
s -> (s -> m (LoopS s a)) -> m a
keepLoopingS (t -> State t
forall s. s -> State s
Init t
from) ((State t -> IO (LoopS (State t) ())) -> IO ())
-> (State t -> IO (LoopS (State t) ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \case
Init t
pos
-> do let subReq :: SubscribeToStream
subReq = Text -> Bool -> SubscribeToStream
subscribeToStream Text
stream Bool
tos
Package
subPkg <- Command -> Maybe Credentials -> SubscribeToStream -> IO Package
forall msg (m :: * -> *).
(Encode msg, MonadIO m) =>
Command -> Maybe Credentials -> msg -> m Package
createPkg Command
subscribeToStreamCmd Maybe Credentials
cred SubscribeToStream
subReq
Package
readPkg <- Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
forall t.
Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg Settings
setts StreamId t
streamId t
pos Int32
batch Bool
tos Maybe Credentials
cred
Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m (Command -> Lifetime
KeepAlive Command
subscriptionDroppedCmd) Package
subPkg)
Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m Lifetime
OneTime Package
readPkg)
let theSubId :: UUID
theSubId = Package -> UUID
packageCorrelation Package
subPkg
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Maybe UUID) -> Maybe UUID -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe UUID)
var (UUID -> Maybe UUID
forall a. a -> Maybe a
Just UUID
theSubId)
LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS (UUID -> UUID -> t -> State t
forall s. UUID -> UUID -> s -> State s
Catchup UUID
theSubId (Package -> UUID
packageCorrelation Package
readPkg) t
pos)
unchanged :: State t
unchanged@(Catchup UUID
theSubId UUID
readId t
pos)
-> do Either OperationError Package
outcome <- Mailbox -> IO (Either OperationError Package)
forall (m :: * -> *).
MonadBase IO m =>
Mailbox -> m (Either OperationError Package)
mailboxRead Mailbox
m
case Either OperationError Package
outcome of
Left OperationError
e
-> case OperationError
e of
OperationError
ConnectionHasDropped
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS (t -> State t
forall s. s -> State s
Init t
pos)
OperationError
_ -> () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)
Right Package
respPkg
| UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionDroppedCmd
-> let Right SubscriptionDropped
resp = Package -> Either OperationError SubscriptionDropped
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
reason :: DropReason
reason = DropReason -> Maybe DropReason -> DropReason
forall a. a -> Maybe a -> a
fromMaybe DropReason
D_Unsubscribed (Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
(Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
(Field 1 (OptionalField (Last (Enumeration DropReason)))))
-> Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
(Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a b. (a -> b) -> a -> b
$ SubscriptionDropped -> Optional 1 (Enumeration DropReason)
dropReason SubscriptionDropped
resp)
subReason :: SubDropReason
subReason = DropReason -> SubDropReason
toSubDropReason DropReason
reason in
() -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
subReason)
| UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionConfirmationCmd
-> let Right SubscriptionConfirmation
resp = Package -> Either OperationError SubscriptionConfirmation
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
lcp :: FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp = Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64)))))
-> Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ SubscriptionConfirmation -> Required 1 (Value Int64)
subscribeLastCommitPos SubscriptionConfirmation
resp
len :: FieldType (Field 2 (OptionalField (Last (Value Int64))))
len = Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64)))))
-> Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64))))
forall a b. (a -> b) -> a -> b
$ SubscriptionConfirmation -> Optional 2 (Value Int64)
subscribeLastEventNumber SubscriptionConfirmation
resp
details :: SubDetails
details =
SubDetails :: UUID -> Int64 -> Maybe Int64 -> Maybe Text -> SubDetails
SubDetails
{ subId :: UUID
subId = UUID
theSubId
, subCommitPos :: Int64
subCommitPos = Int64
FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp
, subLastEventNum :: Maybe Int64
subLastEventNum = Maybe Int64
FieldType (Field 2 (OptionalField (Last (Value Int64))))
len
, subSubId :: Maybe Text
subSubId = Maybe Text
forall a. Maybe a
Nothing
} in
State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDetails -> SubAction
Confirmed SubDetails
details)
| UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged
| UUID
readId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
-> case StreamId t
streamId of
StreamName Text
_
-> do let
Right Response
resp = Package -> Either OperationError Response
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
r :: FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
r = Field 2 (RequiredField (Always (Enumeration Result)))
-> FieldType
(Field 2 (RequiredField (Always (Enumeration Result))))
forall a. HasField a => a -> FieldType a
getField (Field 2 (RequiredField (Always (Enumeration Result)))
-> FieldType
(Field 2 (RequiredField (Always (Enumeration Result)))))
-> Field 2 (RequiredField (Always (Enumeration Result)))
-> FieldType
(Field 2 (RequiredField (Always (Enumeration Result))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 2 (Enumeration Result)
ReadStream._result Response
resp
es :: FieldType (Repeated 1 (Message ResolvedIndexedEvent))
es = Repeated 1 (Message ResolvedIndexedEvent)
-> FieldType (Repeated 1 (Message ResolvedIndexedEvent))
forall a. HasField a => a -> FieldType a
getField (Repeated 1 (Message ResolvedIndexedEvent)
-> FieldType (Repeated 1 (Message ResolvedIndexedEvent)))
-> Repeated 1 (Message ResolvedIndexedEvent)
-> FieldType (Repeated 1 (Message ResolvedIndexedEvent))
forall a b. (a -> b) -> a -> b
$ Response -> Repeated 1 (Message ResolvedIndexedEvent)
ReadStream._events Response
resp
evts :: [ResolvedEvent]
evts = (ResolvedIndexedEvent -> ResolvedEvent)
-> [ResolvedIndexedEvent] -> [ResolvedEvent]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ResolvedIndexedEvent -> ResolvedEvent
newResolvedEvent [ResolvedIndexedEvent]
FieldType (Repeated 1 (Message ResolvedIndexedEvent))
es
eos :: FieldType (Field 5 (RequiredField (Always (Value Bool))))
eos = Field 5 (RequiredField (Always (Value Bool)))
-> FieldType (Field 5 (RequiredField (Always (Value Bool))))
forall a. HasField a => a -> FieldType a
getField (Field 5 (RequiredField (Always (Value Bool)))
-> FieldType (Field 5 (RequiredField (Always (Value Bool)))))
-> Field 5 (RequiredField (Always (Value Bool)))
-> FieldType (Field 5 (RequiredField (Always (Value Bool))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 5 (Value Bool)
ReadStream._endOfStream Response
resp
nxt :: FieldType (Field 3 (RequiredField (Always (Value Int64))))
nxt = Field 3 (RequiredField (Always (Value Int64)))
-> FieldType (Field 3 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 3 (RequiredField (Always (Value Int64)))
-> FieldType (Field 3 (RequiredField (Always (Value Int64)))))
-> Field 3 (RequiredField (Always (Value Int64)))
-> FieldType (Field 3 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 3 (Value Int64)
ReadStream._nextNumber Response
resp
case FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
r of
FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
ReadStream.NO_STREAM
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS (UUID -> t -> State t
forall s. UUID -> s -> State s
Live UUID
theSubId t
pos)
FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
ReadStream.SUCCESS
-> do (ResolvedEvent -> IO ()) -> [ResolvedEvent] -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubAction -> IO ())
-> (ResolvedEvent -> SubAction) -> ResolvedEvent -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResolvedEvent -> SubAction
Submit) [ResolvedEvent]
evts
if Bool
FieldType (Field 5 (RequiredField (Always (Value Bool))))
eos
then
LoopS (State EventNumber) () -> IO (LoopS (State EventNumber) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State EventNumber) () -> IO (LoopS (State EventNumber) ()))
-> LoopS (State EventNumber) ()
-> IO (LoopS (State EventNumber) ())
forall a b. (a -> b) -> a -> b
$ State EventNumber -> LoopS (State EventNumber) ()
forall s a. s -> LoopS s a
LoopS (UUID -> EventNumber -> State EventNumber
forall s. UUID -> s -> State s
Live UUID
theSubId (Int64 -> EventNumber
rawEventNumber Int64
FieldType (Field 3 (RequiredField (Always (Value Int64))))
nxt))
else
do Package
newReadPkg <- Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
forall t.
Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg Settings
setts StreamId t
streamId (Int64 -> EventNumber
rawEventNumber Int64
FieldType (Field 3 (RequiredField (Always (Value Int64))))
nxt) Int32
batch Bool
tos Maybe Credentials
cred
let newReadId :: UUID
newReadId = Package -> UUID
packageCorrelation Package
newReadPkg
Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m Lifetime
OneTime Package
newReadPkg)
LoopS (State EventNumber) () -> IO (LoopS (State EventNumber) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State EventNumber) () -> IO (LoopS (State EventNumber) ()))
-> LoopS (State EventNumber) ()
-> IO (LoopS (State EventNumber) ())
forall a b. (a -> b) -> a -> b
$ State EventNumber -> LoopS (State EventNumber) ()
forall s a. s -> LoopS s a
LoopS (UUID -> UUID -> EventNumber -> State EventNumber
forall s. UUID -> UUID -> s -> State s
Catchup UUID
theSubId UUID
newReadId (Int64 -> EventNumber
rawEventNumber Int64
FieldType (Field 3 (RequiredField (Always (Value Int64))))
nxt))
FieldType (Field 2 (RequiredField (Always (Enumeration Result))))
_ -> () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)
StreamId t
All
-> do let
Right Response
resp = Package -> Either OperationError Response
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
r :: FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
r = Field 6 (OptionalField (Last (Enumeration Result)))
-> FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
forall a. HasField a => a -> FieldType a
getField (Field 6 (OptionalField (Last (Enumeration Result)))
-> FieldType (Field 6 (OptionalField (Last (Enumeration Result)))))
-> Field 6 (OptionalField (Last (Enumeration Result)))
-> FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
forall a b. (a -> b) -> a -> b
$ Response -> Optional 6 (Enumeration Result)
ReadAll._Result Response
resp
nc_pos :: FieldType (Field 4 (RequiredField (Always (Value Int64))))
nc_pos = Field 4 (RequiredField (Always (Value Int64)))
-> FieldType (Field 4 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 4 (RequiredField (Always (Value Int64)))
-> FieldType (Field 4 (RequiredField (Always (Value Int64)))))
-> Field 4 (RequiredField (Always (Value Int64)))
-> FieldType (Field 4 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 4 (Value Int64)
ReadAll._NextCommitPosition Response
resp
np_pos :: FieldType (Field 5 (RequiredField (Always (Value Int64))))
np_pos = Field 5 (RequiredField (Always (Value Int64)))
-> FieldType (Field 5 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 5 (RequiredField (Always (Value Int64)))
-> FieldType (Field 5 (RequiredField (Always (Value Int64)))))
-> Field 5 (RequiredField (Always (Value Int64)))
-> FieldType (Field 5 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ Response -> Required 5 (Value Int64)
ReadAll._NextPreparePosition Response
resp
es :: FieldType (Repeated 3 (Message ResolvedEventBuf))
es = Repeated 3 (Message ResolvedEventBuf)
-> FieldType (Repeated 3 (Message ResolvedEventBuf))
forall a. HasField a => a -> FieldType a
getField (Repeated 3 (Message ResolvedEventBuf)
-> FieldType (Repeated 3 (Message ResolvedEventBuf)))
-> Repeated 3 (Message ResolvedEventBuf)
-> FieldType (Repeated 3 (Message ResolvedEventBuf))
forall a b. (a -> b) -> a -> b
$ Response -> Repeated 3 (Message ResolvedEventBuf)
ReadAll._Events Response
resp
evts :: [ResolvedEvent]
evts = (ResolvedEventBuf -> ResolvedEvent)
-> [ResolvedEventBuf] -> [ResolvedEvent]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ResolvedEventBuf -> ResolvedEvent
newResolvedEventFromBuf [ResolvedEventBuf]
FieldType (Repeated 3 (Message ResolvedEventBuf))
es
eos :: Bool
eos = [ResolvedEvent] -> Bool
forall mono. MonoFoldable mono => mono -> Bool
null [ResolvedEvent]
evts
n_pos :: Position
n_pos = Int64 -> Int64 -> Position
Position Int64
FieldType (Field 4 (RequiredField (Always (Value Int64))))
nc_pos Int64
FieldType (Field 5 (RequiredField (Always (Value Int64))))
np_pos
case Result -> Maybe Result -> Result
forall a. a -> Maybe a -> a
fromMaybe Result
ReadAll.SUCCESS Maybe Result
FieldType (Field 6 (OptionalField (Last (Enumeration Result))))
r of
Result
ReadAll.SUCCESS
-> do (ResolvedEvent -> IO ()) -> [ResolvedEvent] -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubAction -> IO ())
-> (ResolvedEvent -> SubAction) -> ResolvedEvent -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResolvedEvent -> SubAction
Submit) [ResolvedEvent]
evts
if Bool
eos
then
LoopS (State Position) () -> IO (LoopS (State Position) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State Position) () -> IO (LoopS (State Position) ()))
-> LoopS (State Position) () -> IO (LoopS (State Position) ())
forall a b. (a -> b) -> a -> b
$ State Position -> LoopS (State Position) ()
forall s a. s -> LoopS s a
LoopS (UUID -> Position -> State Position
forall s. UUID -> s -> State s
Live UUID
theSubId Position
n_pos)
else
do Package
newReadPkg <- Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
forall t.
Settings
-> StreamId t
-> t
-> Int32
-> Bool
-> Maybe Credentials
-> IO Package
createReadPkg Settings
setts StreamId t
streamId t
Position
n_pos Int32
batch Bool
tos Maybe Credentials
cred
let newReadId :: UUID
newReadId = Package -> UUID
packageCorrelation Package
newReadPkg
Exec -> Transmit -> IO ()
forall p a (m :: * -> *).
(Pub p, Typeable a, MonadIO m) =>
p -> a -> m ()
publishWith Exec
exec (Mailbox -> Lifetime -> Package -> Transmit
Transmit Mailbox
m Lifetime
OneTime Package
newReadPkg)
LoopS (State Position) () -> IO (LoopS (State Position) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State Position) () -> IO (LoopS (State Position) ()))
-> LoopS (State Position) () -> IO (LoopS (State Position) ())
forall a b. (a -> b) -> a -> b
$ State Position -> LoopS (State Position) ()
forall s a. s -> LoopS s a
LoopS (UUID -> UUID -> Position -> State Position
forall s. UUID -> UUID -> s -> State s
Catchup UUID
theSubId UUID
newReadId Position
n_pos)
Result
_ -> () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)
| Bool
otherwise
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged
unchanged :: State t
unchanged@(Live UUID
theSubId t
pos)
-> do Either OperationError Package
outcome <- Mailbox -> IO (Either OperationError Package)
forall (m :: * -> *).
MonadBase IO m =>
Mailbox -> m (Either OperationError Package)
mailboxRead Mailbox
m
case Either OperationError Package
outcome of
Left OperationError
e
-> case OperationError
e of
OperationError
ConnectionHasDropped
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS (t -> State t
forall s. s -> State s
Init t
pos)
OperationError
_ -> () -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
SubAborted)
Right Package
respPkg
| UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionDroppedCmd
-> let Right SubscriptionDropped
resp = Package -> Either OperationError SubscriptionDropped
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
reason :: DropReason
reason = DropReason -> Maybe DropReason -> DropReason
forall a. a -> Maybe a -> a
fromMaybe DropReason
D_Unsubscribed (Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
(Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
(Field 1 (OptionalField (Last (Enumeration DropReason)))))
-> Field 1 (OptionalField (Last (Enumeration DropReason)))
-> FieldType
(Field 1 (OptionalField (Last (Enumeration DropReason))))
forall a b. (a -> b) -> a -> b
$ SubscriptionDropped -> Optional 1 (Enumeration DropReason)
dropReason SubscriptionDropped
resp)
subReason :: SubDropReason
subReason = DropReason -> SubDropReason
toSubDropReason DropReason
reason in
() -> LoopS (State t) ()
forall s a. a -> LoopS s a
BreakS () LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDropReason -> SubAction
Dropped SubDropReason
subReason)
| UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
subscriptionConfirmationCmd
-> let Right SubscriptionConfirmation
resp = Package -> Either OperationError SubscriptionConfirmation
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
lcp :: FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp = Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64)))))
-> Field 1 (RequiredField (Always (Value Int64)))
-> FieldType (Field 1 (RequiredField (Always (Value Int64))))
forall a b. (a -> b) -> a -> b
$ SubscriptionConfirmation -> Required 1 (Value Int64)
subscribeLastCommitPos SubscriptionConfirmation
resp
len :: FieldType (Field 2 (OptionalField (Last (Value Int64))))
len = Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64))))
forall a. HasField a => a -> FieldType a
getField (Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64)))))
-> Field 2 (OptionalField (Last (Value Int64)))
-> FieldType (Field 2 (OptionalField (Last (Value Int64))))
forall a b. (a -> b) -> a -> b
$ SubscriptionConfirmation -> Optional 2 (Value Int64)
subscribeLastEventNumber SubscriptionConfirmation
resp
details :: SubDetails
details =
SubDetails :: UUID -> Int64 -> Maybe Int64 -> Maybe Text -> SubDetails
SubDetails
{ subId :: UUID
subId = UUID
theSubId
, subCommitPos :: Int64
subCommitPos = Int64
FieldType (Field 1 (RequiredField (Always (Value Int64))))
lcp
, subLastEventNum :: Maybe Int64
subLastEventNum = Maybe Int64
FieldType (Field 2 (OptionalField (Last (Value Int64))))
len
, subSubId :: Maybe Text
subSubId = Maybe Text
forall a. Maybe a
Nothing
} in
State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (SubDetails -> SubAction
Confirmed SubDetails
details)
| UUID
theSubId UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
== Package -> UUID
packageCorrelation Package
respPkg
Bool -> Bool -> Bool
&& Package -> Command
packageCmd Package
respPkg Command -> Command -> Bool
forall a. Eq a => a -> a -> Bool
== Command
streamEventAppearedCmd
-> let
Right StreamEventAppeared
resp = Package -> Either OperationError StreamEventAppeared
forall msg. Decode msg => Package -> Either OperationError msg
decodePkg Package
respPkg
evt :: ResolvedEvent
evt = ResolvedEventBuf -> ResolvedEvent
newResolvedEventFromBuf (ResolvedEventBuf -> ResolvedEvent)
-> ResolvedEventBuf -> ResolvedEvent
forall a b. (a -> b) -> a -> b
$ Field 1 (RequiredField (Always (Message ResolvedEventBuf)))
-> FieldType
(Field 1 (RequiredField (Always (Message ResolvedEventBuf))))
forall a. HasField a => a -> FieldType a
getField (Field 1 (RequiredField (Always (Message ResolvedEventBuf)))
-> FieldType
(Field 1 (RequiredField (Always (Message ResolvedEventBuf)))))
-> Field 1 (RequiredField (Always (Message ResolvedEventBuf)))
-> FieldType
(Field 1 (RequiredField (Always (Message ResolvedEventBuf))))
forall a b. (a -> b) -> a -> b
$ StreamEventAppeared -> Required 1 (Message ResolvedEventBuf)
streamResolvedEvent StreamEventAppeared
resp
nextState :: State t
nextState =
case StreamId t
streamId of
StreamName Text
_
-> let nxt :: Int64
nxt = ResolvedEvent -> Int64
resolvedEventOriginalEventNumber ResolvedEvent
evt
in UUID -> EventNumber -> State EventNumber
forall s. UUID -> s -> State s
Live UUID
theSubId (Int64 -> EventNumber
rawEventNumber Int64
nxt)
StreamId t
All
-> let Just Position
nxtPos = ResolvedEvent -> Maybe Position
resolvedEventPosition ResolvedEvent
evt
in UUID -> Position -> State Position
forall s. UUID -> s -> State s
Live UUID
theSubId Position
nxtPos in
State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
nextState LoopS (State t) () -> IO () -> IO (LoopS (State t) ())
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Chan SubAction -> SubAction -> IO ()
forall (m :: * -> *) a. MonadBase IO m => Chan a -> a -> m ()
writeChan Chan SubAction
subM (ResolvedEvent -> SubAction
Submit ResolvedEvent
evt)
| Bool
otherwise
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LoopS (State t) () -> IO (LoopS (State t) ()))
-> LoopS (State t) () -> IO (LoopS (State t) ())
forall a b. (a -> b) -> a -> b
$ State t -> LoopS (State t) ()
forall s a. s -> LoopS s a
LoopS State t
unchanged
(TVar (Maybe UUID), Chan SubAction)
-> IO (TVar (Maybe UUID), Chan SubAction)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TVar (Maybe UUID)
var, Chan SubAction
subM)
where
batch :: Int32
batch = Int32 -> Maybe Int32 -> Int32
forall a. a -> Maybe a -> a
fromMaybe Int32
defaultBatchSize Maybe Int32
batchSiz
stream :: Text
stream = StreamId t -> Text
forall t. StreamId t -> Text
streamIdRaw StreamId t
streamId