{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -Wno-missing-import-lists #-}
module Asapo.Consumer
(
SomeConsumerException,
NoData,
EndOfStream,
StreamFinished,
UnavailableService,
InterruptedTransaction,
LocalIOError,
WrongInput,
PartialData,
UnsupportedClient,
DataNotInCache,
UnknownError,
Consumer,
Dataset (..),
MessageMetaHandle,
DeleteFlag (..),
ErrorOnNotExistFlag (..),
ErrorType (ErrorNoData),
FilesystemFlag (..),
PipelineStep (..),
Beamtime (Beamtime),
DataSource (DataSource),
Beamline (Beamline),
StreamInfo (..),
GroupId,
IncludeIncompleteFlag (..),
MessageMeta (..),
ServerName (..),
SourcePath (..),
SourceType (..),
StreamName (..),
InstanceId (..),
Token (..),
StreamFilter (..),
SourceCredentials (..),
NetworkConnectionType (..),
withConsumer,
withGroupId,
getCurrentSize,
getCurrentDatasetCount,
getBeamtimeMeta,
getNextDataset,
getLastDataset,
getLastDatasetInGroup,
getMessageMetaAndDataById,
getMessageMetaById,
getMessageDataById,
getNextMessageMetaAndData,
getNextMessageMeta,
getNextMessageData,
getLastMessageMetaAndData,
getLastMessageMeta,
getLastMessageData,
getLastInGroupMessageMetaAndData,
getLastInGroupMessageMeta,
getLastInGroupMessageData,
getUnacknowledgedMessages,
getCurrentConnectionType,
getStreamList,
messageIdFromInt,
queryMessages,
retrieveDataForMessageMeta,
resetLastReadMarker,
setTimeout,
setLastReadMarker,
setStreamPersistent,
acknowledge,
negativeAcknowledge,
deleteStream,
resendNacs,
)
where
import Asapo.Either.Common
( Beamline (Beamline),
Beamtime (Beamtime),
DataSource (DataSource),
InstanceId (..),
MessageId,
PipelineStep (..),
SourceCredentials (..),
SourceType (..),
StreamInfo (..),
StreamName (..),
Token (..),
messageIdFromInt,
)
import Asapo.Either.Consumer
( Consumer,
Dataset (..),
DeleteFlag (..),
Error (Error),
ErrorOnNotExistFlag (..),
ErrorType (..),
FilesystemFlag (..),
GroupId,
IncludeIncompleteFlag (..),
MessageMeta (..),
MessageMetaHandle,
NetworkConnectionType (..),
ServerName (..),
SourcePath (..),
StreamFilter (..),
getCurrentConnectionType,
resendNacs,
setTimeout,
)
import qualified Asapo.Either.Consumer as PC
import Control.Applicative (pure)
import Control.Exception (Exception (fromException, toException), SomeException, throw)
import Control.Monad (Monad)
import qualified Data.ByteString as BS
import Data.Either (Either (Left, Right))
import Data.Function ((.))
import Data.Int (Int)
import Data.Maybe (Maybe)
import Data.Text (Text)
import Data.Time (NominalDiffTime)
import Data.Typeable (cast)
import Data.Word (Word64)
import System.IO (IO)
import Text.Show (Show, show)
import Prelude ()
-- | Root of the consumer exception hierarchy: an existential wrapper around
-- any value that has an 'Exception' instance.
data SomeConsumerException
  = forall e. (Exception e) => SomeConsumerException e

-- | Shows exactly like the wrapped exception.
instance Show SomeConsumerException where
  show (SomeConsumerException e) = show e

instance Exception SomeConsumerException
-- | 'toException' implementation shared by the consumer exception types:
-- wraps the value in 'SomeConsumerException' first and then converts that
-- wrapper to 'SomeException'.
consumerExceptionToException :: (Exception e) => e -> SomeException
consumerExceptionToException = toException . SomeConsumerException
-- | 'fromException' implementation shared by the consumer exception types.
--
-- Succeeds only when the 'SomeException' holds a 'SomeConsumerException'
-- whose payload can be 'cast' to the requested type.
consumerExceptionFromException :: (Exception e) => SomeException -> Maybe e
consumerExceptionFromException x = do
  -- First peel off the hierarchy wrapper, then try the concrete type.
  SomeConsumerException a <- fromException x
  cast a
-- | Thrown by 'errorTypeToException' for 'ErrorNoData'; carries the error
-- message text.
newtype NoData = NoData Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception NoData where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorEndOfStream'; carries the
-- error message text.
newtype EndOfStream = EndOfStream Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception EndOfStream where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorStreamFinished'; carries the
-- error message text.
newtype StreamFinished = StreamFinished Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception StreamFinished where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorUnavailableService'; carries
-- the error message text.
newtype UnavailableService = UnavailableService Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception UnavailableService where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorInterruptedTransaction';
-- carries the error message text.
newtype InterruptedTransaction = InterruptedTransaction Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception InterruptedTransaction where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorLocalIOError'; carries the
-- error message text.
newtype LocalIOError = LocalIOError Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception LocalIOError where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorWrongInput'; carries the
-- error message text.
newtype WrongInput = WrongInput Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception WrongInput where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorPartialData'; carries the
-- error message text.
newtype PartialData = PartialData Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception PartialData where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorUnsupportedClient'; carries
-- the error message text.
newtype UnsupportedClient = UnsupportedClient Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception UnsupportedClient where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorDataNotInCache'; carries the
-- error message text.
newtype DataNotInCache = DataNotInCache Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception DataNotInCache where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Thrown by 'errorTypeToException' for 'ErrorUnknownError'; carries the
-- error message text.
newtype UnknownError = UnknownError Text deriving (Show)

-- | Member of the 'SomeConsumerException' hierarchy.
instance Exception UnknownError where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException
-- | Throw the exception type that corresponds to an 'ErrorType', using the
-- given text as its payload.
--
-- The result type is fully polymorphic because this never returns normally:
-- every equation ends in 'throw'.
--
-- NOTE(review): assumes the eleven constructors matched here are all the
-- constructors of 'ErrorType' -- confirm against "Asapo.Either.Consumer".
errorTypeToException :: ErrorType -> Text -> a
errorTypeToException ErrorNoData = throw . NoData
errorTypeToException ErrorEndOfStream = throw . EndOfStream
errorTypeToException ErrorStreamFinished = throw . StreamFinished
errorTypeToException ErrorUnavailableService = throw . UnavailableService
errorTypeToException ErrorInterruptedTransaction = throw . InterruptedTransaction
errorTypeToException ErrorLocalIOError = throw . LocalIOError
errorTypeToException ErrorWrongInput = throw . WrongInput
errorTypeToException ErrorPartialData = throw . PartialData
errorTypeToException ErrorUnsupportedClient = throw . UnsupportedClient
errorTypeToException ErrorDataNotInCache = throw . DataNotInCache
errorTypeToException ErrorUnknownError = throw . UnknownError
-- | Throwing variant of 'PC.withConsumer'.
--
-- Passes all arguments through to 'PC.withConsumer' and supplies an error
-- callback that rethrows the reported 'Error' as the matching consumer
-- exception (see 'errorTypeToException').
withConsumer ::
  forall a.
  ServerName ->
  SourcePath ->
  FilesystemFlag ->
  SourceCredentials ->
  (Consumer -> IO a) ->
  IO a
withConsumer serverName sourcePath filesystemFlag creds onSuccess =
  -- No local signature on purpose: with ScopedTypeVariables an annotation
  -- mentioning @a@ would pin the result to the outer @a@ instead of @IO a@.
  let onError (Error errorMessage errorType) =
        errorTypeToException errorType errorMessage
   in PC.withConsumer serverName sourcePath filesystemFlag creds onError onSuccess
-- | Throwing variant of 'PC.withGroupId'.
--
-- Passes the consumer and the success continuation through to
-- 'PC.withGroupId' and supplies an error callback that rethrows the reported
-- 'Error' as the matching consumer exception (see 'errorTypeToException').
withGroupId :: forall a. Consumer -> (GroupId -> IO a) -> IO a
withGroupId consumer onSuccess =
  -- No local signature on purpose: with ScopedTypeVariables an annotation
  -- mentioning @a@ would pin the result to the outer @a@ instead of @IO a@.
  let onError (Error errorMessage errorType) =
        errorTypeToException errorType errorMessage
   in PC.withGroupId consumer onError onSuccess
-- | Run an action that yields @Either Error b@ and unwrap the result.
--
-- A 'Right' value is returned as is; a 'Left' 'Error' is rethrown as the
-- matching consumer exception via 'errorTypeToException'.
maybeThrow :: (Monad m) => m (Either Error b) -> m b
maybeThrow f = do
  result <- f
  case result of
    Left (Error errorMessage errorType) -> errorTypeToException errorType errorMessage
    Right v -> pure v
-- | Throwing variant of 'PC.resetLastReadMarker': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
resetLastReadMarker :: Consumer -> GroupId -> StreamName -> IO Int
resetLastReadMarker consumer groupId streamName =
  maybeThrow (PC.resetLastReadMarker consumer groupId streamName)
-- | Throwing variant of 'PC.setLastReadMarker': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
setLastReadMarker :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int
setLastReadMarker consumer groupId streamName value =
  maybeThrow (PC.setLastReadMarker consumer groupId streamName value)
-- | Throwing variant of 'PC.acknowledge': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
acknowledge :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int
acknowledge consumer groupId streamName messageId =
  maybeThrow (PC.acknowledge consumer groupId streamName messageId)
-- | Throwing variant of 'PC.negativeAcknowledge': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
negativeAcknowledge ::
  Consumer ->
  GroupId ->
  StreamName ->
  MessageId ->
  NominalDiffTime ->
  IO Int
negativeAcknowledge consumer groupId streamName messageId delay =
  maybeThrow (PC.negativeAcknowledge consumer groupId streamName messageId delay)
-- | Throwing variant of 'PC.getUnacknowledgedMessages': a 'Left' 'Error'
-- from the underlying call is rethrown as a consumer exception (see
-- 'maybeThrow'). The pair of message ids is forwarded unchanged.
getUnacknowledgedMessages ::
  Consumer ->
  GroupId ->
  StreamName ->
  (MessageId, MessageId) ->
  IO [MessageId]
getUnacknowledgedMessages consumer groupId streamName (from, to) =
  maybeThrow (PC.getUnacknowledgedMessages consumer groupId streamName (from, to))
-- | Throwing variant of 'PC.getStreamList': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getStreamList :: Consumer -> Maybe StreamName -> StreamFilter -> IO [StreamInfo]
getStreamList consumer streamName filter =
  maybeThrow (PC.getStreamList consumer streamName filter)
-- | Throwing variant of 'PC.deleteStream': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
deleteStream :: Consumer -> StreamName -> DeleteFlag -> ErrorOnNotExistFlag -> IO Int
deleteStream consumer streamName deleteFlag errorOnNotExistFlag =
  maybeThrow (PC.deleteStream consumer streamName deleteFlag errorOnNotExistFlag)
-- | Throwing variant of 'PC.setStreamPersistent': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
setStreamPersistent :: Consumer -> StreamName -> IO Int
setStreamPersistent consumer streamName =
  maybeThrow (PC.setStreamPersistent consumer streamName)
-- | Throwing variant of 'PC.getCurrentSize': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getCurrentSize :: Consumer -> StreamName -> IO Int
getCurrentSize consumer streamName =
  maybeThrow (PC.getCurrentSize consumer streamName)
-- | Throwing variant of 'PC.getCurrentDatasetCount': a 'Left' 'Error' from
-- the underlying call is rethrown as a consumer exception (see 'maybeThrow').
getCurrentDatasetCount :: Consumer -> StreamName -> IncludeIncompleteFlag -> IO Int
getCurrentDatasetCount consumer streamName includeIncomplete =
  maybeThrow (PC.getCurrentDatasetCount consumer streamName includeIncomplete)
-- | Throwing variant of 'PC.getBeamtimeMeta': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getBeamtimeMeta :: Consumer -> IO (Maybe Text)
getBeamtimeMeta consumer = maybeThrow (PC.getBeamtimeMeta consumer)
-- | Throwing variant of 'PC.getNextDataset': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getNextDataset ::
  Consumer ->
  GroupId ->
  Word64 ->
  StreamName ->
  IO Dataset
getNextDataset consumer groupId minSize streamName =
  maybeThrow (PC.getNextDataset consumer groupId minSize streamName)
-- | Throwing variant of 'PC.getLastDataset': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getLastDataset ::
  Consumer ->
  Word64 ->
  StreamName ->
  IO Dataset
getLastDataset consumer minSize streamName =
  maybeThrow (PC.getLastDataset consumer minSize streamName)
-- | Throwing variant of 'PC.getLastDatasetInGroup': a 'Left' 'Error' from
-- the underlying call is rethrown as a consumer exception (see 'maybeThrow').
getLastDatasetInGroup ::
  Consumer ->
  GroupId ->
  Word64 ->
  StreamName ->
  IO Dataset
getLastDatasetInGroup consumer groupId minSize streamName =
  maybeThrow (PC.getLastDatasetInGroup consumer groupId minSize streamName)
-- | Throwing variant of 'PC.getMessageMetaAndDataById': a 'Left' 'Error'
-- from the underlying call is rethrown as a consumer exception (see
-- 'maybeThrow').
getMessageMetaAndDataById ::
  Consumer ->
  StreamName ->
  MessageId ->
  IO (MessageMeta, BS.ByteString)
getMessageMetaAndDataById consumer streamName messageId =
  maybeThrow (PC.getMessageMetaAndDataById consumer streamName messageId)
-- | Throwing variant of 'PC.getMessageMetaById': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getMessageMetaById :: Consumer -> StreamName -> MessageId -> IO MessageMeta
getMessageMetaById consumer streamName messageId =
  maybeThrow (PC.getMessageMetaById consumer streamName messageId)
-- | Throwing variant of 'PC.getMessageDataById': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getMessageDataById :: Consumer -> StreamName -> MessageId -> IO BS.ByteString
getMessageDataById consumer streamName messageId =
  maybeThrow (PC.getMessageDataById consumer streamName messageId)
-- | Throwing variant of 'PC.getLastMessageMetaAndData': a 'Left' 'Error'
-- from the underlying call is rethrown as a consumer exception (see
-- 'maybeThrow').
getLastMessageMetaAndData :: Consumer -> StreamName -> IO (MessageMeta, BS.ByteString)
getLastMessageMetaAndData consumer streamName =
  maybeThrow (PC.getLastMessageMetaAndData consumer streamName)
-- | Throwing variant of 'PC.getLastMessageMeta': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getLastMessageMeta :: Consumer -> StreamName -> IO MessageMeta
getLastMessageMeta consumer streamName =
  maybeThrow (PC.getLastMessageMeta consumer streamName)
-- | Throwing variant of 'PC.getLastMessageData': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getLastMessageData :: Consumer -> StreamName -> IO BS.ByteString
getLastMessageData consumer streamName =
  maybeThrow (PC.getLastMessageData consumer streamName)
-- | Throwing variant of 'PC.getLastInGroupMessageMetaAndData': a 'Left'
-- 'Error' from the underlying call is rethrown as a consumer exception (see
-- 'maybeThrow').
getLastInGroupMessageMetaAndData ::
  Consumer ->
  StreamName ->
  GroupId ->
  IO (MessageMeta, BS.ByteString)
getLastInGroupMessageMetaAndData consumer streamName groupId =
  maybeThrow (PC.getLastInGroupMessageMetaAndData consumer streamName groupId)
-- | Throwing variant of 'PC.getLastInGroupMessageMeta': a 'Left' 'Error'
-- from the underlying call is rethrown as a consumer exception (see
-- 'maybeThrow').
getLastInGroupMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta
getLastInGroupMessageMeta consumer streamName groupId =
  maybeThrow (PC.getLastInGroupMessageMeta consumer streamName groupId)
-- | Throwing variant of 'PC.getLastInGroupMessageData': a 'Left' 'Error'
-- from the underlying call is rethrown as a consumer exception (see
-- 'maybeThrow').
getLastInGroupMessageData :: Consumer -> StreamName -> GroupId -> IO BS.ByteString
getLastInGroupMessageData consumer streamName groupId =
  maybeThrow (PC.getLastInGroupMessageData consumer streamName groupId)
-- | Throwing variant of 'PC.getNextMessageMetaAndData': a 'Left' 'Error'
-- from the underlying call is rethrown as a consumer exception (see
-- 'maybeThrow').
getNextMessageMetaAndData ::
  Consumer ->
  StreamName ->
  GroupId ->
  IO (MessageMeta, BS.ByteString)
getNextMessageMetaAndData consumer streamName groupId =
  maybeThrow (PC.getNextMessageMetaAndData consumer streamName groupId)
-- | Throwing variant of 'PC.getNextMessageMeta': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getNextMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta
getNextMessageMeta consumer streamName groupId =
  maybeThrow (PC.getNextMessageMeta consumer streamName groupId)
-- | Throwing variant of 'PC.getNextMessageData': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
getNextMessageData :: Consumer -> StreamName -> GroupId -> IO BS.ByteString
getNextMessageData consumer streamName groupId =
  maybeThrow (PC.getNextMessageData consumer streamName groupId)
-- | Throwing variant of 'PC.queryMessages': a 'Left' 'Error' from the
-- underlying call is rethrown as a consumer exception (see 'maybeThrow').
-- The query text is forwarded unchanged.
queryMessages ::
  Consumer ->
  Text ->
  StreamName ->
  IO [MessageMeta]
queryMessages consumer query streamName =
  maybeThrow (PC.queryMessages consumer query streamName)
-- | Throwing variant of 'PC.retrieveDataForMessageMeta': a 'Left' 'Error'
-- from the underlying call is rethrown as a consumer exception (see
-- 'maybeThrow').
retrieveDataForMessageMeta :: Consumer -> MessageMeta -> IO BS.ByteString
retrieveDataForMessageMeta consumer meta =
  maybeThrow (PC.retrieveDataForMessageMeta consumer meta)