hs-asapo-0.9.0: Haskell bindings for ASAP:O
Safe HaskellSafe-Inferred
LanguageHaskell98

Asapo.Producer

Description

To implement an ASAP:O producer, you should only need this interface. It exposes no memory-management functions (like free) or pointers, and is thus safe to use.

Simple Example

Here's some code for a simple producer that connects and sends a message with id "1".

>>> :seti -XOverloadedStrings
>>> :{
 import Asapo.Producer
 import Control.Applicative (Applicative ((<*>)))
 import Control.Monad(void)
 import Data.Either (Either (Left, Right))
 import Data.Function (($))
 import Data.Functor ((<$>))
 import Data.Semigroup (Semigroup ((<>)))
 import Data.Text (Text, pack)
 import Data.Text.Encoding (encodeUtf8)
 import qualified Data.Text.IO as TIO
 import Data.Time.Clock (secondsToNominalDiffTime)
 import Data.Word (Word64)
 import System.IO (IO)
 import Text.Show (Show (show))
 import Prelude ()
 -- Example entry point: create a producer, send a single raw-data message
 -- with id 1, then wait for outstanding requests to finish.
 main :: IO ()
 main =
   -- withProducer creates the producer and passes it to the callback below.
   withProducer
     (Endpoint "localhost:8400")
     (ProcessingThreads 1)
     TcpHandler
     ( SourceCredentials
         { sourceType = RawSource,
           instanceId = InstanceId "test_instance",
           pipelineStep = PipelineStep "pipeline_step_1",
           beamtime = Beamtime "asapo_test",
           beamline = Beamline "auto",
           dataSource = DataSource "asapo_source",
           token = Token "sometoken"
         }
     )
     -- Timeout argument of withProducer (10 seconds).
     (secondsToNominalDiffTime 10) $ \producer -> do
       TIO.putStrLn "connected, sending data"
       -- Callback handed to send; it prints the payload and the error
       -- carried by the RequestResponse.
       let responseHandler :: RequestResponse -> IO ()
           responseHandler requestResponse =
             TIO.putStrLn $ "in response handler, payload "
               <> responsePayload requestResponse
               <> ", error "
               <> pack (show (responseError requestResponse))
       -- NOTE(review): send returns an Int that is discarded here; its
       -- meaning is not documented in this module -- confirm before relying
       -- on it.
       send
         producer
         (MessageId 1)
         (FileName "raw/default/1.txt")
         (Metadata "{\"test\": 3.0}")
         (DatasetSubstream 0)
         (DatasetSize 0)
         NoAutoId
         -- Actual data to send, as a ByteString.
         (encodeUtf8 "test")
         DataAndMetadata
         FilesystemAndDatabase
         (StreamName "default")
         responseHandler
       -- Wait for all outstanding requests to finish; the second argument is
       -- presumably the timeout (10 seconds) -- confirm. The Int result is
       -- dropped via void.
       void $ waitRequestsFinished producer (secondsToNominalDiffTime 10)
:}
Synopsis

Types

newtype Endpoint Source #

Wrapper around an ASAP:O producer endpoint (usually something like "host:port")

Constructors

Endpoint Text 

newtype ProcessingThreads Source #

Wrapper around the number of ASAP:O processing threads (simply to make call signatures more readable)

Constructors

ProcessingThreads Int 

data RequestHandlerType Source #

This has no documentation in ASAP:O yet

newtype Error Source #

Wrapper around an ASAP:O producer error. Note that there is only an error "explanation" here, no error code, since the C interface does not expose this.

Constructors

Error Text 

Instances

Instances details
Show Error Source # 
Instance details

Defined in Asapo.Either.Producer

Methods

showsPrec :: Int -> Error -> ShowS #

show :: Error -> String #

showList :: [Error] -> ShowS #

newtype Metadata Source #

Wrapper around metadata to be produced

Constructors

Metadata Text 

newtype MessageId Source #

Constructors

MessageId Word64 

Instances

Instances details
Show MessageId Source # 
Instance details

Defined in Asapo.Either.Common

data DeletionFlags Source #

Constructors

DeleteMeta

Delete metadata also

DeleteErrorOnNotExist

Don't throw an error if the data doesn't exist anyway

Instances

Instances details
Eq DeletionFlags Source # 
Instance details

Defined in Asapo.Either.Producer

data Producer Source #

Opaque wrapper around an ASAP:O producer

data LogLevel Source #

Instances

Instances details
Eq LogLevel Source # 
Instance details

Defined in Asapo.Either.Producer

newtype FileName Source #

Wrapper around file name (dubious to use Text here, but fine for now)

Constructors

FileName Text 

newtype PipelineStep Source #

Constructors

PipelineStep Text 

newtype Beamtime Source #

Constructors

Beamtime Text 

newtype DataSource Source #

Constructors

