{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- |
-- Description : High-level interface for all producer-related functions
--
-- To implement an ASAP:O producer, you should only need this interface.
-- It exposes no memory-management functions (like free) or pointers, and
-- is thus safe to use.
module Asapo.Either.Producer
  ( Endpoint (..),
    ProcessingThreads (..),
    RequestHandlerType (..),
    Error (..),
    Metadata (..),
    SourceCredentials,
    DeletionFlags (..),
    Producer,
    LogLevel (..),
    FileName (..),
    DatasetSubstream (..),
    DatasetSize (..),
    VersionInfo (..),
    UpsertMode (..),
    MetadataIngestMode (..),
    AutoIdFlag (..),
    TransferFlag (..),
    StorageFlag (..),
    RequestResponse (..),
    Opcode (..),
    GenericRequestHeader (..),
    getRequestsQueueSize,
    getRequestsQueueVolumeMb,
    setRequestsQueueLimits,
    checkError,
    checkErrorWithGivenHandle,
    withProducer,
    enableLocalLog,
    waitRequestsFinished,
    getVersionInfo,
    getStreamInfo,
    getStreamMeta,
    getBeamtimeMeta,
    deleteStream,
    getLastStream,
    send,
    sendFile,
    sendStreamFinishedFlag,
    sendBeamtimeMetadata,
    sendStreamMetadata,
    setLogLevel,
    enableRemoteLog,
    setCredentials,
  )
where

import Asapo.Either.Common (MessageId (MessageId), SourceCredentials, StreamInfo, StreamName (StreamName), nominalDiffToMillis, peekCStringText, retrieveStreamInfoFromC, stringHandleToText, stringHandleToTextUnsafe, withCStringNToText, withConstText, withCredentials, withPtr, withText)
import Asapo.Raw.Common (AsapoErrorHandle, AsapoStreamInfoHandle, AsapoStringHandle, asapo_error_explain, asapo_free_error_handle, asapo_free_stream_info_handle, asapo_free_string_handle, asapo_is_error, asapo_new_error_handle, asapo_new_string_handle)
import Asapo.Raw.Producer
  ( AsapoGenericRequestHeader (AsapoGenericRequestHeader),
    AsapoLogLevel,
    AsapoMessageHeaderHandle,
    AsapoOpcode,
    AsapoProducerHandle,
    AsapoRequestCallbackPayloadHandle,
    asapoLogLevelDebug,
    asapoLogLevelError,
    asapoLogLevelInfo,
    asapoLogLevelNone,
    asapoLogLevelWarning,
    asapo_create_message_header,
    asapo_create_producer,
    asapo_free_message_header_handle,
    asapo_free_producer_handle,
    asapo_producer_delete_stream,
    asapo_producer_enable_local_log,
    asapo_producer_enable_remote_log,
    asapo_producer_get_beamtime_meta,
    asapo_producer_get_last_stream,
    asapo_producer_get_requests_queue_size,
    asapo_producer_get_requests_queue_volume_mb,
    asapo_producer_get_stream_info,
    asapo_producer_get_stream_meta,
    asapo_producer_get_version_info,
    asapo_producer_send,
    asapo_producer_send_beamtime_metadata,
    asapo_producer_send_file,
    asapo_producer_send_stream_finished_flag,
    asapo_producer_send_stream_metadata,
    asapo_producer_set_credentials,
    asapo_producer_set_log_level,
    asapo_producer_set_requests_queue_limits,
    asapo_producer_wait_requests_finished,
    asapo_request_callback_payload_get_original_header,
    asapo_request_callback_payload_get_response,
    createRequestCallback,
    kFilesystem,
    kInsert,
    kOpcodeAuthorize,
    kOpcodeCount,
    kOpcodeDeleteStream,
    kOpcodeGetBufferData,
    kOpcodeGetMeta,
    kOpcodeLastStream,
    kOpcodeStreamInfo,
    kOpcodeTransferData,
    kOpcodeTransferDatasetData,
    kOpcodeTransferMetaData,
    kOpcodeUnknownOp,
    kReplace,
    kStoreInDatabase,
    kStoreInFilesystem,
    kTcp,
    kTransferData,
    kTransferMetaDataOnly,
    kUpdate,
  )
import Control.Applicative (Applicative (pure))
import Control.Exception (bracket)
import Data.Bits ((.|.))
import Data.Bool (Bool)
import qualified Data.ByteString as BS
import Data.ByteString.Unsafe (unsafeUseAsCString)
import Data.Either (Either (Left, Right))
import Data.Eq (Eq ((==)))
import Data.Foldable (Foldable (elem))
import Data.Functor ((<$>))
import Data.Int (Int)
import Data.Maybe (Maybe (Just, Nothing))
import Data.Ord ((>))
import Data.Text (Text)
import Data.Time (NominalDiffTime)
import Data.Word (Word64)
import Foreign (Storable (peek), alloca, castPtr)
import Foreign.C.ConstPtr (ConstPtr (unConstPtr))
import Foreign.Ptr (Ptr)
import System.IO (IO)
import Text.Show (Show)
import Prelude (fromIntegral)

-- | Wrapper around an ASAP:O producer error. Note that there is only
--  an error "explanation" here, no error code, since the C interface
--  does not expose this.
newtype Error = Error Text deriving (Show)

-- | Wrapper around an ASAP:O producer endpoint (usually something like "host:port").
-- The text is handed to the C producer constructor as a C string.
newtype Endpoint = Endpoint Text

-- | Wrapper around the number of ASAP:O processing threads (simply to make call signatures more readable).
-- When the producer is created, the value is converted with 'fromIntegral' to the
-- C unsigned char the raw constructor expects, so values above 255 do not survive the conversion.
newtype ProcessingThreads = ProcessingThreads Int

-- | This has no documentation in ASAP:O yet.
-- When the producer is created, 'TcpHandler' is mapped to the raw constant @kTcp@
-- and 'FilesystemHandler' to @kFilesystem@.
data RequestHandlerType = TcpHandler | FilesystemHandler

-- | Opaque wrapper around an ASAP:O producer (the raw producer handle).
-- Values are handed out by 'withProducer'.
newtype Producer = Producer AsapoProducerHandle

-- | Internal function to check and return either an error (if it's
-- present behind the given handle) or a "result" of some function
-- that produced the error handle.
--
-- The handle is queried with @asapo_is_error@. If that reports an error,
-- the explanation is fetched with @asapo_error_explain@ and returned as
-- @Left (Error explanation)@; otherwise the given result is returned
-- unchanged as @Right result@.
checkErrorWithGivenHandle :: AsapoErrorHandle -> b -> IO (Either Error b)
checkErrorWithGivenHandle errorHandle result = do
  isError <- asapo_is_error errorHandle
  if isError > 0
    then do
      -- Fixed size of the buffer the C side writes the explanation into.
      -- NOTE(review): longer explanations are presumably truncated by the C call - confirm.
      let explanationLength :: Int
          explanationLength = 1024
      explanation <- withCStringNToText explanationLength \explanationPtr ->
        asapo_error_explain
          errorHandle
          explanationPtr
          (fromIntegral explanationLength)
      pure (Left (Error explanation))
    else pure (Right result)

-- | Allocate a fresh error handle with @asapo_new_error_handle@, run the
-- given action with it, and release it with @asapo_free_error_handle@
-- afterwards (also when the action throws, courtesy of 'bracket').
withErrorHandle :: (AsapoErrorHandle -> IO c) -> IO c
withErrorHandle = bracket asapo_new_error_handle asapo_free_error_handle

-- | Helper function since most ASAP:O functions receive a pointer to
-- an error handle as the last argument, so error checking becomes
-- "abstractable".
--
-- A fresh error handle is allocated, the given function is run with a
-- pointer to it, and the handle value read back afterwards is inspected:
-- the result is @Left@ with the error explanation if an error was set,
-- otherwise @Right@ with the function's return value.
checkError :: (Ptr AsapoErrorHandle -> IO b) -> IO (Either Error b)
checkError f =
  withErrorHandle \errorHandle -> do
    -- 'withPtr' returns the handle value as it is after the call, together
    -- with the call's own result.
    -- NOTE(review): the surrounding bracket releases the handle that was
    -- originally allocated, not the value read back here - confirm the C
    -- API never replaces the handle behind the pointer.
    (handleAfterCall, result) <- withPtr errorHandle f
    checkErrorWithGivenHandle handleAfterCall result

-- | Internal function creating a raw producer handle via
-- @asapo_create_producer@. The caller owns the returned handle and is
-- responsible for freeing it (see 'withProducer').
--
-- The timeout is converted to milliseconds; the thread count is narrowed
-- with 'fromIntegral' to the C unsigned char the raw function expects.
create :: Endpoint -> ProcessingThreads -> RequestHandlerType -> SourceCredentials -> NominalDiffTime -> IO (Either Error AsapoProducerHandle)
create (Endpoint endpoint) (ProcessingThreads processingThreads) handlerType sourceCredentials timeout =
  withCredentials sourceCredentials \credentials' ->
    withText endpoint \endpoint' ->
      checkError
        ( asapo_create_producer
            endpoint'
            (fromIntegral processingThreads)
            (convertHandlerType handlerType)
            credentials'
            (nominalDiffToMillis timeout)
        )
  where
    -- Map the high-level handler type onto the raw C constant.
    convertHandlerType TcpHandler = kTcp
    convertHandlerType FilesystemHandler = kFilesystem

-- | Create a producer and do something with it. This is the main entrypoint into the producer.
--
-- If creation fails, the error callback is run with the error. Otherwise the
-- success callback is run with the producer, and the underlying handle is
-- freed with @asapo_free_producer_handle@ once the callback returns or throws.
withProducer ::
  forall a.
  Endpoint ->
  ProcessingThreads ->
  RequestHandlerType ->
  SourceCredentials ->
  -- | timeout
  NominalDiffTime ->
  -- | called when the producer could not be created
  (Error -> IO a) ->
  -- | called with the ready-to-use producer
  (Producer -> IO a) ->
  IO a
withProducer endpoint processingThreads handlerType sourceCredentials timeout onError onSuccess =
  bracket
    (create endpoint processingThreads handlerType sourceCredentials timeout)
    freeProducer
    handle
  where
    -- Only a successfully created producer has a handle to release.
    freeProducer :: Either Error AsapoProducerHandle -> IO ()
    freeProducer (Left _) = pure ()
    freeProducer (Right producerHandle) = asapo_free_producer_handle producerHandle
    -- Dispatch to the user's callbacks depending on the creation outcome.
    handle :: Either Error AsapoProducerHandle -> IO a
    handle (Left e) = onError e
    handle (Right v) = onSuccess (Producer v)

-- | Allocate a fresh string handle with @asapo_new_string_handle@, run the
-- given action with it, and release it with @asapo_free_string_handle@
-- afterwards (also when the action throws, courtesy of 'bracket').
withStringHandle :: (AsapoStringHandle -> IO c) -> IO c
withStringHandle = bracket asapo_new_string_handle asapo_free_string_handle

-- | Version information as returned by 'getVersionInfo'
data VersionInfo = VersionInfo
  { -- | Client version string
    versionClient :: Text,
    -- | Server version string
    versionServer :: Text,
    -- | Whether the "supported" flag reported by the C call was set
    versionSupported :: Bool
  }
  deriving (Show)

-- | Retrieve producer version info.
--
-- Two temporary string handles (client and server info) and a temporary
-- flag are filled in by @asapo_producer_get_version_info@ and converted
-- into a 'VersionInfo'. Returns @Left@ if the C call reports an error.
getVersionInfo :: Producer -> IO (Either Error VersionInfo)
getVersionInfo (Producer producerHandle) =
  withStringHandle \clientInfo ->
    withStringHandle \serverInfo ->
      alloca \supportedPtr -> do
        result <- checkError (asapo_producer_get_version_info producerHandle clientInfo serverInfo supportedPtr)
        case result of
          Left e -> pure (Left e)
          -- The return value is a CInt which is unnecessary probably?
          Right _integerReturnCode -> do
            supported <- peek supportedPtr
            clientInfo' <- stringHandleToTextUnsafe clientInfo
            serverInfo' <- stringHandleToTextUnsafe serverInfo
            -- Any positive flag value counts as "supported".
            pure (Right (VersionInfo clientInfo' serverInfo' (supported > 0)))

-- | Retrieve info for a single stream.
--
-- The raw stream info handle obtained from @asapo_producer_get_stream_info@
-- is converted into a 'StreamInfo' and freed again before returning.
getStreamInfo ::
  Producer ->
  StreamName ->
  -- | Timeout
  NominalDiffTime ->
  IO (Either Error StreamInfo)
getStreamInfo (Producer producer) (StreamName stream) timeout = bracket init destroy f
  where
    -- Acquire: ask the C library for the stream info handle.
    init :: IO (Either Error AsapoStreamInfoHandle)
    init = withConstText stream \streamC ->
      checkError (asapo_producer_get_stream_info producer streamC (nominalDiffToMillis timeout))
    -- Release: only a successful call yields a handle to free.
    destroy :: Either Error AsapoStreamInfoHandle -> IO ()
    destroy (Right handle) = asapo_free_stream_info_handle handle
    destroy _ = pure ()
    -- Use: pass errors through, otherwise read the info out of the handle.
    f :: Either Error AsapoStreamInfoHandle -> IO (Either Error StreamInfo)
    f (Left e) = pure (Left e)
    f (Right streamInfoHandle) = Right <$> retrieveStreamInfoFromC streamInfoHandle

-- | Retrieve info for the latest stream.
--
-- The raw stream info handle obtained from @asapo_producer_get_last_stream@
-- is converted into a 'StreamInfo' and freed again before returning.
getLastStream ::
  Producer ->
  -- | Timeout
  NominalDiffTime ->
  IO (Either Error StreamInfo)
getLastStream (Producer producer) timeout = bracket init destroy f
  where
    -- Acquire: ask the C library for the stream info handle.
    init :: IO (Either Error AsapoStreamInfoHandle)
    init = checkError (asapo_producer_get_last_stream producer (nominalDiffToMillis timeout))
    -- Release: only a successful call yields a handle to free.
    destroy :: Either Error AsapoStreamInfoHandle -> IO ()
    destroy (Right handle) = asapo_free_stream_info_handle handle
    destroy _ = pure ()
    -- Use: pass errors through, otherwise read the info out of the handle.
    f :: Either Error AsapoStreamInfoHandle -> IO (Either Error StreamInfo)
    f (Left e) = pure (Left e)
    f (Right streamInfoHandle) = Right <$> retrieveStreamInfoFromC streamInfoHandle

-- | Retrieve metadata for the given stream (which might be missing, in which case @Nothing@ is returned).
--
-- The raw string handle obtained from @asapo_producer_get_stream_meta@ is
-- converted to text and freed again before returning.
getStreamMeta ::
  Producer ->
  StreamName ->
  -- | timeout
  NominalDiffTime ->
  IO (Either Error (Maybe Text))
getStreamMeta (Producer producer) (StreamName stream) timeout = bracket init destroy f
  where
    -- Acquire: ask the C library for the metadata string handle.
    init :: IO (Either Error AsapoStringHandle)
    init = withConstText stream \streamC ->
      checkError (asapo_producer_get_stream_meta producer streamC (nominalDiffToMillis timeout))
    -- Release: only a successful call yields a handle to free.
    destroy :: Either Error AsapoStringHandle -> IO ()
    destroy (Right handle) = asapo_free_string_handle handle
    destroy _ = pure ()
    -- Use: pass errors through, otherwise convert the handle's content.
    f :: Either Error AsapoStringHandle -> IO (Either Error (Maybe Text))
    f (Left e) = pure (Left e)
    f (Right string) = Right <$> stringHandleToText string

-- | Retrieve the beamtime metadata (which might be missing, in which case @Nothing@ is returned).
--
-- The raw string handle obtained from @asapo_producer_get_beamtime_meta@ is
-- converted to text and freed again before returning.
getBeamtimeMeta ::
  Producer ->
  -- | timeout
  NominalDiffTime ->
  IO (Either Error (Maybe Text))
getBeamtimeMeta (Producer producer) timeout = bracket init destroy f
  where
    -- Acquire: ask the C library for the metadata string handle.
    init :: IO (Either Error AsapoStringHandle)
    init = checkError (asapo_producer_get_beamtime_meta producer (nominalDiffToMillis timeout))
    -- Release: only a successful call yields a handle to free.
    destroy :: Either Error AsapoStringHandle -> IO ()
    destroy (Right handle) = asapo_free_string_handle handle
    destroy _ = pure ()
    -- Use: pass errors through, otherwise convert the handle's content.
    f :: Either Error AsapoStringHandle -> IO (Either Error (Maybe Text))
    f (Left e) = pure (Left e)
    f (Right string) = Right <$> stringHandleToText string

-- | Options for 'deleteStream'. Each flag that is present in the list is
-- passed to the C call as 1, each absent flag as 0.
data DeletionFlags
  = -- | Delete metadata also
    DeleteMeta
  | -- | Report an error if the stream to delete does not exist
    DeleteErrorOnNotExist
  deriving (Eq)

-- | Delete the given stream.
--
-- On success, the integer returned by @asapo_producer_delete_stream@ is
-- passed through (converted to 'Int').
deleteStream ::
  Producer ->
  StreamName ->
  -- | timeout
  NominalDiffTime ->
  -- | flags to enable; a flag missing from the list is disabled
  [DeletionFlags] ->
  IO (Either Error Int)
deleteStream (Producer producer) (StreamName stream) timeout deletionFlags = do
  result <- withConstText stream \streamC ->
    checkError
      ( asapo_producer_delete_stream
          producer
          streamC
          (nominalDiffToMillis timeout)
          (if DeleteMeta `elem` deletionFlags then 1 else 0)
          (if DeleteErrorOnNotExist `elem` deletionFlags then 1 else 0)
      )
  pure (fromIntegral <$> result)

-- | Wrapper around file name (dubious to use @Text@ here, but fine for now).
-- Passed to the C library as a C string when a message header is created.
newtype FileName = FileName Text

-- | Wrapper around metadata to be produced.
-- Passed to the C library as a C string when a message header is created.
newtype Metadata = Metadata Text

-- | Wrapper around the substream to use.
-- Converted to a 64-bit unsigned integer when a message header is created.
newtype DatasetSubstream = DatasetSubstream Int

-- | Wrapper around the dataset size to use.
-- Converted to a 64-bit unsigned integer when a message header is created.
newtype DatasetSize = DatasetSize Int

-- | Anti-boolean-blindness for the "auto id" flag in the message header.
-- 'UseAutoId' is passed to the C library as 1, 'NoAutoId' as 0.
data AutoIdFlag = UseAutoId | NoAutoId deriving (Eq)

-- | Which data to transfer.
-- 'DataAndMetadata' corresponds to the raw constant @kTransferData@,
-- 'MetadataOnly' to @kTransferMetaDataOnly@.
data TransferFlag = DataAndMetadata | MetadataOnly

-- | Where to store the data.
-- 'Filesystem' corresponds to the raw constant @kStoreInFilesystem@,
-- 'Database' to @kStoreInDatabase@, and 'FilesystemAndDatabase' to the
-- bitwise OR of both.
data StorageFlag = Filesystem | Database | FilesystemAndDatabase

-- | Internal function combining the transfer and storage choice into the
-- single flag word expected by the raw send functions: the two raw
-- constants are widened to 'Word64' and OR-ed together.
convertSendFlags :: TransferFlag -> StorageFlag -> Word64
convertSendFlags tf sf = convertTransferFlag tf .|. convertStorageFlag sf
  where
    convertTransferFlag :: TransferFlag -> Word64
    convertTransferFlag DataAndMetadata = fromIntegral kTransferData
    convertTransferFlag MetadataOnly = fromIntegral kTransferMetaDataOnly
    convertStorageFlag :: StorageFlag -> Word64
    convertStorageFlag Filesystem = fromIntegral kStoreInFilesystem
    convertStorageFlag Database = fromIntegral kStoreInDatabase
    convertStorageFlag FilesystemAndDatabase = fromIntegral kStoreInDatabase .|. fromIntegral kStoreInFilesystem

-- | Internal function to create a message header handle, which gets used to send data.
--
-- The handle is created with @asapo_create_message_header@, passed to the
-- given action, and freed with @asapo_free_message_header_handle@
-- afterwards (also when the action throws, courtesy of 'bracket').
withMessageHeaderHandle ::
  MessageId ->
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  -- | size of the data to be sent
  Int ->
  (AsapoMessageHeaderHandle -> IO b) ->
  IO b
withMessageHeaderHandle
  (MessageId messageId)
  (FileName fileName)
  (Metadata metadata)
  (DatasetSubstream datasetSubstream)
  (DatasetSize datasetSize)
  autoIdFlag
  dataSize = bracket init destroy
    where
      -- Acquire: marshal both texts to C strings and build the header.
      init :: IO AsapoMessageHeaderHandle
      init = withConstText fileName \fileNameC ->
        withConstText metadata \metadataC ->
          asapo_create_message_header
            messageId
            (fromIntegral dataSize)
            fileNameC
            metadataC
            (fromIntegral datasetSubstream)
            (fromIntegral datasetSize)
            (if autoIdFlag == UseAutoId then 1 else 0)
      -- Release: hand the header back to the C library.
      destroy :: AsapoMessageHeaderHandle -> IO ()
      destroy = asapo_free_message_header_handle

-- | Operation code of a request, as found in a 'GenericRequestHeader'.
--
-- This is the Haskell-side counterpart of the raw @AsapoOpcode@ value; the
-- translation from the raw value is done by @convertOpcode@.
data Opcode
  = OpcodeUnknownOp
  | OpcodeTransferData
  | OpcodeTransferDatasetData
  | OpcodeStreamInfo
  | OpcodeLastStream
  | OpcodeGetBufferData
  | OpcodeAuthorize
  | OpcodeTransferMetaData
  | OpcodeDeleteStream
  | OpcodeGetMeta
  | OpcodeCount
  | OpcodePersistStream

-- | Translate a raw opcode into its 'Opcode' constructor.
--
-- The raw value is compared against the known @kOpcode…@ constants in order.
--
-- NOTE(review): every value that matches none of the listed constants is
-- reported as 'OpcodePersistStream', including values that are not a valid
-- opcode at all. Confirm that this fallback is intended.
convertOpcode :: AsapoOpcode -> Opcode
convertOpcode rawOpcode
  | rawOpcode == kOpcodeUnknownOp = OpcodeUnknownOp
  | rawOpcode == kOpcodeTransferData = OpcodeTransferData
  | rawOpcode == kOpcodeTransferDatasetData = OpcodeTransferDatasetData
  | rawOpcode == kOpcodeStreamInfo = OpcodeStreamInfo
  | rawOpcode == kOpcodeLastStream = OpcodeLastStream
  | rawOpcode == kOpcodeGetBufferData = OpcodeGetBufferData
  | rawOpcode == kOpcodeAuthorize = OpcodeAuthorize
  | rawOpcode == kOpcodeTransferMetaData = OpcodeTransferMetaData
  | rawOpcode == kOpcodeDeleteStream = OpcodeDeleteStream
  | rawOpcode == kOpcodeGetMeta = OpcodeGetMeta
  | rawOpcode == kOpcodeCount = OpcodeCount
  | otherwise = OpcodePersistStream

-- | Information about the send request, to be used in the ASAP:O send callback
data GenericRequestHeader = GenericRequestHeader
  { -- | Operation code of the request
    genericRequestHeaderOpCode :: Opcode,
    -- | Data ID of the request
    genericRequestHeaderDataId :: Int,
    -- | Data size of the request
    genericRequestHeaderDataSize :: Int,
    -- | Metadata size of the request
    genericRequestHeaderMetaSize :: Int,
    -- | Custom data values of the request
    genericRequestHeaderCustomData :: [Int],
    -- | Message of the request, as raw bytes
    genericRequestHeaderMessage :: BS.ByteString,
    -- | Stream of the request
    genericRequestHeaderStream :: Text,
    -- | API version of the request
    genericRequestHeaderApiVersion :: Text
  }

-- | Convert the raw request header into a 'GenericRequestHeader'.
--
-- Runs in 'IO' because the stream name, API version and message are read from
-- C strings. All numeric fields are converted with 'fromIntegral'.
convertRequestHeader :: AsapoGenericRequestHeader -> IO GenericRequestHeader
convertRequestHeader
  ( AsapoGenericRequestHeader
      rawOpcode
      rawDataId
      rawDataSize
      rawMetaSize
      rawCustomData
      rawMessage
      rawStream
      rawApiVersion
    ) = do
    streamName <- peekCStringText rawStream
    apiVersion <- peekCStringText rawApiVersion
    -- NOTE(review): the message is copied using the header's data size as its
    -- length. Confirm that the message buffer really holds that many bytes;
    -- if it is a fixed-size field this can read past its end.
    messageBytes <- BS.packCStringLen (rawMessage, fromIntegral rawDataSize)
    pure
      ( GenericRequestHeader
          { genericRequestHeaderOpCode = convertOpcode rawOpcode,
            genericRequestHeaderDataId = fromIntegral rawDataId,
            genericRequestHeaderDataSize = fromIntegral rawDataSize,
            genericRequestHeaderMetaSize = fromIntegral rawMetaSize,
            genericRequestHeaderCustomData = map fromIntegral rawCustomData,
            genericRequestHeaderMessage = messageBytes,
            genericRequestHeaderStream = streamName,
            genericRequestHeaderApiVersion = apiVersion
          }
      )

-- | Information about the request and its response, to be used in the ASAP:O send callback
data RequestResponse = RequestResponse
  { -- | Response text taken from the callback payload
    responsePayload :: Text,
    -- | Header of the request this response belongs to
    responseOriginalRequestHeader :: GenericRequestHeader,
    -- | The error reported for the request, or 'Nothing' if there was none
    responseError :: Maybe Error
  }

-- | Adapter between the raw request callback and a callback that takes a
-- 'RequestResponse'.
--
-- Reads the response text and the original request header from the payload
-- handle, checks the error handle, and then invokes the user callback exactly
-- once with the collected information. The opaque user-data pointer is
-- ignored.
sendRequestCallback :: (RequestResponse -> IO ()) -> Ptr () -> AsapoRequestCallbackPayloadHandle -> AsapoErrorHandle -> IO ()
sendRequestCallback simpleCallback _data payloadHandle errorHandle = do
  -- The response string handle is freed again as soon as it has been read.
  payloadText <-
    bracket
      (asapo_request_callback_payload_get_response payloadHandle)
      asapo_free_string_handle
      stringHandleToTextUnsafe
  rawHeaderPtr <- asapo_request_callback_payload_get_original_header payloadHandle
  rawHeader <- peek (unConstPtr rawHeaderPtr)
  originalHeader <- convertRequestHeader rawHeader
  errorCheck <- checkErrorWithGivenHandle errorHandle ()
  let respond maybeError =
        simpleCallback (RequestResponse payloadText originalHeader maybeError)
  either (respond . Just) (const (respond Nothing)) errorCheck

-- | Send a message containing raw data. Due to newtype and enum usage, all parameters should be self-explanatory.
--
-- The size written into the message header is the length of the given
-- 'BS.ByteString'. The result is whatever 'checkError' reports for the call to
-- @asapo_producer_send@, with the returned number converted to 'Int'.
send ::
  Producer ->
  MessageId ->
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  BS.ByteString ->
  TransferFlag ->
  StorageFlag ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO (Either Error Int)
send (Producer producer) messageId fileName metadata datasetSubstream datasetSize autoIdFlag data' transferFlag storageFlag (StreamName stream) callback =
  withMessageHeaderHandle messageId fileName metadata datasetSubstream datasetSize autoIdFlag (BS.length data') \headerHandle ->
    -- NOTE(review): the pointer from unsafeUseAsCString is only valid inside
    -- this continuation. Confirm that asapo_producer_send does not keep using
    -- it after it has returned.
    unsafeUseAsCString data' \payloadPtr ->
      withConstText stream \streamC -> do
        -- NOTE(review): nothing in this function releases the FunPtr created
        -- here. Confirm that it is freed elsewhere.
        requestCallback <- createRequestCallback (sendRequestCallback callback)
        sendResult <-
          checkError
            ( asapo_producer_send
                producer
                headerHandle
                (castPtr payloadPtr)
                (convertSendFlags transferFlag storageFlag)
                streamC
                requestCallback
            )
        pure (fromIntegral <$> sendResult)

-- | Send a message containing a file. Due to newtype and enum usage, all parameters should be self-explanatory.
--
-- The result is whatever 'checkError' reports for the call to
-- @asapo_producer_send_file@, with the returned number converted to 'Int'.
sendFile ::
  Producer ->
  MessageId ->
  -- | File name to put into the message header
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  -- | Size
  Int ->
  -- | File to actually send
  FileName ->
  TransferFlag ->
  StorageFlag ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO (Either Error Int)
sendFile (Producer producer) messageId fileName meta datasetSubstream datasetSize autoIdFlag size (FileName fileNameToSend) transferFlag storageFlag (StreamName stream) callback =
  withMessageHeaderHandle messageId fileName meta datasetSubstream datasetSize autoIdFlag size \headerHandle ->
    withConstText fileNameToSend \fileNameToSendC ->
      withConstText stream \streamC -> do
        -- NOTE(review): nothing in this function releases the FunPtr created
        -- here. Confirm that it is freed elsewhere.
        requestCallback <- createRequestCallback (sendRequestCallback callback)
        sendResult <-
          checkError
            ( asapo_producer_send_file
                producer
                headerHandle
                fileNameToSendC
                (convertSendFlags transferFlag storageFlag)
                streamC
                requestCallback
            )
        pure (fromIntegral <$> sendResult)

-- | Send the "stream finished" flag for a stream.
--
-- Takes the stream to finish, the ID of its last message and the name of the
-- next stream. The result is whatever 'checkError' reports for the call to
-- @asapo_producer_send_stream_finished_flag@, with the returned number
-- converted to 'Int'.
sendStreamFinishedFlag :: Producer -> StreamName -> MessageId -> StreamName -> (RequestResponse -> IO ()) -> IO (Either Error Int)
sendStreamFinishedFlag (Producer producer) (StreamName stream) (MessageId lastId) (StreamName nextStream) callback = do
  -- NOTE(review): nothing in this function releases the FunPtr created here.
  -- Confirm that it is freed elsewhere.
  requestCallback <- createRequestCallback (sendRequestCallback callback)
  withConstText stream \streamC ->
    withConstText nextStream \nextStreamC -> do
      sendResult <-
        checkError
          ( asapo_producer_send_stream_finished_flag
              producer
              streamC
              lastId
              nextStreamC
              requestCallback
          )
      pure (fromIntegral <$> sendResult)

-- | How metadata sent with 'sendBeamtimeMetadata' or 'sendStreamMetadata' is
-- ingested. The constructors are passed to the C API as the raw constants
-- @kInsert@, @kReplace@ and @kUpdate@ respectively.
data MetadataIngestMode = Insert | Replace | Update

-- | Upsert switch for 'sendBeamtimeMetadata' and 'sendStreamMetadata'.
-- 'UseUpsert' is passed to the C API as 1, 'NoUpsert' as 0.
data UpsertMode = UseUpsert | NoUpsert

-- | Send or extend beamtime metadata
--
-- The result is whatever 'checkError' reports for the call to
-- @asapo_producer_send_beamtime_metadata@, with the returned number converted
-- to 'Int'.
sendBeamtimeMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> (RequestResponse -> IO ()) -> IO (Either Error Int)
sendBeamtimeMetadata (Producer producer) (Metadata metadata) ingestMode upsertMode callback = do
  -- NOTE(review): nothing in this function releases the FunPtr created here.
  -- Confirm that it is freed elsewhere.
  requestCallback <- createRequestCallback (sendRequestCallback callback)
  sendResult <- withConstText metadata \metadataC ->
    checkError
      ( asapo_producer_send_beamtime_metadata
          producer
          metadataC
          ingestModeC
          upsertC
          requestCallback
      )
  pure (fromIntegral <$> sendResult)
  where
    -- Raw constant for the requested ingest mode.
    ingestModeC = case ingestMode of
      Insert -> kInsert
      Replace -> kReplace
      Update -> kUpdate

    -- The C API takes the upsert switch as an integer: 1 = on, 0 = off.
    upsertC = case upsertMode of
      UseUpsert -> 1
      NoUpsert -> 0

-- | Send or extend stream metadata
--
-- The result is whatever 'checkError' reports for the call to
-- @asapo_producer_send_stream_metadata@, with the returned number converted to
-- 'Int'.
sendStreamMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> StreamName -> (RequestResponse -> IO ()) -> IO (Either Error Int)
sendStreamMetadata (Producer producer) (Metadata metadata) ingestMode upsertMode (StreamName stream) callback = do
  -- NOTE(review): nothing in this function releases the FunPtr created here.
  -- Confirm that it is freed elsewhere.
  requestCallback <- createRequestCallback (sendRequestCallback callback)
  sendResult <- withConstText metadata \metadataC ->
    withConstText stream \streamC ->
      checkError
        ( asapo_producer_send_stream_metadata
            producer
            metadataC
            ingestModeC
            upsertC
            streamC
            requestCallback
        )
  pure (fromIntegral <$> sendResult)
  where
    -- Raw constant for the requested ingest mode.
    ingestModeC = case ingestMode of
      Insert -> kInsert
      Replace -> kReplace
      Update -> kUpdate

    -- The C API takes the upsert switch as an integer: 1 = on, 0 = off.
    upsertC = case upsertMode of
      UseUpsert -> 1
      NoUpsert -> 0

-- | Log level that can be set on a producer with 'setLogLevel'
data LogLevel
  = LogNone
  | LogError
  | LogInfo
  | LogDebug
  | LogWarning
  deriving (Eq)

-- | Translate a 'LogLevel' into the corresponding raw log level constant.
--
-- Written as a total pattern match over all constructors, so the compiler
-- reports a missing case if a constructor is ever added to 'LogLevel'.
convertLogLevel :: LogLevel -> AsapoLogLevel
convertLogLevel level = case level of
  LogNone -> asapoLogLevelNone
  LogError -> asapoLogLevelError
  LogInfo -> asapoLogLevelInfo
  LogDebug -> asapoLogLevelDebug
  LogWarning -> asapoLogLevelWarning

-- | Set the log level of the given producer
setLogLevel :: Producer -> LogLevel -> IO ()
setLogLevel (Producer producer) =
  asapo_producer_set_log_level producer . convertLogLevel

-- | Enable ('True') or disable ('False') logging to stdout
enableLocalLog :: Producer -> Bool -> IO ()
enableLocalLog (Producer producer) enable =
  asapo_producer_enable_local_log producer enableC
  where
    -- The C API takes the switch as an integer: 1 = on, 0 = off.
    enableC
      | enable = 1
      | otherwise = 0

-- | Enable ('True') or disable ('False') logging to the central server
enableRemoteLog :: Producer -> Bool -> IO ()
enableRemoteLog (Producer producer) enable =
  asapo_producer_enable_remote_log producer enableC
  where
    -- The C API takes the switch as an integer: 1 = on, 0 = off.
    enableC
      | enable = 1
      | otherwise = 0

-- | Set a different set of credentials
--
-- The credentials handle only exists for the duration of the call. The result
-- is whatever 'checkError' reports for the call to
-- @asapo_producer_set_credentials@, with the returned number converted to
-- 'Int'.
setCredentials :: Producer -> SourceCredentials -> IO (Either Error Int)
setCredentials (Producer producer) credentials =
  withCredentials credentials \credentialsHandle -> do
    setResult <- checkError (asapo_producer_set_credentials producer credentialsHandle)
    pure (fromIntegral <$> setResult)

-- | Get current size of the requests queue (number of requests pending/being processed)
getRequestsQueueSize :: Producer -> IO Int
getRequestsQueueSize (Producer producer) = do
  queueSize <- asapo_producer_get_requests_queue_size producer
  pure (fromIntegral queueSize)

-- | Get current volume of the requests queue (total memory occupied by pending/being processed requests)
getRequestsQueueVolumeMb :: Producer -> IO Int
getRequestsQueueVolumeMb (Producer producer) = do
  queueVolume <- asapo_producer_get_requests_queue_volume_mb producer
  pure (fromIntegral queueVolume)

-- | Set maximum size of the requests queue
--
-- Both limits are converted with 'fromIntegral' before being handed to
-- @asapo_producer_set_requests_queue_limits@.
setRequestsQueueLimits ::
  Producer ->
  -- | Size (0 for unlimited)
  Int ->
  -- | Volume (in MiB; 0 for unlimited)
  Int ->
  IO ()
setRequestsQueueLimits (Producer producer) size volume =
  asapo_producer_set_requests_queue_limits producer sizeLimit volumeLimit
  where
    sizeLimit = fromIntegral size
    volumeLimit = fromIntegral volume

-- | Wait for all outstanding requests to finish
--
-- The timeout is converted with 'nominalDiffToMillis' before it is passed to
-- @asapo_producer_wait_requests_finished@. The result is whatever 'checkError'
-- reports for that call, with the returned number converted to 'Int'.
waitRequestsFinished :: Producer -> NominalDiffTime -> IO (Either Error Int)
waitRequestsFinished (Producer producer) timeout =
  fmap fromIntegral
    <$> checkError (asapo_producer_wait_requests_finished producer timeoutMillis)
  where
    timeoutMillis = nominalDiffToMillis timeout