hs-asapo-0.9.0: Haskell bindings for ASAP:O
Safe HaskellSafe-Inferred
LanguageHaskell98

Asapo.Consumer

Description

To implement an ASAP:O consumer, you should only need this interface. It exposes no memory-management functions (like free) or pointers, and is thus safe to use.

Simple Example

Here's some code for a simple consumer that connects, outputs the available streams, and then gets a specific message by ID with metadata and data:

>>> :seti -XOverloadedStrings
>>> :{
 module Main where
 import Asapo.Consumer
 import Prelude hiding (putStrLn)
 import Data.Maybe(fromMaybe)
 import Control.Monad(forM_)
 import Data.Text(pack)
 import Data.Text.IO(putStrLn)
 import Data.Text.Encoding(decodeUtf8)
 import qualified Data.ByteString as BS
 main :: IO ()
 main =
   withConsumer
     (ServerName "localhost:8040")
     (SourcePath "")
     WithoutFilesystem
     ( SourceCredentials
       { sourceType = RawSource,
         instanceId = InstanceId "auto",
         pipelineStep = PipelineStep "ps1",
         beamtime = Beamtime "asapo_test",
         beamline = Beamline "",
         dataSource = DataSource "asapo_source",
         token = Token "token-please-change"
       }
     ) $ \consumer -> do
       beamtimeMeta <- getBeamtimeMeta consumer
       putStrLn $ "beamtime metadata: " <> (fromMaybe "N/A" beamtimeMeta)
       streams <- getStreamList consumer Nothing FilterAllStreams
       forM_ streams $ \stream -> do
         putStrLn $ "=> stream info " <> pack (show stream)
         streamSize <- getCurrentSize consumer (streamInfoName stream)
         putStrLn $ "   stream size: " <> pack (show streamSize)
         datasetCount <- getCurrentDatasetCount consumer (streamInfoName stream) IncludeIncomplete
         putStrLn $ "   dataset count: " <> pack (show datasetCount)
       metaAndData <- getMessageMetaAndDataById consumer (StreamName "default") (messageIdFromInt 1337)
       putStrLn $ "meta: " <> pack (show (fst metaAndData))
       putStrLn $ "data: " <> decodeUtf8 (snd metaAndData)
:}
Synopsis

Error types

data SomeConsumerException Source #

Parent class for all consumer-related exceptions. This makes a catch-all handler possible, as in:

setStreamPersistent consumer (StreamName "default")
  `catch` (\e -> error ("Caught " <> (show (e :: SomeConsumerException))))

data NoData Source #

Instances

Instances details
Exception NoData Source # 
Instance details

Defined in Asapo.Consumer

Show NoData Source # 
Instance details

Defined in Asapo.Consumer

Types

data Consumer Source #

Wrapper around a consumer handle. Create with the withConsumer function(s).

data Dataset Source #

Metadata for a dataset

data MessageMetaHandle Source #

Metadata handle, can be passed around as a pure value and be used to retrieve actual data for the metadata as a two-step process, using the retrieveDataForMessageMeta function(s)

Instances

Instances details
Show MessageMetaHandle Source # 
Instance details

Defined in Asapo.Either.Consumer

data DeleteFlag Source #

Anti-boolean-blindness for whether or not to delete metadata when deleting a stream

Constructors

DeleteMeta 
DontDeleteMeta 

Instances

Instances details
Eq DeleteFlag Source # 
Instance details

Defined in Asapo.Either.Consumer

data ErrorOnNotExistFlag Source #

Anti-boolean-blindness for "error on not existing data"

data ErrorType Source #

Constructors

ErrorNoData 

Instances

Instances details
Show ErrorType Source # 
Instance details

Defined in Asapo.Either.Consumer

data FilesystemFlag Source #

Whether to use the filesystem or do it in-memory

Instances

Instances details
Eq FilesystemFlag Source # 
Instance details

Defined in Asapo.Either.Consumer

newtype PipelineStep Source #

Constructors

PipelineStep Text 

newtype Beamtime Source #

Constructors

Beamtime Text 

newtype DataSource Source #

Constructors

DataSource Text 

newtype Beamline Source #

Constructors

Beamline Text 

data GroupId Source #

Wrapper around a group ID

data IncludeIncompleteFlag Source #

Anti-boolean-blindness for "include incomplete data sets in list"

newtype ServerName Source #

Wrapper for a server name (something like "host:port")

Constructors

ServerName Text 

newtype SourcePath Source #

Wrapper for a source path (dubious to not use FilePath, but let's see)

Constructors

SourcePath Text 

newtype StreamName Source #

Constructors

StreamName Text 

Instances

Instances details
Show StreamName Source # 
Instance details

Defined in Asapo.Either.Common

newtype InstanceId Source #

Constructors

InstanceId Text 

newtype Token Source #

Constructors

Token Text 

data NetworkConnectionType Source #

Network connection type

Constructors

ConnectionUndefined

not sure about this

ConnectionTcp

TCP

ConnectionFabric

not sure about this

Initialization

withConsumer :: forall a. ServerName -> SourcePath -> FilesystemFlag -> SourceCredentials -> (Consumer -> IO a) -> IO a Source #

Create a consumer and do something with it. This is the main entrypoint into the consumer

withGroupId :: forall a. Consumer -> (GroupId -> IO a) -> IO a Source #

Allocate a group ID and call a callback

Getters

getCurrentSize :: Consumer -> StreamName -> IO Int Source #

Get the current size (number of messages) of the stream

getCurrentDatasetCount :: Consumer -> StreamName -> IncludeIncompleteFlag -> IO Int Source #

Get number of datasets in stream

getBeamtimeMeta :: Consumer -> IO (Maybe Text) Source #

Get beamtime metadata (which might not be set, in which case Nothing is returned)

getNextDataset Source #

Arguments

:: Consumer 
-> GroupId 
-> Word64

minimum size

-> StreamName 
-> IO Dataset 

Get the next dataset for a stream

getLastDataset Source #

Arguments

:: Consumer 
-> Word64

minimum size

-> StreamName 
-> IO Dataset 

Get the last dataset in the stream

getLastDatasetInGroup Source #

Arguments

:: Consumer 
-> GroupId 
-> Word64

minimum size

-> StreamName 
-> IO Dataset 

Get the last dataset in the given group

getMessageMetaAndDataById :: Consumer -> StreamName -> MessageId -> IO (MessageMeta, ByteString) Source #

Given a message ID, retrieve both metadata and data

getMessageMetaById :: Consumer -> StreamName -> MessageId -> IO MessageMeta Source #

Given a message ID, retrieve only the metadata (you can get the data later with retrieveDataForMessageMeta)

getMessageDataById :: Consumer -> StreamName -> MessageId -> IO ByteString Source #

Given a message ID, retrieve only the data

getNextMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (MessageMeta, ByteString) Source #

Retrieve the next message in the stream and group, with data and metadata

getNextMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta Source #

Retrieve the next message in the stream and group, only metadata (you can get the data later with retrieveDataForMessageMeta)

getNextMessageData :: Consumer -> StreamName -> GroupId -> IO ByteString Source #

Retrieve the next message in the stream and group, only data

getLastMessageMetaAndData :: Consumer -> StreamName -> IO (MessageMeta, ByteString) Source #

Retrieve the last message in the stream, with data and metadata

getLastMessageMeta :: Consumer -> StreamName -> IO MessageMeta Source #

Retrieve the last message in the stream, only metadata (you can get the data later with retrieveDataForMessageMeta)

getLastMessageData :: Consumer -> StreamName -> IO ByteString Source #

Retrieve the last message in the stream, only data

getLastInGroupMessageMetaAndData :: Consumer -> StreamName -> GroupId -> IO (MessageMeta, ByteString) Source #

Retrieve the last message in a given stream and group, with data and metadata

getLastInGroupMessageMeta :: Consumer -> StreamName -> GroupId -> IO MessageMeta Source #

Retrieve the last message in a given stream and group, only metadata (you can get the data later with retrieveDataForMessageMeta)

getLastInGroupMessageData :: Consumer -> StreamName -> GroupId -> IO ByteString Source #

Retrieve the last message in a given stream and group, only data

getUnacknowledgedMessages :: Consumer -> GroupId -> StreamName -> (MessageId, MessageId) -> IO [MessageId] Source #

Get a list of all unacknowledged message IDs in a range

getCurrentConnectionType :: Consumer -> IO NetworkConnectionType Source #

Retrieve the current consumer connection type

getStreamList :: Consumer -> Maybe StreamName -> StreamFilter -> IO [StreamInfo] Source #

Retrieve the list of streams with metadata

queryMessages Source #

Arguments

:: Consumer 
-> Text

Actual query string, see the docs for syntax

-> StreamName 
-> IO [MessageMeta] 

Query messages, return handles without data

retrieveDataForMessageMeta :: Consumer -> MessageMeta -> IO ByteString Source #

Retrieve actual data for the handle

Modifiers

resetLastReadMarker :: Consumer -> GroupId -> StreamName -> IO Int Source #

Reset the last read marker for the stream

setTimeout :: Consumer -> NominalDiffTime -> IO () Source #

Set the global consumer timeout

setLastReadMarker :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int Source #

Set the last read marker for the stream

setStreamPersistent :: Consumer -> StreamName -> IO Int Source #

Set a stream persistent

acknowledge :: Consumer -> GroupId -> StreamName -> MessageId -> IO Int Source #

Acknowledge a specific message

negativeAcknowledge Source #

Arguments

:: Consumer 
-> GroupId 
-> StreamName 
-> MessageId 
-> NominalDiffTime

delay

-> IO Int 

Negatively acknowledge a specific message

resendNacs Source #

Arguments

:: Consumer 
-> Bool

resend yes/no

-> NominalDiffTime

delay

-> Word64

attempts

-> IO () 

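Configure the resending of negatively acknowledged messages (whether to resend, the delay before resending, and the number of attempts)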