{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.EventStore.Internal.Operation.Catchup
( catchup ) where
import Data.Int
import Data.Maybe
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Operation.Read.Common
import Database.EventStore.Internal.Operation.ReadAllEvents
import Database.EventStore.Internal.Operation.ReadStreamEvents
import Database.EventStore.Internal.Operation.Volatile
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Types
defaultBatchSize :: Int32
defaultBatchSize = 500
streamNotFound :: Text -> OperationError
streamNotFound stream = StreamNotFound $ StreamName stream
fetchStream :: Settings
-> Text
-> Int32
-> Bool
-> Maybe Credentials
-> EventNumber
-> Code o (Slice EventNumber)
fetchStream setts stream batch tos cred (EventNumber n) = do
outcome <-
deconstruct $ fmap Left $
readStreamEvents setts Forward stream n batch tos cred
fromReadResult stream outcome pure
fetchAll :: Settings
-> Int32
-> Bool
-> Maybe Credentials
-> Position
-> Code o (Slice Position)
fetchAll setts batch tos cred (Position com pre) =
deconstruct $ fmap Left $
readAllEvents setts com pre batch tos Forward cred
sourceStream :: t
-> (forall o. t -> Code o (Slice t))
-> Operation SubAction
sourceStream seed iteratee = unfoldPlan seed go
where
go state = do
s <- iteratee state
traverse_ (yield . Submit) (sliceEvents s)
case sliceNext s of
Just newState -> pure newState
Nothing -> stop
catchup :: forall t. Settings
-> StreamId t
-> t
-> Bool
-> Maybe Int32
-> Maybe Credentials
-> Operation SubAction
catchup setts streamId from tos batchSiz cred =
sourceStream from iteratee <> volatile streamId tos cred
where
batch = fromMaybe defaultBatchSize batchSiz
iteratee :: t -> Code o (Slice t)
iteratee =
case streamId of
StreamName n -> fetchStream setts n batch tos cred
All -> fetchAll setts batch tos cred
fromReadResult :: Text
-> ReadResult EventNumber a
-> (a -> Code o x)
-> Code o x
fromReadResult stream res k =
case res of
ReadNoStream -> failure $ streamNotFound stream
ReadStreamDeleted s -> failure $ StreamDeleted s
ReadNotModified -> failure $ ServerError Nothing
ReadError e -> failure $ ServerError e
ReadAccessDenied s -> failure $ AccessDenied s
ReadSuccess ss -> k ss