{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Asapo.Either.Producer
( Endpoint (..),
ProcessingThreads (..),
RequestHandlerType (..),
Error (..),
Metadata (..),
SourceCredentials,
DeletionFlags (..),
Producer,
LogLevel (..),
FileName (..),
DatasetSubstream (..),
DatasetSize (..),
VersionInfo (..),
UpsertMode (..),
MetadataIngestMode (..),
AutoIdFlag (..),
TransferFlag (..),
StorageFlag (..),
RequestResponse (..),
Opcode (..),
GenericRequestHeader (..),
getRequestsQueueSize,
getRequestsQueueVolumeMb,
setRequestsQueueLimits,
checkError,
checkErrorWithGivenHandle,
withProducer,
enableLocalLog,
waitRequestsFinished,
getVersionInfo,
getStreamInfo,
getStreamMeta,
getBeamtimeMeta,
deleteStream,
getLastStream,
send,
sendFile,
sendStreamFinishedFlag,
sendBeamtimeMetadata,
sendStreamMetadata,
setLogLevel,
enableRemoteLog,
setCredentials,
)
where
import Asapo.Either.Common (MessageId (MessageId), SourceCredentials, StreamInfo, StreamName (StreamName), nominalDiffToMillis, peekCStringText, retrieveStreamInfoFromC, stringHandleToText, stringHandleToTextUnsafe, withCStringNToText, withConstText, withCredentials, withPtr, withText)
import Asapo.Raw.Common (AsapoErrorHandle, AsapoStreamInfoHandle, AsapoStringHandle, asapo_error_explain, asapo_free_error_handle, asapo_free_stream_info_handle, asapo_free_string_handle, asapo_is_error, asapo_new_error_handle, asapo_new_string_handle)
import Asapo.Raw.Producer
( AsapoGenericRequestHeader (AsapoGenericRequestHeader),
AsapoLogLevel,
AsapoMessageHeaderHandle,
AsapoOpcode,
AsapoProducerHandle,
AsapoRequestCallbackPayloadHandle,
asapoLogLevelDebug,
asapoLogLevelError,
asapoLogLevelInfo,
asapoLogLevelNone,
asapoLogLevelWarning,
asapo_create_message_header,
asapo_create_producer,
asapo_free_message_header_handle,
asapo_free_producer_handle,
asapo_producer_delete_stream,
asapo_producer_enable_local_log,
asapo_producer_enable_remote_log,
asapo_producer_get_beamtime_meta,
asapo_producer_get_last_stream,
asapo_producer_get_requests_queue_size,
asapo_producer_get_requests_queue_volume_mb,
asapo_producer_get_stream_info,
asapo_producer_get_stream_meta,
asapo_producer_get_version_info,
asapo_producer_send,
asapo_producer_send_beamtime_metadata,
asapo_producer_send_file,
asapo_producer_send_stream_finished_flag,
asapo_producer_send_stream_metadata,
asapo_producer_set_credentials,
asapo_producer_set_log_level,
asapo_producer_set_requests_queue_limits,
asapo_producer_wait_requests_finished,
asapo_request_callback_payload_get_original_header,
asapo_request_callback_payload_get_response,
createRequestCallback,
kFilesystem,
kInsert,
kOpcodeAuthorize,
kOpcodeCount,
kOpcodeDeleteStream,
kOpcodeGetBufferData,
kOpcodeGetMeta,
kOpcodeLastStream,
kOpcodeStreamInfo,
kOpcodeTransferData,
kOpcodeTransferDatasetData,
kOpcodeTransferMetaData,
kOpcodeUnknownOp,
kReplace,
kStoreInDatabase,
kStoreInFilesystem,
kTcp,
kTransferData,
kTransferMetaDataOnly,
kUpdate,
)
import Control.Applicative (Applicative (pure))
import Control.Exception (bracket)
import Data.Bits ((.|.))
import Data.Bool (Bool)
import qualified Data.ByteString as BS
import Data.ByteString.Unsafe (unsafeUseAsCString)
import Data.Either (Either (Left, Right))
import Data.Eq (Eq ((==)))
import Data.Foldable (Foldable (elem))
import Data.Functor ((<$>))
import Data.Int (Int)
import Data.Maybe (Maybe (Just, Nothing))
import Data.Ord ((>))
import Data.Text (Text)
import Data.Time (NominalDiffTime)
import Data.Word (Word64)
import Foreign (Storable (peek), alloca, castPtr)
import Foreign.C.ConstPtr (ConstPtr (unConstPtr))
import Foreign.Ptr (Ptr)
import System.IO (IO)
import Text.Show (Show)
import Prelude (fromIntegral)
-- | Error reported through an ASAPO error handle, carrying the explanation
-- text obtained from @asapo_error_explain@ (see 'checkErrorWithGivenHandle').
newtype Error = Error Text deriving (Show)
-- | Endpoint text; 'create' marshals it to a C string and passes it as the
-- first argument of @asapo_create_producer@.
newtype Endpoint = Endpoint Text
-- | Thread count handed to @asapo_create_producer@ by 'create'.
--
-- NOTE(review): 'create' converts this with 'fromIntegral' to the C-side
-- integer type, so values outside that type's range presumably wrap — confirm.
newtype ProcessingThreads = ProcessingThreads Int
-- | Request handler selection for a producer. 'create' maps 'TcpHandler' to
-- @kTcp@ and 'FilesystemHandler' to @kFilesystem@.
data RequestHandlerType = TcpHandler | FilesystemHandler
-- | Wrapper around the raw 'AsapoProducerHandle'. 'withProducer' constructs it
-- and hands it to the user callback.
newtype Producer = Producer AsapoProducerHandle
-- | Turn the state of an error handle into an 'Either'.
--
-- If @asapo_is_error@ returns a value greater than zero, the explanation is
-- fetched with @asapo_error_explain@ and returned as 'Left'; otherwise the
-- supplied @result@ is returned unchanged as 'Right'.
checkErrorWithGivenHandle :: AsapoErrorHandle -> b -> IO (Either Error b)
checkErrorWithGivenHandle errorHandle result = do
  isError <- asapo_is_error errorHandle
  if isError > 0
    then do
      explanation <- withCStringNToText explanationLength \explanationPtr ->
        asapo_error_explain errorHandle explanationPtr (fromIntegral explanationLength)
      pure (Left (Error explanation))
    else pure (Right result)
  where
    -- Size of the buffer offered to asapo_error_explain.
    explanationLength :: Int
    explanationLength = 1024
-- | Run an action with an error handle obtained from @asapo_new_error_handle@;
-- 'bracket' ensures @asapo_free_error_handle@ is called afterwards, also when
-- the action throws.
withErrorHandle :: (AsapoErrorHandle -> IO c) -> IO c
withErrorHandle = bracket asapo_new_error_handle asapo_free_error_handle
-- | Run a C call that reports failure through a @Ptr AsapoErrorHandle@
-- out-parameter and convert the outcome into an 'Either'.
--
-- The action @f@ receives a pointer produced by 'withPtr' from a new error
-- handle. The handle value returned by 'withPtr' is then examined with
-- 'checkErrorWithGivenHandle'.
--
-- NOTE(review): presumably 'withPtr' reads the handle back after @f@ ran, so
-- the handle inspected here may differ from the one 'withErrorHandle'
-- allocated and frees — confirm which handles need freeing.
checkError :: (Ptr AsapoErrorHandle -> IO b) -> IO (Either Error b)
checkError f =
  withErrorHandle \initialHandle -> do
    (handleAfterCall, result) <- withPtr initialHandle f
    checkErrorWithGivenHandle handleAfterCall result
-- | Create a raw producer handle via @asapo_create_producer@.
--
-- The endpoint is marshalled with 'withText', the credentials with
-- 'withCredentials', and the timeout is converted with 'nominalDiffToMillis'.
-- Failures reported through the error handle are returned as 'Left'.
--
-- NOTE(review): the thread count is converted with 'fromIntegral' to the
-- C-side integer type; out-of-range values presumably wrap — confirm.
create ::
  Endpoint ->
  ProcessingThreads ->
  RequestHandlerType ->
  SourceCredentials ->
  NominalDiffTime ->
  IO (Either Error AsapoProducerHandle)
create (Endpoint endpoint) (ProcessingThreads processingThreads) handlerType sourceCredentials timeout =
  withCredentials sourceCredentials \credentials' ->
    withText endpoint \endpoint' ->
      checkError
        ( asapo_create_producer
            endpoint'
            (fromIntegral processingThreads)
            rawHandlerType
            credentials'
            (nominalDiffToMillis timeout)
        )
  where
    -- Raw constant corresponding to the requested handler type.
    rawHandlerType = case handlerType of
      TcpHandler -> kTcp
      FilesystemHandler -> kFilesystem
-- | Create a producer, run a callback with it, and free it afterwards.
--
-- If creation fails, @onError@ is called with the error and nothing is freed.
-- If it succeeds, @onSuccess@ is called with the wrapped handle, and
-- @asapo_free_producer_handle@ runs when the callback finishes or throws
-- (via 'bracket').
withProducer ::
  forall a.
  Endpoint ->
  ProcessingThreads ->
  RequestHandlerType ->
  SourceCredentials ->
  NominalDiffTime ->
  (Error -> IO a) ->
  (Producer -> IO a) ->
  IO a
withProducer endpoint processingThreads handlerType sourceCredentials timeout onError onSuccess =
  bracket acquire release use
  where
    acquire :: IO (Either Error AsapoProducerHandle)
    acquire = create endpoint processingThreads handlerType sourceCredentials timeout

    -- Only a successfully created handle needs freeing.
    release :: Either Error AsapoProducerHandle -> IO ()
    release created = case created of
      Left _ -> pure ()
      Right producerHandle -> asapo_free_producer_handle producerHandle

    use :: Either Error AsapoProducerHandle -> IO a
    use created = case created of
      Left e -> onError e
      Right producerHandle -> onSuccess (Producer producerHandle)
-- | Run an action with a string handle obtained from
-- @asapo_new_string_handle@; 'bracket' ensures @asapo_free_string_handle@ is
-- called afterwards, also when the action throws.
withStringHandle :: (AsapoStringHandle -> IO c) -> IO c
withStringHandle = bracket asapo_new_string_handle asapo_free_string_handle
-- | Result of 'getVersionInfo'.
data VersionInfo = VersionInfo
  { -- | Text read from the client-info string handle.
    versionClient :: Text,
    -- | Text read from the server-info string handle.
    versionServer :: Text,
    -- | Whether the \"supported\" out-parameter was greater than zero.
    versionSupported :: Bool
  }
  deriving (Show)
-- | Query version information through @asapo_producer_get_version_info@.
--
-- Two string handles (client, server) and a temporary out-parameter for the
-- \"supported\" flag are allocated for the call. On success they are read
-- back into a 'VersionInfo'; the integer return code of the C function is
-- ignored. On failure the error is returned as 'Left'.
getVersionInfo :: Producer -> IO (Either Error VersionInfo)
getVersionInfo (Producer producerHandle) =
  withStringHandle \clientInfo ->
    withStringHandle \serverInfo ->
      alloca \supportedPtr -> do
        result <-
          checkError
            (asapo_producer_get_version_info producerHandle clientInfo serverInfo supportedPtr)
        case result of
          Left e -> pure (Left e)
          Right _integerReturnCode -> do
            supported <- peek supportedPtr
            clientInfo' <- stringHandleToTextUnsafe clientInfo
            serverInfo' <- stringHandleToTextUnsafe serverInfo
            pure (Right (VersionInfo clientInfo' serverInfo' (supported > 0)))
-- | Fetch information about a named stream via
-- @asapo_producer_get_stream_info@.
--
-- The raw stream-info handle is converted with 'retrieveStreamInfoFromC' and
-- released with @asapo_free_stream_info_handle@ (via 'bracket'). The timeout
-- is converted with 'nominalDiffToMillis'.
getStreamInfo ::
  Producer ->
  StreamName ->
  NominalDiffTime ->
  IO (Either Error StreamInfo)
getStreamInfo (Producer producer) (StreamName stream) timeout = bracket acquire release use
  where
    acquire :: IO (Either Error AsapoStreamInfoHandle)
    acquire =
      withConstText stream \streamC ->
        checkError (asapo_producer_get_stream_info producer streamC (nominalDiffToMillis timeout))

    -- Nothing to free when the call failed.
    release :: Either Error AsapoStreamInfoHandle -> IO ()
    release acquired = case acquired of
      Right infoHandle -> asapo_free_stream_info_handle infoHandle
      Left _ -> pure ()

    use :: Either Error AsapoStreamInfoHandle -> IO (Either Error StreamInfo)
    use acquired = case acquired of
      Left e -> pure (Left e)
      Right infoHandle -> Right <$> retrieveStreamInfoFromC infoHandle
-- | Fetch stream information via @asapo_producer_get_last_stream@.
--
-- The raw stream-info handle is converted with 'retrieveStreamInfoFromC' and
-- released with @asapo_free_stream_info_handle@ (via 'bracket'). The timeout
-- is converted with 'nominalDiffToMillis'.
getLastStream ::
  Producer ->
  NominalDiffTime ->
  IO (Either Error StreamInfo)
getLastStream (Producer producer) timeout = bracket acquire release use
  where
    acquire :: IO (Either Error AsapoStreamInfoHandle)
    acquire = checkError (asapo_producer_get_last_stream producer (nominalDiffToMillis timeout))

    -- Nothing to free when the call failed.
    release :: Either Error AsapoStreamInfoHandle -> IO ()
    release acquired = case acquired of
      Right infoHandle -> asapo_free_stream_info_handle infoHandle
      Left _ -> pure ()

    use :: Either Error AsapoStreamInfoHandle -> IO (Either Error StreamInfo)
    use acquired = case acquired of
      Left e -> pure (Left e)
      Right infoHandle -> Right <$> retrieveStreamInfoFromC infoHandle
-- | Fetch the metadata of a named stream via
-- @asapo_producer_get_stream_meta@.
--
-- The returned string handle is converted with 'stringHandleToText' (which
-- yields a 'Maybe') and released with @asapo_free_string_handle@ (via
-- 'bracket'). The timeout is converted with 'nominalDiffToMillis'.
getStreamMeta ::
  Producer ->
  StreamName ->
  NominalDiffTime ->
  IO (Either Error (Maybe Text))
getStreamMeta (Producer producer) (StreamName stream) timeout = bracket acquire release use
  where
    acquire :: IO (Either Error AsapoStringHandle)
    acquire =
      withConstText stream \streamC ->
        checkError (asapo_producer_get_stream_meta producer streamC (nominalDiffToMillis timeout))

    -- Nothing to free when the call failed.
    release :: Either Error AsapoStringHandle -> IO ()
    release acquired = case acquired of
      Right stringHandle -> asapo_free_string_handle stringHandle
      Left _ -> pure ()

    use :: Either Error AsapoStringHandle -> IO (Either Error (Maybe Text))
    use acquired = case acquired of
      Left e -> pure (Left e)
      Right stringHandle -> Right <$> stringHandleToText stringHandle
-- | Fetch beamtime metadata via @asapo_producer_get_beamtime_meta@.
--
-- The returned string handle is converted with 'stringHandleToText' (which
-- yields a 'Maybe') and released with @asapo_free_string_handle@ (via
-- 'bracket'). The timeout is converted with 'nominalDiffToMillis'.
getBeamtimeMeta ::
  Producer ->
  NominalDiffTime ->
  IO (Either Error (Maybe Text))
getBeamtimeMeta (Producer producer) timeout = bracket acquire release use
  where
    acquire :: IO (Either Error AsapoStringHandle)
    acquire = checkError (asapo_producer_get_beamtime_meta producer (nominalDiffToMillis timeout))

    -- Nothing to free when the call failed.
    release :: Either Error AsapoStringHandle -> IO ()
    release acquired = case acquired of
      Right stringHandle -> asapo_free_string_handle stringHandle
      Left _ -> pure ()

    use :: Either Error AsapoStringHandle -> IO (Either Error (Maybe Text))
    use acquired = case acquired of
      Left e -> pure (Left e)
      Right stringHandle -> Right <$> stringHandleToText stringHandle
-- | Options for 'deleteStream'. Each flag present in the list turns the
-- corresponding argument of @asapo_producer_delete_stream@ from 0 into 1.
data DeletionFlags
  = -- | Sets the first of the two flag arguments.
    DeleteMeta
  | -- | Sets the second of the two flag arguments.
    DeleteErrorOnNotExist
  deriving (Eq)
-- | Delete a stream via @asapo_producer_delete_stream@.
--
-- The timeout is converted with 'nominalDiffToMillis'. Each of the two flag
-- arguments is 1 when the corresponding 'DeletionFlags' value occurs in the
-- list and 0 otherwise. On success, 'Right' carries the C function's integer
-- return value converted to 'Int'.
deleteStream ::
  Producer ->
  StreamName ->
  NominalDiffTime ->
  [DeletionFlags] ->
  IO (Either Error Int)
deleteStream (Producer producer) (StreamName stream) timeout deletionFlags = do
  result <- withConstText stream \streamC ->
    checkError
      ( asapo_producer_delete_stream
          producer
          streamC
          (nominalDiffToMillis timeout)
          (flagArgument DeleteMeta)
          (flagArgument DeleteErrorOnNotExist)
      )
  pure (fromIntegral <$> result)
  where
    -- 1 if the flag was requested by the caller, 0 otherwise.
    flagArgument flag = if flag `elem` deletionFlags then 1 else 0
-- | File name text; 'withMessageHeaderHandle' passes it as a C string to
-- @asapo_create_message_header@.
newtype FileName = FileName Text
-- | Metadata text; 'withMessageHeaderHandle' passes it as a C string to
-- @asapo_create_message_header@.
newtype Metadata = Metadata Text
-- | Dataset substream number; converted with 'fromIntegral' and passed to
-- @asapo_create_message_header@ by 'withMessageHeaderHandle'.
newtype DatasetSubstream = DatasetSubstream Int
-- | Dataset size; converted with 'fromIntegral' and passed to
-- @asapo_create_message_header@ by 'withMessageHeaderHandle'.
newtype DatasetSize = DatasetSize Int
-- | Auto-id switch for message headers. 'withMessageHeaderHandle' passes 1
-- to @asapo_create_message_header@ for 'UseAutoId' and 0 for 'NoAutoId'.
data AutoIdFlag = UseAutoId | NoAutoId deriving (Eq)
-- | Transfer part of the send flags. 'convertSendFlags' maps
-- 'DataAndMetadata' to @kTransferData@ and 'MetadataOnly' to
-- @kTransferMetaDataOnly@.
data TransferFlag = DataAndMetadata | MetadataOnly
-- | Storage part of the send flags. 'convertSendFlags' maps 'Filesystem' to
-- @kStoreInFilesystem@, 'Database' to @kStoreInDatabase@, and
-- 'FilesystemAndDatabase' to the bitwise OR of both.
data StorageFlag = Filesystem | Database | FilesystemAndDatabase
-- | Combine a transfer flag and a storage flag into one 'Word64' by OR-ing
-- the corresponding raw constants.
convertSendFlags :: TransferFlag -> StorageFlag -> Word64
convertSendFlags tf sf = transferBits .|. storageBits
  where
    transferBits :: Word64
    transferBits = case tf of
      DataAndMetadata -> fromIntegral kTransferData
      MetadataOnly -> fromIntegral kTransferMetaDataOnly

    storageBits :: Word64
    storageBits = case sf of
      Filesystem -> fromIntegral kStoreInFilesystem
      Database -> fromIntegral kStoreInDatabase
      FilesystemAndDatabase -> fromIntegral kStoreInDatabase .|. fromIntegral kStoreInFilesystem
-- | Run an action with a message header handle created by
-- @asapo_create_message_header@.
--
-- File name and metadata are marshalled with 'withConstText'; the numeric
-- arguments are converted with 'fromIntegral'. The 'Int' argument is the data
-- size passed as the second argument of the C call. The handle is released
-- with @asapo_free_message_header_handle@ (via 'bracket').
withMessageHeaderHandle ::
  MessageId ->
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  Int ->
  (AsapoMessageHeaderHandle -> IO b) ->
  IO b
withMessageHeaderHandle
  (MessageId messageId)
  (FileName fileName)
  (Metadata metadata)
  (DatasetSubstream datasetSubstream)
  (DatasetSize datasetSize)
  autoIdFlag
  dataSize = bracket acquire asapo_free_message_header_handle
    where
      acquire :: IO AsapoMessageHeaderHandle
      acquire =
        withConstText fileName \fileNameC ->
          withConstText metadata \metadataC ->
            asapo_create_message_header
              messageId
              (fromIntegral dataSize)
              fileNameC
              metadataC
              (fromIntegral datasetSubstream)
              (fromIntegral datasetSize)
              autoIdArgument

      -- 1 requests auto id, 0 does not.
      autoIdArgument = case autoIdFlag of
        UseAutoId -> 1
        NoAutoId -> 0
-- | Haskell-side representation of the raw opcode; values are produced by
-- 'convertOpcode' from the @kOpcode*@ constants.
data Opcode
= OpcodeUnknownOp
| OpcodeTransferData
| OpcodeTransferDatasetData
| OpcodeStreamInfo
| OpcodeLastStream
| OpcodeGetBufferData
| OpcodeAuthorize
| OpcodeTransferMetaData
| OpcodeDeleteStream
| OpcodeGetMeta
| OpcodeCount
-- Produced by 'convertOpcode' for every raw value that matches none of the
-- constants it compares against.
| OpcodePersistStream
-- | Translate a raw opcode into 'Opcode' by comparing it against the
-- imported @kOpcode*@ constants in turn.
--
-- NOTE(review): any value matching none of the constants becomes
-- 'OpcodePersistStream', including values that are not opcodes at all —
-- confirm this fallback is intended.
convertOpcode :: AsapoOpcode -> Opcode
convertOpcode x
  | x == kOpcodeUnknownOp = OpcodeUnknownOp
  | x == kOpcodeTransferData = OpcodeTransferData
  | x == kOpcodeTransferDatasetData = OpcodeTransferDatasetData
  | x == kOpcodeStreamInfo = OpcodeStreamInfo
  | x == kOpcodeLastStream = OpcodeLastStream
  | x == kOpcodeGetBufferData = OpcodeGetBufferData
  | x == kOpcodeAuthorize = OpcodeAuthorize
  | x == kOpcodeTransferMetaData = OpcodeTransferMetaData
  | x == kOpcodeDeleteStream = OpcodeDeleteStream
  | x == kOpcodeGetMeta = OpcodeGetMeta
  | x == kOpcodeCount = OpcodeCount
-- Reached when none of the guards above matched.
convertOpcode _ = OpcodePersistStream
-- Record whose field types match, in order, the arguments that
-- 'convertRequestHeader' passes to the @GenericRequestHeader@ constructor.
--
-- NOTE(review): the type name, constructor name and every field selector name
-- are missing from this declaration as it appears here, so it is not valid
-- Haskell. The module exports @GenericRequestHeader (..)@, so the selector
-- names are public API — restore them from the authoritative source rather
-- than inventing new ones.
data =
{ :: Opcode, -- filled from the converted opcode
:: Int, -- filled from the raw data id
:: Int, -- filled from the raw data size
:: Int, -- filled from the raw meta size
:: [Int], -- filled from the raw custom data
:: BS.ByteString, -- filled from the raw message buffer
:: Text, -- filled from the raw stream string
:: Text -- filled from the raw API version string
}
-- | Convert a raw 'AsapoGenericRequestHeader' into a 'GenericRequestHeader'.
--
-- The stream and API version C strings are read with 'peekCStringText', the
-- message buffer is copied into a 'BS.ByteString', and the numeric fields are
-- converted with 'fromIntegral'.
--
-- NOTE(review): the number of bytes copied from @message@ is taken from the
-- header's data size field. Confirm that this is the length of the message
-- buffer; if it can exceed the buffer, this reads out of bounds.
convertRequestHeader :: AsapoGenericRequestHeader -> IO GenericRequestHeader
convertRequestHeader (AsapoGenericRequestHeader opcode dataId dataSize metaSize customData message stream apiVersion) = do
  streamText <- peekCStringText stream
  apiVersionText <- peekCStringText apiVersion
  messageAsBs <- BS.packCStringLen (message, fromIntegral dataSize)
  let customData' :: [Int]
      customData' = fromIntegral <$> customData
  pure
    ( GenericRequestHeader
        (convertOpcode opcode)
        (fromIntegral dataId)
        (fromIntegral dataSize)
        (fromIntegral metaSize)
        customData'
        messageAsBs
        streamText
        apiVersionText
    )
-- | Value passed to the user-supplied callback of the send functions. It is
-- assembled by 'sendRequestCallback'.
--
-- NOTE(review): the name of the second field is absent from this view of the
-- file (presumably lost during extraction) - restore it from the original
-- source before compiling.
data RequestResponse = RequestResponse
-- Response text read from the callback payload.
{ RequestResponse -> Text
responsePayload :: Text,
-- Header of the original request, converted with 'convertRequestHeader'.
:: GenericRequestHeader,
-- 'Just' the error when the error handle passed to the callback reports one,
-- 'Nothing' otherwise.
RequestResponse -> Maybe Error
responseError :: Maybe Error
}
-- | Adapter between the raw C-level request callback and a Haskell callback
-- working on 'RequestResponse'.
--
-- The response string handle is acquired and released with 'bracket', the
-- original request header is peeked from the payload and converted, and the
-- error handle is inspected with 'checkErrorWithGivenHandle'. The user
-- callback is then invoked exactly once with the collected values. The
-- opaque user-data pointer is ignored.
sendRequestCallback ::
  (RequestResponse -> IO ()) ->
  Ptr () ->
  AsapoRequestCallbackPayloadHandle ->
  AsapoErrorHandle ->
  IO ()
sendRequestCallback simpleCallback _userData payloadHandle errorHandle = do
  responseText <-
    bracket
      (asapo_request_callback_payload_get_response payloadHandle)
      asapo_free_string_handle
      stringHandleToTextUnsafe
  headerPtr <- asapo_request_callback_payload_get_original_header payloadHandle
  rawHeader <- peek (unConstPtr headerPtr)
  header <- convertRequestHeader rawHeader
  outcome <- checkErrorWithGivenHandle errorHandle ()
  -- Only a reported error is of interest; success carries no information.
  let failure = case outcome of
        Left e -> Just e
        Right () -> Nothing
  simpleCallback (RequestResponse responseText header failure)
-- | Submit an in-memory payload to the producer.
--
-- A message header is built with 'withMessageHeaderHandle' (using the
-- payload's length as the data size), the payload and stream name are exposed
-- to C, and 'asapo_producer_send' is invoked under 'checkError'. The given
-- callback is wrapped with 'sendRequestCallback'. The integral result of the
-- C call is returned on success, an 'Error' otherwise.
--
-- NOTE(review): the 'FunPtr' obtained from 'createRequestCallback' is not
-- released anywhere in this function - confirm who owns it.
-- NOTE(review): the payload pointer comes from 'unsafeUseAsCString' and is
-- only valid inside the continuation; verify the C library does not keep it
-- after 'asapo_producer_send' returns.
send ::
  Producer ->
  MessageId ->
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  BS.ByteString ->
  TransferFlag ->
  StorageFlag ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO (Either Error Int)
send (Producer producer) messageId fileName metadata datasetSubstream datasetSize autoIdFlag data' transferFlag storageFlag (StreamName stream) callback =
  withMessageHeaderHandle
    messageId
    fileName
    metadata
    datasetSubstream
    datasetSize
    autoIdFlag
    payloadLength
    submit
  where
    payloadLength = BS.length data'
    sendFlags = convertSendFlags transferFlag storageFlag
    -- Runs with the freshly created message header handle.
    submit headerHandle =
      unsafeUseAsCString data' $ \payloadPtr ->
        withConstText stream $ \streamPtr -> do
          callbackPtr <- createRequestCallback (sendRequestCallback callback)
          result <-
            checkError
              ( asapo_producer_send
                  producer
                  headerHandle
                  (castPtr payloadPtr)
                  sendFlags
                  streamPtr
                  callbackPtr
              )
          pure (fromIntegral <$> result)
-- | Submit a file (referenced by name) to the producer.
--
-- A message header is built with 'withMessageHeaderHandle' using the given
-- size, the file name to send and the stream name are marshalled to C
-- strings, and 'asapo_producer_send_file' is invoked under 'checkError'. The
-- given callback is wrapped with 'sendRequestCallback'. The integral result
-- of the C call is returned on success, an 'Error' otherwise.
--
-- NOTE(review): the 'FunPtr' obtained from 'createRequestCallback' is not
-- released anywhere in this function - confirm who owns it.
sendFile ::
  Producer ->
  MessageId ->
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  Int ->
  FileName ->
  TransferFlag ->
  StorageFlag ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO (Either Error Int)
sendFile (Producer producer) messageId fileName meta datasetSubstream datasetSize autoIdFlag size (FileName fileNameToSend) transferFlag storageFlag (StreamName stream) callback =
  withMessageHeaderHandle
    messageId
    fileName
    meta
    datasetSubstream
    datasetSize
    autoIdFlag
    size
    submit
  where
    sendFlags = convertSendFlags transferFlag storageFlag
    -- Runs with the freshly created message header handle.
    submit headerHandle =
      withConstText fileNameToSend $ \pathPtr ->
        withConstText stream $ \streamPtr -> do
          callbackPtr <- createRequestCallback (sendRequestCallback callback)
          result <-
            checkError
              ( asapo_producer_send_file
                  producer
                  headerHandle
                  pathPtr
                  sendFlags
                  streamPtr
                  callbackPtr
              )
          pure (fromIntegral <$> result)
-- | Send the "stream finished" flag for a stream.
--
-- Arguments are, in order: the producer, the stream being finished, the last
-- message id, the name of the next stream, and the callback (wrapped with
-- 'sendRequestCallback'). Calls 'asapo_producer_send_stream_finished_flag'
-- under 'checkError' and returns its integral result, or an 'Error'.
--
-- NOTE(review): the 'FunPtr' obtained from 'createRequestCallback' is not
-- released anywhere in this function - confirm who owns it.
sendStreamFinishedFlag ::
  Producer ->
  StreamName ->
  MessageId ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO (Either Error Int)
sendStreamFinishedFlag (Producer producer) (StreamName stream) (MessageId lastId) (StreamName nextStream) callback = do
  callbackPtr <- createRequestCallback (sendRequestCallback callback)
  result <-
    withConstText stream $ \streamPtr ->
      withConstText nextStream $ \nextStreamPtr ->
        checkError
          ( asapo_producer_send_stream_finished_flag
              producer
              streamPtr
              lastId
              nextStreamPtr
              callbackPtr
          )
  pure (fmap fromIntegral result)
-- | Ingest mode handed to the metadata send functions. The constructors are
-- translated to the raw constants @kInsert@, @kReplace@ and @kUpdate@
-- respectively before being passed to the C library.
data MetadataIngestMode
  = Insert
  | Replace
  | Update
-- | Upsert switch handed to the metadata send functions. 'UseUpsert' is
-- passed to the C library as @1@, 'NoUpsert' as @0@.
data UpsertMode
  = UseUpsert
  | NoUpsert
-- | Send beamtime metadata through the producer.
--
-- The metadata text is marshalled to a C string and passed to
-- 'asapo_producer_send_beamtime_metadata' together with the raw ingest mode
-- and upsert flag. The callback is wrapped with 'sendRequestCallback'.
-- Returns the integral result of the C call, or an 'Error'.
--
-- NOTE(review): the 'FunPtr' obtained from 'createRequestCallback' is not
-- released anywhere in this function - confirm who owns it.
sendBeamtimeMetadata ::
  Producer ->
  Metadata ->
  MetadataIngestMode ->
  UpsertMode ->
  (RequestResponse -> IO ()) ->
  IO (Either Error Int)
sendBeamtimeMetadata (Producer producer) (Metadata metadata) ingestMode upsertMode callback = do
  callbackPtr <- createRequestCallback (sendRequestCallback callback)
  result <-
    withConstText metadata $ \metadataPtr ->
      checkError
        ( asapo_producer_send_beamtime_metadata
            producer
            metadataPtr
            (rawIngestMode ingestMode)
            (rawUpsert upsertMode)
            callbackPtr
        )
  pure (fmap fromIntegral result)
  where
    -- Translate the Haskell ingest mode to the raw constant.
    rawIngestMode Insert = kInsert
    rawIngestMode Replace = kReplace
    rawIngestMode Update = kUpdate
    -- Translate the upsert switch to the numeric flag expected by C.
    rawUpsert UseUpsert = 1
    rawUpsert NoUpsert = 0
-- | Send metadata for a specific stream through the producer.
--
-- The metadata text and the stream name are marshalled to C strings and
-- passed to 'asapo_producer_send_stream_metadata' together with the raw
-- ingest mode and upsert flag. The callback is wrapped with
-- 'sendRequestCallback'. Returns the integral result of the C call, or an
-- 'Error'.
--
-- NOTE(review): the 'FunPtr' obtained from 'createRequestCallback' is not
-- released anywhere in this function - confirm who owns it.
sendStreamMetadata ::
  Producer ->
  Metadata ->
  MetadataIngestMode ->
  UpsertMode ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO (Either Error Int)
sendStreamMetadata (Producer producer) (Metadata metadata) ingestMode upsertMode (StreamName stream) callback = do
  callbackPtr <- createRequestCallback (sendRequestCallback callback)
  result <-
    withConstText metadata $ \metadataPtr ->
      withConstText stream $ \streamPtr ->
        checkError
          ( asapo_producer_send_stream_metadata
              producer
              metadataPtr
              (rawIngestMode ingestMode)
              (rawUpsert upsertMode)
              streamPtr
              callbackPtr
          )
  pure (fmap fromIntegral result)
  where
    -- Translate the Haskell ingest mode to the raw constant.
    rawIngestMode Insert = kInsert
    rawIngestMode Replace = kReplace
    rawIngestMode Update = kUpdate
    -- Translate the upsert switch to the numeric flag expected by C.
    rawUpsert UseUpsert = 1
    rawUpsert NoUpsert = 0
-- | Log levels that can be set on a producer with 'setLogLevel'. Each
-- constructor is translated to the corresponding raw level by
-- 'convertLogLevel'.
data LogLevel
  = LogNone
  | LogError
  | LogInfo
  | LogDebug
  | LogWarning
  deriving (Eq)
-- | Translate a 'LogLevel' into the raw 'AsapoLogLevel' constant understood
-- by the C library.
convertLogLevel :: LogLevel -> AsapoLogLevel
convertLogLevel level = case level of
  LogNone -> asapoLogLevelNone
  LogError -> asapoLogLevelError
  LogInfo -> asapoLogLevelInfo
  LogDebug -> asapoLogLevelDebug
  LogWarning -> asapoLogLevelWarning
-- | Set the log level of the producer. The level is converted with
-- 'convertLogLevel' and passed to 'asapo_producer_set_log_level'.
setLogLevel :: Producer -> LogLevel -> IO ()
setLogLevel (Producer handle) logLevel =
  asapo_producer_set_log_level handle rawLevel
  where
    rawLevel = convertLogLevel logLevel
-- | Switch the producer's local log on or off. 'True' is passed to
-- 'asapo_producer_enable_local_log' as @1@, 'False' as @0@.
enableLocalLog :: Producer -> Bool -> IO ()
enableLocalLog (Producer handle) enable =
  asapo_producer_enable_local_log handle flag
  where
    flag
      | enable = 1
      | otherwise = 0
-- | Switch the producer's remote log on or off. 'True' is passed to
-- 'asapo_producer_enable_remote_log' as @1@, 'False' as @0@.
enableRemoteLog :: Producer -> Bool -> IO ()
enableRemoteLog (Producer handle) enable =
  asapo_producer_enable_remote_log handle flag
  where
    flag
      | enable = 1
      | otherwise = 0
-- | Set the source credentials on the producer.
--
-- The credentials are turned into a raw handle with 'withCredentials' and
-- passed to 'asapo_producer_set_credentials' under 'checkError'. Returns the
-- integral result of the C call, or an 'Error'.
setCredentials :: Producer -> SourceCredentials -> IO (Either Error Int)
setCredentials (Producer handle) credentials =
  withCredentials credentials $ \credentialsHandle -> do
    result <- checkError (asapo_producer_set_credentials handle credentialsHandle)
    pure (fmap fromIntegral result)
-- | Query the producer's request queue size, as reported by
-- 'asapo_producer_get_requests_queue_size', converted to 'Int'.
getRequestsQueueSize :: Producer -> IO Int
getRequestsQueueSize (Producer handle) = do
  rawSize <- asapo_producer_get_requests_queue_size handle
  pure (fromIntegral rawSize)
-- | Query the producer's request queue volume, as reported by
-- 'asapo_producer_get_requests_queue_volume_mb', converted to 'Int'.
getRequestsQueueVolumeMb :: Producer -> IO Int
getRequestsQueueVolumeMb (Producer handle) = do
  rawVolume <- asapo_producer_get_requests_queue_volume_mb handle
  pure (fromIntegral rawVolume)
-- | Set the limits of the producer's request queue. The two 'Int' arguments
-- (size first, then volume) are converted with 'fromIntegral' and passed to
-- 'asapo_producer_set_requests_queue_limits' in that order.
setRequestsQueueLimits ::
  Producer ->
  Int ->
  Int ->
  IO ()
setRequestsQueueLimits (Producer handle) size volume =
  asapo_producer_set_requests_queue_limits handle sizeLimit volumeLimit
  where
    sizeLimit = fromIntegral size
    volumeLimit = fromIntegral volume
-- | Wait for the producer's outstanding requests to finish.
--
-- The timeout is converted to milliseconds with 'nominalDiffToMillis' and
-- passed to 'asapo_producer_wait_requests_finished' under 'checkError'.
-- Returns the integral result of the C call, or an 'Error'.
waitRequestsFinished :: Producer -> NominalDiffTime -> IO (Either Error Int)
waitRequestsFinished (Producer handle) timeout =
  fmap
    (fmap fromIntegral)
    (checkError (asapo_producer_wait_requests_finished handle timeoutMillis))
  where
    timeoutMillis = nominalDiffToMillis timeout