Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell98 |
To implement an ASAP:O producer, you should only need this interface. It exposes no memory-management functions (like free) or pointers, and is thus safe to use.
Simple Example
Here's some code for a simple producer that connects and sends a message with id "1".
>>>
:seti -XOverloadedStrings
>>>
:{
import Asapo.Producer import Control.Applicative (Applicative ((<*>))) import Control.Monad(void) import Data.Either (Either (Left, Right)) import Data.Function (($)) import Data.Functor ((<$>)) import Data.Semigroup (Semigroup ((<>))) import Data.Text (Text, pack) import Data.Text.Encoding (encodeUtf8) import qualified Data.Text.IO as TIO import Data.Time.Clock (secondsToNominalDiffTime) import Data.Word (Word64) import System.IO (IO) import Text.Show (Show (show)) import Prelude () main :: IO () main = withProducer (Endpoint "localhost:8400") (ProcessingThreads 1) TcpHandler ( SourceCredentials { sourceType = RawSource, instanceId = InstanceId "test_instance", pipelineStep = PipelineStep "pipeline_step_1", beamtime = Beamtime "asapo_test", beamline = Beamline "auto", dataSource = DataSource "asapo_source", token = Token "sometoken" } ) (secondsToNominalDiffTime 10) $ \producer -> do TIO.putStrLn "connected, sending data" let responseHandler :: RequestResponse -> IO () responseHandler requestResponse = TIO.putStrLn $ "in response handler, payload " <> responsePayload requestResponse <> ", error " <> pack (show (responseError requestResponse)) send producer (MessageId 1) (FileName "raw/default/1.txt") (Metadata "{\"test\": 3.0}") (DatasetSubstream 0) (DatasetSize 0) NoAutoId (encodeUtf8 "test") DataAndMetadata FilesystemAndDatabase (StreamName "default") responseHandler void $ waitRequestsFinished producer (secondsToNominalDiffTime 10) :}
Synopsis
- newtype ProducerException = ProducerException Text
- newtype Endpoint = Endpoint Text
- newtype ProcessingThreads = ProcessingThreads Int
- data RequestHandlerType
- newtype Error = Error Text
- newtype Metadata = Metadata Text
- data SourceCredentials = SourceCredentials {}
- newtype MessageId = MessageId Word64
- data DeletionFlags
- data Producer
- data LogLevel
- = LogNone
- | LogError
- | LogInfo
- | LogDebug
- | LogWarning
- newtype FileName = FileName Text
- newtype PipelineStep = PipelineStep Text
- newtype Beamtime = Beamtime Text
- newtype DataSource = DataSource Text
- newtype Beamline = Beamline Text
- data StreamInfo = StreamInfo {}
- data SourceType
- newtype StreamName = StreamName Text
- newtype InstanceId = InstanceId Text
- newtype Token = Token Text
- newtype DatasetSubstream = DatasetSubstream Int
- newtype DatasetSize = DatasetSize Int
- data VersionInfo = VersionInfo {}
- data UpsertMode
- data MetadataIngestMode
- data AutoIdFlag
- data TransferFlag
- data StorageFlag
- data RequestResponse = RequestResponse {}
- data Opcode
- data GenericRequestHeader = GenericRequestHeader {}
- withProducer :: forall a. Endpoint -> ProcessingThreads -> RequestHandlerType -> SourceCredentials -> NominalDiffTime -> (Producer -> IO a) -> IO a
- getVersionInfo :: Producer -> IO VersionInfo
- getStreamInfo :: Producer -> StreamName -> NominalDiffTime -> IO StreamInfo
- getLastStream :: Producer -> NominalDiffTime -> IO StreamInfo
- getBeamtimeMeta :: Producer -> NominalDiffTime -> IO (Maybe Text)
- getStreamMeta :: Producer -> StreamName -> NominalDiffTime -> IO (Maybe Text)
- getRequestsQueueSize :: Producer -> IO Int
- getRequestsQueueVolumeMb :: Producer -> IO Int
- send :: Producer -> MessageId -> FileName -> Metadata -> DatasetSubstream -> DatasetSize -> AutoIdFlag -> ByteString -> TransferFlag -> StorageFlag -> StreamName -> (RequestResponse -> IO ()) -> IO Int
- sendFile :: Producer -> MessageId -> FileName -> Metadata -> DatasetSubstream -> DatasetSize -> AutoIdFlag -> Int -> FileName -> TransferFlag -> StorageFlag -> StreamName -> (RequestResponse -> IO ()) -> IO Int
- sendStreamFinishedFlag :: Producer -> StreamName -> MessageId -> StreamName -> (RequestResponse -> IO ()) -> IO Int
- sendBeamtimeMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> (RequestResponse -> IO ()) -> IO Int
- sendStreamMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> StreamName -> (RequestResponse -> IO ()) -> IO Int
- deleteStream :: Producer -> StreamName -> NominalDiffTime -> [DeletionFlags] -> IO Int
- setLogLevel :: Producer -> LogLevel -> IO ()
- enableLocalLog :: Producer -> Bool -> IO ()
- enableRemoteLog :: Producer -> Bool -> IO ()
- setCredentials :: Producer -> SourceCredentials -> IO Int
- setRequestsQueueLimits :: Producer -> Int -> Int -> IO ()
- messageIdFromInt :: Integral a => a -> MessageId
- waitRequestsFinished :: Producer -> NominalDiffTime -> IO Int
Types
newtype ProducerException Source #
Instances
Exception ProducerException Source # | |
Defined in Asapo.Producer | |
Show ProducerException Source # | |
Defined in Asapo.Producer showsPrec :: Int -> ProducerException -> ShowS # show :: ProducerException -> String # showList :: [ProducerException] -> ShowS # |
Wrapper around an ASAP:O producer endpoint (usually something like "host:port")
newtype ProcessingThreads Source #
Wrapper around the number of ASAP:O processing threads (simply to make call signatures more readable)
Wrapper around an ASAP:O producer error. Note that there is only an error "explanation" here, no error code, since the C interface does not expose this.
data SourceCredentials Source #
data DeletionFlags Source #
DeleteMeta | Delete metadata also |
DeleteErrorOnNotExist | Don't throw an error if the data doesn't exist anyways |
Instances
Eq DeletionFlags Source # | |
Defined in Asapo.Either.Producer (==) :: DeletionFlags -> DeletionFlags -> Bool # (/=) :: DeletionFlags -> DeletionFlags -> Bool # |
Wrapper around a file name (it is dubious to use Text here, but fine for now)
newtype PipelineStep Source #
newtype DataSource Source #
data StreamInfo Source #
Instances
Show StreamInfo Source # | |
Defined in Asapo.Either.Common showsPrec :: Int -> StreamInfo -> ShowS # show :: StreamInfo -> String # showList :: [StreamInfo] -> ShowS # |
newtype StreamName Source #
Instances
Show StreamName Source # | |
Defined in Asapo.Either.Common showsPrec :: Int -> StreamName -> ShowS # show :: StreamName -> String # showList :: [StreamName] -> ShowS # |
newtype InstanceId Source #
data VersionInfo Source #
Instances
Show VersionInfo Source # | |
Defined in Asapo.Either.Producer showsPrec :: Int -> VersionInfo -> ShowS # show :: VersionInfo -> String # showList :: [VersionInfo] -> ShowS # |
data AutoIdFlag Source #
Anti-boolean-blindness for the "auto id" flag in the message header
Instances
Eq AutoIdFlag Source # | |
Defined in Asapo.Either.Producer (==) :: AutoIdFlag -> AutoIdFlag -> Bool # (/=) :: AutoIdFlag -> AutoIdFlag -> Bool # |
data RequestResponse Source #
Information about the request and its response, to be used in the ASAP:O send callback
data GenericRequestHeader Source #
Information about the send request, to be used in the ASAP:O send callback
Initialization
:: forall a. Endpoint | |
-> ProcessingThreads | |
-> RequestHandlerType | |
-> SourceCredentials | |
-> NominalDiffTime | timeout |
-> (Producer -> IO a) | |
-> IO a |
Create a producer and do something with it. This is the main entrypoint into the producer.
Getters
getVersionInfo :: Producer -> IO VersionInfo Source #
Retrieve producer version info
:: Producer | |
-> StreamName | |
-> NominalDiffTime | Timeout |
-> IO StreamInfo |
Retrieve info for a single stream
:: Producer | |
-> NominalDiffTime | Timeout |
-> IO StreamInfo |
Retrieve info for the latest stream
:: Producer | |
-> NominalDiffTime | timeout |
-> IO (Maybe Text) |
Retrieve metadata for the beamtime (which might be missing, in which case Nothing
is returned)
:: Producer | |
-> StreamName | |
-> NominalDiffTime | timeout |
-> IO (Maybe Text) |
Retrieve metadata for the given stream (which might be missing, in which case Nothing
is returned)
getRequestsQueueSize :: Producer -> IO Int Source #
Get current size of the requests queue (number of requests pending/being processed)
getRequestsQueueVolumeMb :: Producer -> IO Int Source #
Get current volume of the requests queue (total memory occupied by requests that are pending or being processed)
Modifiers
:: Producer | |
-> MessageId | |
-> FileName | |
-> Metadata | |
-> DatasetSubstream | |
-> DatasetSize | |
-> AutoIdFlag | |
-> ByteString | Actual data to send |
-> TransferFlag | |
-> StorageFlag | |
-> StreamName | |
-> (RequestResponse -> IO ()) | |
-> IO Int |
Send a message containing raw data. Due to newtype and enum usage, all parameters should be self-explanatory
:: Producer | |
-> MessageId | |
-> FileName | File name to put into the message header |
-> Metadata | |
-> DatasetSubstream | |
-> DatasetSize | |
-> AutoIdFlag | |
-> Int | Size |
-> FileName | File to actually send |
-> TransferFlag | |
-> StorageFlag | |
-> StreamName | |
-> (RequestResponse -> IO ()) | |
-> IO Int |
Send a message containing a file. Due to newtype and enum usage, all parameters should be self-explanatory
sendStreamFinishedFlag :: Producer -> StreamName -> MessageId -> StreamName -> (RequestResponse -> IO ()) -> IO Int Source #
As the title says, send the "stream finished" flag
sendBeamtimeMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> (RequestResponse -> IO ()) -> IO Int Source #
Send or extend beamtime metadata
sendStreamMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> StreamName -> (RequestResponse -> IO ()) -> IO Int Source #
Send or extend stream metadata
:: Producer | |
-> StreamName | |
-> NominalDiffTime | timeout |
-> [DeletionFlags] | |
-> IO Int |
setCredentials :: Producer -> SourceCredentials -> IO Int Source #
Set a different set of credentials
setRequestsQueueLimits Source #
Set maximum size of the requests queue
messageIdFromInt :: Integral a => a -> MessageId Source #
waitRequestsFinished :: Producer -> NominalDiffTime -> IO Int Source #
Wait for all outstanding requests to finish