DataSource Text 

newtype Beamline Source #

Constructors

Beamline Text 

newtype StreamName Source #

Constructors

StreamName Text 

Instances

Instances details
Show StreamName Source # 
Instance details

Defined in Asapo.Either.Common

newtype InstanceId Source #

Constructors

InstanceId Text 

newtype Token Source #

Constructors

Token Text 

newtype DatasetSubstream Source #

Wrapper around the substream to use

Constructors

DatasetSubstream Int 

newtype DatasetSize Source #

Wrapper around the dataset size to use

Constructors

DatasetSize Int 

data VersionInfo Source #

Instances

Instances details
Show VersionInfo Source # 
Instance details

Defined in Asapo.Either.Producer

data UpsertMode Source #

Constructors

UseUpsert 
NoUpsert 

data AutoIdFlag Source #

Anti-boolean-blindness for the "auto id" flag in the message header

Constructors

UseAutoId 
NoAutoId 

Instances

Instances details
Eq AutoIdFlag Source # 
Instance details

Defined in Asapo.Either.Producer

data TransferFlag Source #

Which data to transfer

data StorageFlag Source #

Where to store the data

data RequestResponse Source #

Information about the request and its response, to be used in the ASAP:O send callback

Initialization

withProducer Source #

Arguments

:: forall a. Endpoint 
-> ProcessingThreads 
-> RequestHandlerType 
-> SourceCredentials 
-> NominalDiffTime

timeout

-> (Producer -> IO a) 
-> IO a 

Create a producer and do something with it. This is the main entrypoint into the producer.

Getters

getVersionInfo :: Producer -> IO VersionInfo Source #

Retrieve producer version info

getStreamInfo Source #

Arguments

:: Producer 
-> StreamName 
-> NominalDiffTime

Timeout

-> IO StreamInfo 

Retrieve info for a single stream

getLastStream Source #

Arguments

:: Producer 
-> NominalDiffTime

Timeout

-> IO StreamInfo 

Retrieve info for the latest stream

getBeamtimeMeta Source #

Arguments

:: Producer 
-> NominalDiffTime

timeout

-> IO (Maybe Text) 

Retrieve metadata for the beamtime (which might be missing, in which case Nothing is returned)

getStreamMeta Source #

Arguments

:: Producer 
-> StreamName 
-> NominalDiffTime

timeout

-> IO (Maybe Text) 

Retrieve metadata for the given stream (which might be missing, in which case Nothing is returned)

getRequestsQueueSize :: Producer -> IO Int Source #

Get current size of the requests queue (number of requests pending/being processed)

getRequestsQueueVolumeMb :: Producer -> IO Int Source #

Get current volume of the requests queue (total memory occupied by pending/being processed requests)

Modifiers

send Source #

Arguments

:: Producer 
-> MessageId 
-> FileName 
-> Metadata 
-> DatasetSubstream 
-> DatasetSize 
-> AutoIdFlag 
-> ByteString

Actual data to send

-> TransferFlag 
-> StorageFlag 
-> StreamName 
-> (RequestResponse -> IO ()) 
-> IO Int 

Send a message containing raw data. Due to newtype and enum usage, all parameters should be self-explanatory

sendFile Source #

Arguments

:: Producer 
-> MessageId 
-> FileName

File name to put into the message header

-> Metadata 
-> DatasetSubstream 
-> DatasetSize 
-> AutoIdFlag 
-> Int

Size

-> FileName

File to actually send

-> TransferFlag 
-> StorageFlag 
-> StreamName 
-> (RequestResponse -> IO ()) 
-> IO Int 

Send a message containing a file. Due to newtype and enum usage, all parameters should be self-explanatory

sendStreamFinishedFlag :: Producer -> StreamName -> MessageId -> StreamName -> (RequestResponse -> IO ()) -> IO Int Source #

As the title says, send the "stream finished" flag

sendBeamtimeMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> (RequestResponse -> IO ()) -> IO Int Source #

Send or extend beamtime metadata

sendStreamMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> StreamName -> (RequestResponse -> IO ()) -> IO Int Source #

Send or extend stream metadata

setLogLevel :: Producer -> LogLevel -> IO () Source #

Set the log level

enableLocalLog :: Producer -> Bool -> IO () Source #

Enable/Disable logging to stdout

enableRemoteLog :: Producer -> Bool -> IO () Source #

Enable/Disable logging to the central server

setCredentials :: Producer -> SourceCredentials -> IO Int Source #

Set a different set of credentials

setRequestsQueueLimits Source #

Arguments

:: Producer 
-> Int

Size (0 for unlimited)

-> Int

Volume (in MiB; 0 for unlimited)

-> IO () 

Set maximum size of the requests queue

waitRequestsFinished :: Producer -> NominalDiffTime -> IO Int Source #

Wait for all outstanding requests to finish