{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -Wno-missing-import-lists #-}

-- |
-- Description : High-level interface for all consumer-related functions, using exceptions instead of @Either@
--
-- To implement an ASAP:O consumer, you should only need this interface.
-- It exposes no memory-management functions (like free) or pointers, and
-- is thus safe to use.
--
-- = Simple Example
--
-- Here's some code for a simple consumer that connects, outputs the available streams, and then gets a specific message by ID with metadata and data:
--
-- >>> :seti -XOverloadedStrings
-- >>> :{
--  module Main where
--  import Asapo.Consumer
--  import Prelude hiding (putStrLn)
--  import Data.Maybe(fromMaybe)
--  import Control.Monad(forM_)
--  import Data.Text(pack)
--  import Data.Text.IO(putStrLn)
--  import Data.Text.Encoding(decodeUtf8)
--  import qualified Data.ByteString as BS
--  main :: IO ()
--  main =
--    withConsumer
--      (ServerName "localhost:8040")
--      (SourcePath "")
--      WithoutFilesystem
--      ( SourceCredentials
--        { sourceType = RawSource,
--          instanceId = InstanceId "auto",
--          pipelineStep = PipelineStep "ps1",
--          beamtime = Beamtime "asapo_test",
--          beamline = Beamline "",
--          dataSource = DataSource "asapo_source",
--          token = Token "token-please-change"
--        }
--      ) $ \consumer -> do
--        beamtimeMeta <- getBeamtimeMeta consumer
--        putStrLn $ "beamtime metadata: " <> (fromMaybe "N/A" beamtimeMeta)
--        streams <- getStreamList consumer Nothing FilterAllStreams
--        forM_ streams $ \stream -> do
--          putStrLn $ "=> stream info " <> pack (show stream)
--          streamSize <- getCurrentSize consumer (streamInfoName stream)
--          putStrLn $ "   stream size: " <> pack (show streamSize)
--          datasetCount <- getCurrentDatasetCount consumer (streamInfoName stream) IncludeIncomplete
--          putStrLn $ "   dataset count: " <> pack (show datasetCount)
--        metaAndData <- getMessageMetaAndDataById consumer (StreamName "default") (messageIdFromInt 1337)
--        putStrLn $ "meta: " <> pack (show (fst metaAndData))
--        putStrLn $ "data: " <> decodeUtf8 (snd metaAndData)
-- :}
module Asapo.Consumer
  ( -- * Error types
    SomeConsumerException,
    NoData,
    EndOfStream,
    StreamFinished,
    UnavailableService,
    InterruptedTransaction,
    LocalIOError,
    WrongInput,
    PartialData,
    UnsupportedClient,
    DataNotInCache,
    UnknownError,

    -- * Types
    Consumer,
    Dataset (..),
    MessageMetaHandle,
    DeleteFlag (..),
    ErrorOnNotExistFlag (..),
    ErrorType (ErrorNoData),
    FilesystemFlag (..),
    PipelineStep (..),
    Beamtime (Beamtime),
    DataSource (DataSource),
    Beamline (Beamline),
    StreamInfo (..),
    GroupId,
    IncludeIncompleteFlag (..),
    MessageMeta (..),
    ServerName (..),
    SourcePath (..),
    SourceType (..),
    StreamName (..),
    InstanceId (..),
    Token (..),
    StreamFilter (..),
    SourceCredentials (..),
    NetworkConnectionType (..),

    -- * Initialization
    withConsumer,
    withGroupId,

    -- * Getters
    getCurrentSize,
    getCurrentDatasetCount,
    getBeamtimeMeta,
    getNextDataset,
    getLastDataset,
    getLastDatasetInGroup,
    getMessageMetaAndDataById,
    getMessageMetaById,
    getMessageDataById,
    getNextMessageMetaAndData,
    getNextMessageMeta,
    getNextMessageData,
    getLastMessageMetaAndData,
    getLastMessageMeta,
    getLastMessageData,
    getLastInGroupMessageMetaAndData,
    getLastInGroupMessageMeta,
    getLastInGroupMessageData,
    getUnacknowledgedMessages,
    getCurrentConnectionType,
    getStreamList,
    messageIdFromInt,
    queryMessages,
    retrieveDataForMessageMeta,

    -- * Modifiers
    resetLastReadMarker,
    setTimeout,
    setLastReadMarker,
    setStreamPersistent,
    acknowledge,
    negativeAcknowledge,
    deleteStream,
    resendNacs,
  )
where

import Asapo.Either.Common
  ( Beamline (Beamline),
    Beamtime (Beamtime),
    DataSource (DataSource),
    InstanceId (..),
    MessageId,
    PipelineStep (..),
    SourceCredentials (..),
    SourceType (..),
    StreamInfo (..),
    StreamName (..),
    Token (..),
    messageIdFromInt,
  )
import Asapo.Either.Consumer
  ( Consumer,
    Dataset (..),
    DeleteFlag (..),
    Error (Error),
    ErrorOnNotExistFlag (..),
    ErrorType (..),
    FilesystemFlag (..),
    GroupId,
    IncludeIncompleteFlag (..),
    MessageMeta (..),
    MessageMetaHandle,
    NetworkConnectionType (..),
    ServerName (..),
    SourcePath (..),
    StreamFilter (..),
    getCurrentConnectionType,
    resendNacs,
    setTimeout,
  )
import qualified Asapo.Either.Consumer as PC
import Control.Applicative (pure)
import Control.Exception (Exception (fromException, toException), SomeException, throw)
import Control.Monad (Monad)
import qualified Data.ByteString as BS
import Data.Either (Either (Left, Right))
import Data.Function ((.))
import Data.Int (Int)
import Data.Maybe (Maybe)
import Data.Text (Text)
import Data.Time (NominalDiffTime)
import Data.Typeable (cast)
import Data.Word (Word64)
import System.IO (IO)
import Text.Show (Show, show)
import Prelude ()

-- $setup
-- >>> :seti -XOverloadedStrings
-- >>> import Control.Exception(catch)
-- >>> import Prelude(undefined, (<>), error)
-- >>> import Data.Text.IO(putStrLn)
-- >>> import Data.Text(pack)
-- >>> let consumer = undefined

-- | Root type of the consumer exception hierarchy: an existential wrapper
-- around any consumer-related exception. Catching this type acts as a
-- catch-all for the consumer exceptions of this module, as in...
--
-- @
-- setStreamPersistent consumer (StreamName "default")
--   `catch` (\e -> error ("Caught " <> (show (e :: SomeConsumerException))))
-- @
data SomeConsumerException
  = forall e. (Exception e) => SomeConsumerException e

-- | Showing the wrapper shows the wrapped exception; the wrapper itself adds
-- nothing to the output.
instance Show SomeConsumerException where
  show (SomeConsumerException e) = show e

-- | Uses the default 'toException' / 'fromException', i.e. the wrapper sits
-- directly below 'SomeException'.
instance Exception SomeConsumerException

-- | 'toException' implementation shared by the concrete consumer exceptions:
-- the exception is first wrapped in 'SomeConsumerException' and only then
-- converted to 'SomeException', so that handlers for 'SomeConsumerException'
-- see it.
consumerExceptionToException :: (Exception e) => e -> SomeException
consumerExceptionToException = toException . SomeConsumerException

-- | 'fromException' implementation shared by the concrete consumer exceptions.
--
-- Yields @Nothing@ if the given exception is not a 'SomeConsumerException',
-- or if the exception inside the wrapper is of a different type than the one
-- requested.
consumerExceptionFromException :: (Exception e) => SomeException -> Maybe e
consumerExceptionFromException x = do
  -- First unwrap the hierarchy root, then try to recover the concrete type.
  SomeConsumerException a <- fromException x
  cast a

-- | Raised for errors of type 'ErrorNoData'; carries the error message.
newtype NoData = NoData Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception NoData where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorEndOfStream@; carries the error message.
newtype EndOfStream = EndOfStream Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception EndOfStream where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorStreamFinished@; carries the error message.
newtype StreamFinished = StreamFinished Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception StreamFinished where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorUnavailableService@; carries the error message.
newtype UnavailableService = UnavailableService Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception UnavailableService where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorInterruptedTransaction@; carries the error message.
newtype InterruptedTransaction = InterruptedTransaction Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception InterruptedTransaction where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorLocalIOError@; carries the error message.
newtype LocalIOError = LocalIOError Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception LocalIOError where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorWrongInput@; carries the error message.
newtype WrongInput = WrongInput Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception WrongInput where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorPartialData@; carries the error message.
newtype PartialData = PartialData Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception PartialData where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorUnsupportedClient@; carries the error message.
newtype UnsupportedClient = UnsupportedClient Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception UnsupportedClient where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorDataNotInCache@; carries the error message.
newtype DataNotInCache = DataNotInCache Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception DataNotInCache where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Raised for errors of type @ErrorUnknownError@; carries the error message.
newtype UnknownError = UnknownError Text deriving (Show)

-- | Part of the 'SomeConsumerException' hierarchy.
instance Exception UnknownError where
  toException = consumerExceptionToException
  fromException = consumerExceptionFromException

-- | Throw the exception type that corresponds to the given 'ErrorType',
-- using the given text as the exception's message.
--
-- The result type is fully polymorphic because no equation returns normally:
-- each one ends in 'throw'.
errorTypeToException :: ErrorType -> Text -> a
errorTypeToException ErrorNoData = throw . NoData
errorTypeToException ErrorEndOfStream = throw . EndOfStream
errorTypeToException ErrorStreamFinished = throw . StreamFinished
errorTypeToException ErrorUnavailableService = throw . UnavailableService
errorTypeToException ErrorInterruptedTransaction = throw . InterruptedTransaction
errorTypeToException ErrorLocalIOError = throw . LocalIOError
errorTypeToException ErrorWrongInput = throw . WrongInput
errorTypeToException ErrorPartialData = throw . PartialData
errorTypeToException ErrorUnsupportedClient = throw . UnsupportedClient
errorTypeToException ErrorDataNotInCache = throw . DataNotInCache
errorTypeToException ErrorUnknownError = throw . UnknownError

-- | Create a consumer and do something with it. This is the main entrypoint into the consumer.
--
-- If the underlying "Asapo.Either.Consumer" call reports an error, the
-- exception matching the error's type is thrown instead of running the
-- callback's error branch.
withConsumer :: forall a. ServerName -> SourcePath -> FilesystemFlag -> SourceCredentials -> (Consumer -> IO a) -> IO a
withConsumer serverName sourcePath filesystemFlag creds onSuccess =
  -- 'onError' deliberately has no signature: it is used at type
  -- @Error -> IO a@, and its inferred type is polymorphic in the result.
  let onError (Error errorMessage errorType) = errorTypeToException errorType errorMessage
   in PC.withConsumer serverName sourcePath filesystemFlag creds onError onSuccess

-- | Allocate a group ID and call a callback with it.
--
-- If the underlying "Asapo.Either.Consumer" call reports an error, the
-- exception matching the error's type is thrown.
withGroupId :: forall a. Consumer -> (GroupId -> IO a) -> IO a
withGroupId consumer onSuccess =
  -- 'onError' deliberately has no signature: it is used at type
  -- @Error -> IO a@, and its inferred type is polymorphic in the result.
  let onError (Error errorMessage errorType) = errorTypeToException errorType errorMessage
   in PC.withGroupId consumer onError onSuccess

-- | Run an action that reports failure as @Left@ and turn that failure into
-- an exception: a @Left@ 'Error' is converted with 'errorTypeToException',
-- a @Right@ value is returned unchanged.
maybeThrow :: (Monad m) => m (Either Error b) -> m b
maybeThrow f = do
  result <- f
  case result of
    Left (Error errorMessage errorType) -> errorTypeToException errorType errorMessage
    Right v -> pure v

-- | Reset the last read marker for the stream.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
resetLastReadMarker :: Consumer -> GroupId -> StreamName -> IO Int
resetLastReadMarker consumer groupId streamName =
  maybeThrow (PC.resetLastReadMarker consumer groupId streamName)

-- | Set the last read marker for the stream to the given message ID.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
setLastReadMarker :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int
setLastReadMarker consumer groupId streamName value =
  maybeThrow (PC.setLastReadMarker consumer groupId streamName value)

-- | Acknowledge a specific message.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
acknowledge :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int
acknowledge consumer groupId streamName messageId =
  maybeThrow (PC.acknowledge consumer groupId streamName messageId)

-- | Negatively acknowledge a specific message.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
negativeAcknowledge ::
  Consumer ->
  GroupId ->
  StreamName ->
  MessageId ->
  -- | delay
  NominalDiffTime ->
  IO Int
negativeAcknowledge consumer groupId streamName messageId delay =
  maybeThrow (PC.negativeAcknowledge consumer groupId streamName messageId delay)

-- | Get a list of all unacknowledged message IDs in a range, given as a
-- @(from, to)@ pair.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getUnacknowledgedMessages :: Consumer -> GroupId -> StreamName -> (MessageId, MessageId) -> IO [MessageId]
getUnacknowledgedMessages consumer groupId streamName (from, to) =
  maybeThrow (PC.getUnacknowledgedMessages consumer groupId streamName (from, to))

-- | Retrieve the list of streams with metadata.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getStreamList :: Consumer -> Maybe StreamName -> StreamFilter -> IO [StreamInfo]
getStreamList consumer streamName filter =
  maybeThrow (PC.getStreamList consumer streamName filter)

-- | Delete a given stream.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
deleteStream :: Consumer -> StreamName -> DeleteFlag -> ErrorOnNotExistFlag -> IO Int
deleteStream consumer streamName deleteFlag errorOnNotExistFlag =
  maybeThrow (PC.deleteStream consumer streamName deleteFlag errorOnNotExistFlag)

-- | Mark a stream as persistent.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
setStreamPersistent :: Consumer -> StreamName -> IO Int
setStreamPersistent consumer streamName =
  maybeThrow (PC.setStreamPersistent consumer streamName)

-- | Get the current size (number of messages) of the stream.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getCurrentSize :: Consumer -> StreamName -> IO Int
getCurrentSize consumer streamName =
  maybeThrow (PC.getCurrentSize consumer streamName)

-- | Get the number of datasets in the stream.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getCurrentDatasetCount :: Consumer -> StreamName -> IncludeIncompleteFlag -> IO Int
getCurrentDatasetCount consumer streamName includeIncomplete =
  maybeThrow (PC.getCurrentDatasetCount consumer streamName includeIncomplete)

-- | Get the beamtime metadata. The metadata may not be set, in which case
-- @Nothing@ is returned.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getBeamtimeMeta :: Consumer -> IO (Maybe Text)
getBeamtimeMeta consumer = maybeThrow (PC.getBeamtimeMeta consumer)

-- | Get the next dataset for a stream.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getNextDataset ::
  Consumer ->
  GroupId ->
  -- | minimum size
  Word64 ->
  StreamName ->
  IO Dataset
getNextDataset consumer groupId minSize streamName =
  maybeThrow (PC.getNextDataset consumer groupId minSize streamName)

-- | Get the last dataset in the stream.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getLastDataset ::
  Consumer ->
  -- | minimum size
  Word64 ->
  StreamName ->
  IO Dataset
getLastDataset consumer minSize streamName =
  maybeThrow (PC.getLastDataset consumer minSize streamName)

-- | Get the last dataset in the given group.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getLastDatasetInGroup ::
  Consumer ->
  GroupId ->
  -- | minimum size
  Word64 ->
  StreamName ->
  IO Dataset
getLastDatasetInGroup consumer groupId minSize streamName =
  maybeThrow (PC.getLastDatasetInGroup consumer groupId minSize streamName)

-- | Given a message ID, retrieve both metadata and data.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getMessageMetaAndDataById :: Consumer -> StreamName -> MessageId -> IO (MessageMeta, BS.ByteString)
getMessageMetaAndDataById consumer streamName messageId =
  maybeThrow (PC.getMessageMetaAndDataById consumer streamName messageId)

-- | Given a message ID, retrieve only the metadata (you can get the data later with 'retrieveDataForMessageMeta').
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getMessageMetaById :: Consumer -> StreamName -> MessageId -> IO MessageMeta
getMessageMetaById consumer streamName messageId =
  maybeThrow (PC.getMessageMetaById consumer streamName messageId)

-- | Given a message ID, retrieve only the data.
--
-- Throws the matching consumer exception (see 'SomeConsumerException') if the
-- underlying "Asapo.Either.Consumer" call fails.
getMessageDataById :: Consumer -> StreamName -> MessageId -> IO BS.ByteString
getMessageDataById consumer streamName messageId =
  maybeThrow (PC.getMessageDataById consumer streamName messageId)

-- | Retrieve the last message in the stream, with data and metadata
--
-- Delegates to @PC.getLastMessageMetaAndData@; an error result (@Left@) from
-- that call is raised as an exception via @maybeThrow@ instead of being
-- returned.
getLastMessageMetaAndData :: Consumer -> StreamName -> IO (MessageMeta, BS.ByteString)
getLastMessageMetaAndData consumer streamName =
  maybeThrow (PC.getLastMessageMetaAndData consumer streamName)

-- | Retrieve the last message in the stream, only metadata (you can get the data later with 'retrieveDataForMessageMeta')
--
-- Delegates to @PC.getLastMessageMeta@; an error result (@Left@) from that
-- call is raised as an exception via @maybeThrow@ instead of being returned.
getLastMessageMeta :: Consumer -> StreamName -> IO MessageMeta
getLastMessageMeta consumer streamName =
  maybeThrow (PC.getLastMessageMeta consumer streamName)

-- | Retrieve the last message in the stream, only data
--
-- Delegates to @PC.getLastMessageData@; an error result (@Left@) from that
-- call is raised as an exception via @maybeThrow@ instead of being returned.
getLastMessageData :: Consumer -> StreamName -> IO BS.ByteString
getLastMessageData consumer streamName =
  maybeThrow (PC.getLastMessageData consumer streamName)

-- | Retrieve the last message in a given stream and group, with data and metadata
--
-- Delegates to @PC.getLastInGroupMessageMetaAndData@; an error result
-- (@Left@) from that call is raised as an exception via @maybeThrow@ instead
-- of being returned.
getLastInGroupMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (MessageMeta, BS.ByteString)
getLastInGroupMessageMetaAndData consumer streamName groupId =
  maybeThrow (PC.getLastInGroupMessageMetaAndData consumer streamName groupId)

-- | Retrieve the last message in a given stream and group, only metadata (you can get the data later with 'retrieveDataForMessageMeta')
--
-- Delegates to @PC.getLastInGroupMessageMeta@; an error result (@Left@) from
-- that call is raised as an exception via @maybeThrow@ instead of being
-- returned.
getLastInGroupMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta
getLastInGroupMessageMeta consumer streamName groupId =
  maybeThrow (PC.getLastInGroupMessageMeta consumer streamName groupId)

-- | Retrieve the last message in a given stream and group, only data
--
-- Delegates to @PC.getLastInGroupMessageData@; an error result (@Left@) from
-- that call is raised as an exception via @maybeThrow@ instead of being
-- returned.
getLastInGroupMessageData :: Consumer -> StreamName -> GroupId -> IO BS.ByteString
getLastInGroupMessageData consumer streamName groupId =
  maybeThrow (PC.getLastInGroupMessageData consumer streamName groupId)

-- | Retrieve the next message in the stream and group, with data and metadata
--
-- Delegates to @PC.getNextMessageMetaAndData@; an error result (@Left@) from
-- that call is raised as an exception via @maybeThrow@ instead of being
-- returned.
getNextMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (MessageMeta, BS.ByteString)
getNextMessageMetaAndData consumer streamName groupId =
  maybeThrow (PC.getNextMessageMetaAndData consumer streamName groupId)

-- | Retrieve the next message in the stream and group, only metadata (you can get the data later with 'retrieveDataForMessageMeta')
--
-- Delegates to @PC.getNextMessageMeta@; an error result (@Left@) from that
-- call is raised as an exception via @maybeThrow@ instead of being returned.
getNextMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta
getNextMessageMeta consumer streamName groupId =
  maybeThrow (PC.getNextMessageMeta consumer streamName groupId)

-- | Retrieve the next message in the stream and group, only data
--
-- Delegates to @PC.getNextMessageData@; an error result (@Left@) from that
-- call is raised as an exception via @maybeThrow@ instead of being returned.
getNextMessageData :: Consumer -> StreamName -> GroupId -> IO BS.ByteString
getNextMessageData consumer streamName groupId =
  maybeThrow (PC.getNextMessageData consumer streamName groupId)

-- | Query messages, return handles without data
--
-- Delegates to @PC.queryMessages@; an error result (@Left@) from that call is
-- raised as an exception via @maybeThrow@ instead of being returned.
queryMessages ::
  Consumer ->
  -- | Actual query string, see the docs for syntax
  Text ->
  StreamName ->
  IO [MessageMeta]
queryMessages consumer query streamName =
  maybeThrow (PC.queryMessages consumer query streamName)

-- | Retrieve actual data for the handle
--
-- Delegates to @PC.retrieveDataForMessageMeta@; an error result (@Left@) from
-- that call is raised as an exception via @maybeThrow@ instead of being
-- returned.
retrieveDataForMessageMeta :: Consumer -> MessageMeta -> IO BS.ByteString
retrieveDataForMessageMeta consumer meta =
  maybeThrow (PC.retrieveDataForMessageMeta consumer meta)