Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell98 |
To implement an ASAP:O consumer, you should only need this interface. It exposes no memory-management functions (like free) or pointers, and is thus safe to use.
Simple Example
Here's some code for a simple consumer that connects, outputs the available streams, and then gets a specific message by ID with metadata and data:
>>>
:seti -XOverloadedStrings
>>>
:{
module Main where import Asapo.Consumer import Prelude hiding (putStrLn) import Data.Maybe(fromMaybe) import Control.Monad(forM_) import Data.Text(pack) import Data.Text.IO(putStrLn) import Data.Text.Encoding(decodeUtf8) import qualified Data.ByteString as BS main :: IO () main = withConsumer (ServerName "localhost:8040") (SourcePath "") WithoutFilesystem ( SourceCredentials { sourceType = RawSource, instanceId = InstanceId "auto", pipelineStep = PipelineStep "ps1", beamtime = Beamtime "asapo_test", beamline = Beamline "", dataSource = DataSource "asapo_source", token = Token "token-please-change" } ) $ \consumer -> do beamtimeMeta <- getBeamtimeMeta consumer putStrLn $ "beamtime metadata: " <> (fromMaybe "N/A" beamtimeMeta) streams <- getStreamList consumer Nothing FilterAllStreams forM_ streams $ \stream -> do putStrLn $ "=> stream info " <> pack (show stream) streamSize <- getCurrentSize consumer (streamInfoName stream) putStrLn $ " stream size: " <> pack (show streamSize) datasetCount <- getCurrentDatasetCount consumer (streamInfoName stream) IncludeIncomplete putStrLn $ " dataset count: " <> pack (show datasetCount) metaAndData <- getMessageMetaAndDataById consumer (StreamName "default") (messageIdFromInt 1337) putStrLn $ "meta: " <> pack (show (fst metaAndData)) putStrLn $ "data: " <> decodeUtf8 (snd metaAndData) :}
Synopsis
- data SomeConsumerException
- data NoData
- data EndOfStream
- data StreamFinished
- data UnavailableService
- data InterruptedTransaction
- data LocalIOError
- data WrongInput
- data PartialData
- data UnsupportedClient
- data DataNotInCache
- data UnknownError
- data Consumer
- data Dataset = Dataset {}
- data MessageMetaHandle
- data DeleteFlag
- data ErrorOnNotExistFlag
- data ErrorType = ErrorNoData
- data FilesystemFlag
- newtype PipelineStep = PipelineStep Text
- newtype Beamtime = Beamtime Text
- newtype DataSource = DataSource Text
- newtype Beamline = Beamline Text
- data StreamInfo = StreamInfo {}
- data GroupId
- data IncludeIncompleteFlag
- data MessageMeta = MessageMeta {}
- newtype ServerName = ServerName Text
- newtype SourcePath = SourcePath Text
- data SourceType
- newtype StreamName = StreamName Text
- newtype InstanceId = InstanceId Text
- newtype Token = Token Text
- data StreamFilter
- data SourceCredentials = SourceCredentials {}
- data NetworkConnectionType
- withConsumer :: forall a. ServerName -> SourcePath -> FilesystemFlag -> SourceCredentials -> (Consumer -> IO a) -> IO a
- withGroupId :: forall a. Consumer -> (GroupId -> IO a) -> IO a
- getCurrentSize :: Consumer -> StreamName -> IO Int
- getCurrentDatasetCount :: Consumer -> StreamName -> IncludeIncompleteFlag -> IO Int
- getBeamtimeMeta :: Consumer -> IO (Maybe Text)
- getNextDataset :: Consumer -> GroupId -> Word64 -> StreamName -> IO Dataset
- getLastDataset :: Consumer -> Word64 -> StreamName -> IO Dataset
- getLastDatasetInGroup :: Consumer -> GroupId -> Word64 -> StreamName -> IO Dataset
- getMessageMetaAndDataById :: Consumer -> StreamName -> MessageId -> IO (MessageMeta, ByteString)
- getMessageMetaById :: Consumer -> StreamName -> MessageId -> IO MessageMeta
- getMessageDataById :: Consumer -> StreamName -> MessageId -> IO ByteString
- getNextMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (MessageMeta, ByteString)
- getNextMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta
- getNextMessageData :: Consumer -> StreamName -> GroupId -> IO ByteString
- getLastMessageMetaAndData :: Consumer -> StreamName -> IO (MessageMeta, ByteString)
- getLastMessageMeta :: Consumer -> StreamName -> IO MessageMeta
- getLastMessageData :: Consumer -> StreamName -> IO ByteString
- getLastInGroupMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (MessageMeta, ByteString)
- getLastInGroupMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta
- getLastInGroupMessageData :: Consumer -> StreamName -> GroupId -> IO ByteString
- getUnacknowledgedMessages :: Consumer -> GroupId -> StreamName -> (MessageId, MessageId) -> IO [MessageId]
- getCurrentConnectionType :: Consumer -> IO NetworkConnectionType
- getStreamList :: Consumer -> Maybe StreamName -> StreamFilter -> IO [StreamInfo]
- messageIdFromInt :: Integral a => a -> MessageId
- queryMessages :: Consumer -> Text -> StreamName -> IO [MessageMeta]
- retrieveDataForMessageMeta :: Consumer -> MessageMeta -> IO ByteString
- resetLastReadMarker :: Consumer -> GroupId -> StreamName -> IO Int
- setTimeout :: Consumer -> NominalDiffTime -> IO ()
- setLastReadMarker :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int
- setStreamPersistent :: Consumer -> StreamName -> IO Int
- acknowledge :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int
- negativeAcknowledge :: Consumer -> GroupId -> StreamName -> MessageId -> NominalDiffTime -> IO Int
- deleteStream :: Consumer -> StreamName -> DeleteFlag -> ErrorOnNotExistFlag -> IO Int
- resendNacs :: Consumer -> Bool -> NominalDiffTime -> Word64 -> IO ()
Error types
data SomeConsumerException Source #
Parent class for all consumer-related exceptions. This makes catchall possible, as in...
setStreamPersistent consumer (StreamName "default")
catch
(e -> error ("Caught " <> (show (e :: SomeConsumerException))))
Instances
Exception SomeConsumerException Source # | |
Defined in Asapo.Consumer | |
Show SomeConsumerException Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> SomeConsumerException -> ShowS # show :: SomeConsumerException -> String # showList :: [SomeConsumerException] -> ShowS # |
Instances
Exception NoData Source # | |
Defined in Asapo.Consumer toException :: NoData -> SomeException # fromException :: SomeException -> Maybe NoData # displayException :: NoData -> String # | |
Show NoData Source # | |
data EndOfStream Source #
Instances
Exception EndOfStream Source # | |
Defined in Asapo.Consumer | |
Show EndOfStream Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> EndOfStream -> ShowS # show :: EndOfStream -> String # showList :: [EndOfStream] -> ShowS # |
data StreamFinished Source #
Instances
Exception StreamFinished Source # | |
Defined in Asapo.Consumer | |
Show StreamFinished Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> StreamFinished -> ShowS # show :: StreamFinished -> String # showList :: [StreamFinished] -> ShowS # |
data UnavailableService Source #
Instances
data InterruptedTransaction Source #
Instances
Exception InterruptedTransaction Source # | |
Defined in Asapo.Consumer | |
Show InterruptedTransaction Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> InterruptedTransaction -> ShowS # show :: InterruptedTransaction -> String # showList :: [InterruptedTransaction] -> ShowS # |
data LocalIOError Source #
Instances
Exception LocalIOError Source # | |
Defined in Asapo.Consumer | |
Show LocalIOError Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> LocalIOError -> ShowS # show :: LocalIOError -> String # showList :: [LocalIOError] -> ShowS # |
data WrongInput Source #
Instances
Exception WrongInput Source # | |
Defined in Asapo.Consumer toException :: WrongInput -> SomeException # fromException :: SomeException -> Maybe WrongInput # displayException :: WrongInput -> String # | |
Show WrongInput Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> WrongInput -> ShowS # show :: WrongInput -> String # showList :: [WrongInput] -> ShowS # |
data PartialData Source #
Instances
Exception PartialData Source # | |
Defined in Asapo.Consumer | |
Show PartialData Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> PartialData -> ShowS # show :: PartialData -> String # showList :: [PartialData] -> ShowS # |
data UnsupportedClient Source #
Instances
Exception UnsupportedClient Source # | |
Defined in Asapo.Consumer | |
Show UnsupportedClient Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> UnsupportedClient -> ShowS # show :: UnsupportedClient -> String # showList :: [UnsupportedClient] -> ShowS # |
data DataNotInCache Source #
Instances
Exception DataNotInCache Source # | |
Defined in Asapo.Consumer | |
Show DataNotInCache Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> DataNotInCache -> ShowS # show :: DataNotInCache -> String # showList :: [DataNotInCache] -> ShowS # |
data UnknownError Source #
Instances
Exception UnknownError Source # | |
Defined in Asapo.Consumer | |
Show UnknownError Source # | |
Defined in Asapo.Consumer showsPrec :: Int -> UnknownError -> ShowS # show :: UnknownError -> String # showList :: [UnknownError] -> ShowS # |
Types
Metadata for a dataset
data MessageMetaHandle Source #
Metadata handle, can be passed around as a pure value and be used to retrieve actual data for the metadata as a two-step process, using the retrieveDataForMessageMeta
function(s)
Instances
Show MessageMetaHandle Source # | |
Defined in Asapo.Either.Consumer showsPrec :: Int -> MessageMetaHandle -> ShowS # show :: MessageMetaHandle -> String # showList :: [MessageMetaHandle] -> ShowS # |
data DeleteFlag Source #
Anti-boolean-blindness for delete or not delete metadata when deleting a stream
Instances
Eq DeleteFlag Source # | |
Defined in Asapo.Either.Consumer (==) :: DeleteFlag -> DeleteFlag -> Bool # (/=) :: DeleteFlag -> DeleteFlag -> Bool # |
data ErrorOnNotExistFlag Source #
Anti-boolean-blindness for "error on not existing data"
Instances
Eq ErrorOnNotExistFlag Source # | |
Defined in Asapo.Either.Consumer (==) :: ErrorOnNotExistFlag -> ErrorOnNotExistFlag -> Bool # (/=) :: ErrorOnNotExistFlag -> ErrorOnNotExistFlag -> Bool # |
data FilesystemFlag Source #
Whether to use the filesystem or do it in-memory
Instances
Eq FilesystemFlag Source # | |
Defined in Asapo.Either.Consumer (==) :: FilesystemFlag -> FilesystemFlag -> Bool # (/=) :: FilesystemFlag -> FilesystemFlag -> Bool # |
newtype PipelineStep Source #
newtype DataSource Source #
data StreamInfo Source #
Instances
Show StreamInfo Source # | |
Defined in Asapo.Either.Common showsPrec :: Int -> StreamInfo -> ShowS # show :: StreamInfo -> String # showList :: [StreamInfo] -> ShowS # |
data IncludeIncompleteFlag Source #
Anti-boolean-blindness for "include incomplete data sets in list"
Instances
Eq IncludeIncompleteFlag Source # | |
Defined in Asapo.Either.Consumer (==) :: IncludeIncompleteFlag -> IncludeIncompleteFlag -> Bool # (/=) :: IncludeIncompleteFlag -> IncludeIncompleteFlag -> Bool # |
data MessageMeta Source #
Metadata for a single message
Instances
Show MessageMeta Source # | |
Defined in Asapo.Either.Consumer showsPrec :: Int -> MessageMeta -> ShowS # show :: MessageMeta -> String # showList :: [MessageMeta] -> ShowS # |
newtype SourcePath Source #
Wrapper for a source path (dubious to not use FilePath
, but let's see)
newtype StreamName Source #
Instances
Show StreamName Source # | |
Defined in Asapo.Either.Common showsPrec :: Int -> StreamName -> ShowS # show :: StreamName -> String # showList :: [StreamName] -> ShowS # |
newtype InstanceId Source #
data SourceCredentials Source #
data NetworkConnectionType Source #
Network connection type
ConnectionUndefined | not sure about this |
ConnectionTcp | TCP |
ConnectionFabric | not sure about this |
Initialization
withConsumer :: forall a. ServerName -> SourcePath -> FilesystemFlag -> SourceCredentials -> (Consumer -> IO a) -> IO a Source #
Create a consumer and do something with it. This is the main entrypoint into the consumer
withGroupId :: forall a. Consumer -> (GroupId -> IO a) -> IO a Source #
Allocate a group ID and call a callback
Getters
getCurrentSize :: Consumer -> StreamName -> IO Int Source #
Get the current size (number of messages) of the stream
getCurrentDatasetCount :: Consumer -> StreamName -> IncludeIncompleteFlag -> IO Int Source #
Get number of datasets in stream
getBeamtimeMeta :: Consumer -> IO (Maybe Text) Source #
Get beamtime metadata (which can be not set, in which case Nothing
is returned)
Get the next dataset for a stream
:: Consumer | |
-> Word64 | minimum size |
-> StreamName | |
-> IO Dataset |
Get the last dataset in the stream
getLastDatasetInGroup Source #
Get the last data ste in the given group
getMessageMetaAndDataById :: Consumer -> StreamName -> MessageId -> IO (MessageMeta, ByteString) Source #
Given a message ID, retrieve both metadata and data
getMessageMetaById :: Consumer -> StreamName -> MessageId -> IO MessageMeta Source #
Given a message ID, retrieve only the metadata (you can get the data later with retrieveDataFromMessageMeta
)
getMessageDataById :: Consumer -> StreamName -> MessageId -> IO ByteString Source #
Given a message ID, retrieve only the data
getNextMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (MessageMeta, ByteString) Source #
Retrieve the next message in the stream and group, with data and metadata
getNextMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta Source #
Retrieve the next message in the stream and group, only metadata (you can get the data later with retrieveDataFromMessageMeta
)
getNextMessageData :: Consumer -> StreamName -> GroupId -> IO ByteString Source #
Retrieve the next message in the stream and group, only data
getLastMessageMetaAndData :: Consumer -> StreamName -> IO (MessageMeta, ByteString) Source #
Retrieve the last message in the stream, with data and metadata
getLastMessageMeta :: Consumer -> StreamName -> IO MessageMeta Source #
Retrieve the last message in the stream, only metadata (you can get the data later with retrieveDataFromMessageMeta
)
getLastMessageData :: Consumer -> StreamName -> IO ByteString Source #
Retrieve the last message in the stream, only data
getLastInGroupMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (MessageMeta, ByteString) Source #
Retrieve the last message in a given stream and group, with data and metadata
getLastInGroupMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta Source #
Retrieve the last message in a given stream and group, only metadata (you can get the data later with retrieveDataFromMessageMeta
)
getLastInGroupMessageData :: Consumer -> StreamName -> GroupId -> IO ByteString Source #
Retrieve the last message in a given stream and group, only data
getUnacknowledgedMessages :: Consumer -> GroupId -> StreamName -> (MessageId, MessageId) -> IO [MessageId] Source #
Get a list of all unacknowledged message IDs in a range
getCurrentConnectionType :: Consumer -> IO NetworkConnectionType Source #
Retrieve the current consumer connection type
getStreamList :: Consumer -> Maybe StreamName -> StreamFilter -> IO [StreamInfo] Source #
Retrieve the list of streams with metadata
messageIdFromInt :: Integral a => a -> MessageId Source #
:: Consumer | |
-> Text | Actual query string, see the docs for syntax |
-> StreamName | |
-> IO [MessageMeta] |
Query messages, return handles without data
retrieveDataForMessageMeta :: Consumer -> MessageMeta -> IO ByteString Source #
Retrieve actual data for the handle
Modifiers
resetLastReadMarker :: Consumer -> GroupId -> StreamName -> IO Int Source #
Reset the last read marker for the stream
setTimeout :: Consumer -> NominalDiffTime -> IO () Source #
Set the global consumer timeout
setLastReadMarker :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int Source #
Set the last read marker for the stream
setStreamPersistent :: Consumer -> StreamName -> IO Int Source #
Set a stream persistent
acknowledge :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int Source #
Acknowledge a specific message
:: Consumer | |
-> GroupId | |
-> StreamName | |
-> MessageId | |
-> NominalDiffTime | delay |
-> IO Int |
Negatively acknowledge a specific message
deleteStream :: Consumer -> StreamName -> DeleteFlag -> ErrorOnNotExistFlag -> IO Int Source #
Delete a given stream
:: Consumer | |
-> Bool | resend yes/no |
-> NominalDiffTime | delay |
-> Word64 | attempts |
-> IO () |
Reset negative acknowledgements