{-# LANGUAGE LambdaCase #-}
module Database.EventStore.Streaming
( ReadError(..)
, readThroughForward
, readThroughBackward
, throwOnError
) where
import Control.Exception (Exception, throwIO)
import Data.Int (Int32)
import Data.Maybe (fromMaybe)
import Data.Typeable (Typeable)
import Prelude
import Control.Concurrent.Async.Lifted (wait)
import Control.Monad.Except (ExceptT, throwError, runExceptT)
import Data.Text (Text)
import Streaming
import qualified Streaming.Prelude as Streaming
import qualified Database.EventStore as ES
import Database.EventStore.Internal.Operation.Read.Common (emptySlice)
import Database.EventStore.Internal.Types (EventNumber(..))
data ReadError t
= StreamDeleted !ES.StreamName
| ReadError !(Maybe Text)
| AccessDenied !(ES.StreamId t)
deriving Show
instance (Show t, Typeable t) => Exception (ReadError t)
data Fetch t = FetchError !(ReadError t) | Fetch !(ES.Slice t)
toFetch :: ES.ReadResult t (ES.Slice t) -> Fetch t
toFetch ES.ReadNoStream = Fetch ES.emptySlice
toFetch ES.ReadNotModified = Fetch ES.emptySlice
toFetch (ES.ReadStreamDeleted n) = FetchError (StreamDeleted n)
toFetch (ES.ReadError e) = FetchError (ReadError e)
toFetch (ES.ReadAccessDenied n) = FetchError (AccessDenied n)
toFetch (ES.ReadSuccess s) = Fetch s
handleBatchResult :: ES.StreamId t -> ES.BatchResult t -> Fetch t
handleBatchResult ES.StreamName{} = toFetch
handleBatchResult ES.All = Fetch
data State t = Need t | Fetched ![ES.ResolvedEvent] !(Maybe t)
streaming :: (t -> IO (Fetch t))
-> t
-> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
streaming iteratee = Streaming.unfoldr go . Need
where
go (Fetched buffer next) =
case buffer of
e:rest -> pure (Right (e, Fetched rest next))
[] -> maybe stop (go . Need) next
go (Need pos) = do
liftIO (iteratee pos) >>= \case
FetchError e -> throwError e
Fetch s ->
case s of
ES.SliceEndOfStream -> stop
ES.Slice xs next -> go (Fetched xs next)
stop = pure (Left ())
readThroughForward :: ES.Connection
-> ES.StreamId t
-> ES.ResolveLink
-> t
-> Maybe Int32
-> Maybe ES.Credentials
-> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThroughForward conn = readThrough conn ES.Forward
readThroughBackward :: ES.Connection
-> ES.StreamId t
-> ES.ResolveLink
-> t
-> Maybe Int32
-> Maybe ES.Credentials
-> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThroughBackward conn = readThrough conn ES.Backward
throwOnError :: (Show t, Typeable t)
=> Stream (Of a) (ExceptT (ReadError t) IO) ()
-> Stream (Of a) IO ()
throwOnError = hoist go
where
go action =
runExceptT action >>= \case
Left e -> throwIO e
Right a -> pure a
readThrough :: ES.Connection
-> ES.ReadDirection
-> ES.StreamId t
-> ES.ResolveLink
-> t
-> Maybe Int32
-> Maybe ES.Credentials
-> Stream (Of ES.ResolvedEvent) (ExceptT (ReadError t) IO) ()
readThrough conn dir streamId lnk from sizMay cred = streaming iteratee from
where
batchSize = fromMaybe 500 sizMay
iteratee =
case dir of
ES.Forward -> readForward conn streamId batchSize lnk cred
ES.Backward -> readBackward conn streamId batchSize lnk cred
readForward :: ES.Connection
-> ES.StreamId t
-> Int32
-> ES.ResolveLink
-> Maybe ES.Credentials
-> t
-> IO (Fetch t)
readForward conn streamId siz lnk creds start =
fmap (handleBatchResult streamId) . wait =<<
ES.readEventsForward conn streamId start siz lnk creds
readBackward :: ES.Connection
-> ES.StreamId t
-> Int32
-> ES.ResolveLink
-> Maybe ES.Credentials
-> t
-> IO (Fetch t)
readBackward conn streamId siz lnk creds start =
fmap (handleBatchResult streamId) . wait =<<
ES.readEventsBackward conn streamId start siz lnk creds