{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -Wno-missing-import-lists #-}
module Asapo.Producer
(
ProducerException (..),
Endpoint (..),
ProcessingThreads (..),
RequestHandlerType (..),
Error (..),
Metadata (..),
SourceCredentials (..),
MessageId (..),
DeletionFlags (..),
Producer,
LogLevel (..),
FileName (..),
PipelineStep (..),
Beamtime (Beamtime),
DataSource (DataSource),
Beamline (Beamline),
StreamInfo (..),
SourceType (..),
StreamName (..),
InstanceId (..),
Token (..),
DatasetSubstream (..),
DatasetSize (..),
VersionInfo (..),
UpsertMode (..),
MetadataIngestMode (..),
AutoIdFlag (..),
TransferFlag (..),
StorageFlag (..),
RequestResponse (..),
Opcode (..),
GenericRequestHeader (..),
withProducer,
getVersionInfo,
getStreamInfo,
getLastStream,
getBeamtimeMeta,
getStreamMeta,
getRequestsQueueSize,
getRequestsQueueVolumeMb,
send,
sendFile,
sendStreamFinishedFlag,
sendBeamtimeMetadata,
sendStreamMetadata,
deleteStream,
setLogLevel,
enableLocalLog,
enableRemoteLog,
setCredentials,
setRequestsQueueLimits,
messageIdFromInt,
waitRequestsFinished,
)
where
import Asapo.Either.Common
  ( Beamline (Beamline),
    Beamtime (Beamtime),
    DataSource (DataSource),
    InstanceId (..),
    MessageId (..),
    PipelineStep (..),
    SourceCredentials (..),
    SourceType (..),
    StreamInfo (..),
    StreamName (..),
    Token (..),
    messageIdFromInt,
  )
import Asapo.Either.Producer
  ( AutoIdFlag,
    DatasetSize,
    DatasetSubstream,
    DeletionFlags,
    Endpoint,
    Error (Error),
    FileName,
    GenericRequestHeader (..),
    LogLevel (..),
    Metadata,
    MetadataIngestMode,
    Opcode (..),
    ProcessingThreads,
    Producer,
    RequestHandlerType,
    RequestResponse,
    StorageFlag,
    TransferFlag,
    UpsertMode,
    VersionInfo,
    enableLocalLog,
    enableRemoteLog,
    getRequestsQueueSize,
    getRequestsQueueVolumeMb,
    setLogLevel,
    setRequestsQueueLimits,
  )
import qualified Asapo.Either.Producer as PlainProducer
import Control.Applicative (pure)
import Control.Exception (Exception, throw, throwIO)
import Control.Monad (Monad)
import qualified Data.ByteString as BS
import Data.Either (Either (Left, Right))
import Data.Int (Int)
import Data.Maybe (Maybe)
import Data.Text (Text)
import Data.Time (NominalDiffTime)
import System.IO (IO)
import Text.Show (Show)
import Prelude ()
-- | Exception raised by the functions of this module whenever the wrapped
-- "Asapo.Either.Producer" call reports an 'Error'. The payload is the message
-- text taken from that 'Error'.
newtype ProducerException
  = ProducerException Text
  deriving (Show)

-- Uses the default 'Exception' methods; no custom rendering is defined.
instance Exception ProducerException
-- | Exception-throwing variant of 'PlainProducer.withProducer'.
--
-- All arguments are forwarded unchanged. Instead of asking the caller for an
-- error handler, this wrapper supplies one that throws a 'ProducerException'
-- carrying the message of the reported 'Error'.
--
-- The last argument is the action handed to 'PlainProducer.withProducer' as
-- its success continuation; its result is the result of the whole call.
withProducer ::
  forall a.
  Endpoint ->
  ProcessingThreads ->
  RequestHandlerType ->
  SourceCredentials ->
  NominalDiffTime ->
  (Producer -> IO a) ->
  IO a
withProducer endpoint processingThreads handlerType sourceCredentials timeout onSuccess =
  PlainProducer.withProducer
    endpoint
    processingThreads
    handlerType
    sourceCredentials
    timeout
    onError
    onSuccess
  where
    -- The handler is only ever used at type @Error -> IO a@, so it is typed
    -- that way and uses 'throwIO': the exception is raised when the handler
    -- action is executed, in sequence with other IO, rather than whenever
    -- the value happens to be forced (as with the pure 'throw').
    onError :: Error -> IO a
    onError (Error message) = throwIO (ProducerException message)
-- | Run an action that yields @Either Error b@ and unwrap the result.
--
-- A 'Right' value is returned via 'pure'; a 'Left' is turned into a thrown
-- 'ProducerException' carrying the error's message.
--
-- The helper is polymorphic in the monad, so the pure 'throw' is used here
-- ('throwIO' would fix the monad to 'IO').
maybeThrow :: (Monad m) => m (Either Error b) -> m b
maybeThrow action = do
  outcome <- action
  unwrap outcome
  where
    unwrap (Right value) = pure value
    unwrap (Left (Error message)) = throw (ProducerException message)
-- | Exception-throwing variant of 'PlainProducer.getVersionInfo'.
--
-- Returns the 'VersionInfo' on success; an 'Error' result is rethrown as a
-- 'ProducerException' by 'maybeThrow'.
getVersionInfo :: Producer -> IO VersionInfo
getVersionInfo producer = maybeThrow versionQuery
  where
    versionQuery = PlainProducer.getVersionInfo producer
-- | Exception-throwing variant of 'PlainProducer.getStreamInfo'.
--
-- The producer, stream name and timeout are forwarded unchanged; an 'Error'
-- result is rethrown as a 'ProducerException'.
getStreamInfo :: Producer -> StreamName -> NominalDiffTime -> IO StreamInfo
getStreamInfo producer stream timeout = maybeThrow infoQuery
  where
    infoQuery = PlainProducer.getStreamInfo producer stream timeout
-- | Exception-throwing variant of 'PlainProducer.getLastStream'.
--
-- The producer and timeout are forwarded unchanged; an 'Error' result is
-- rethrown as a 'ProducerException'.
getLastStream :: Producer -> NominalDiffTime -> IO StreamInfo
getLastStream producer timeout = maybeThrow lastStreamQuery
  where
    lastStreamQuery = PlainProducer.getLastStream producer timeout
-- | Exception-throwing variant of 'PlainProducer.getStreamMeta'.
--
-- The @Maybe Text@ produced by the underlying call is passed through as is
-- (including 'Data.Maybe.Nothing'); only an 'Error' result is turned into a
-- thrown 'ProducerException'.
getStreamMeta :: Producer -> StreamName -> NominalDiffTime -> IO (Maybe Text)
getStreamMeta producer stream timeout = maybeThrow metaQuery
  where
    metaQuery = PlainProducer.getStreamMeta producer stream timeout
-- | Exception-throwing variant of 'PlainProducer.getBeamtimeMeta'.
--
-- The @Maybe Text@ produced by the underlying call is passed through as is;
-- only an 'Error' result is turned into a thrown 'ProducerException'.
getBeamtimeMeta :: Producer -> NominalDiffTime -> IO (Maybe Text)
getBeamtimeMeta producer timeout = maybeThrow metaQuery
  where
    metaQuery = PlainProducer.getBeamtimeMeta producer timeout
-- | Exception-throwing variant of 'PlainProducer.deleteStream'.
--
-- All arguments are forwarded unchanged. The 'Int' returned by the underlying
-- call is passed through; an 'Error' result is rethrown as a
-- 'ProducerException'.
deleteStream ::
  Producer ->
  StreamName ->
  NominalDiffTime ->
  [DeletionFlags] ->
  IO Int
deleteStream producer stream timeout deletionFlags = maybeThrow deletion
  where
    deletion =
      PlainProducer.deleteStream
        producer
        stream
        timeout
        deletionFlags
-- | Exception-throwing variant of 'PlainProducer.send'.
--
-- Every argument, including the 'RequestResponse' callback, is forwarded to
-- the underlying call in the same order. The 'Int' it returns is passed
-- through; an 'Error' result is rethrown as a 'ProducerException'.
send ::
  Producer ->
  MessageId ->
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  BS.ByteString ->
  TransferFlag ->
  StorageFlag ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO Int
send
  producer
  messageId
  fileName
  metadata
  datasetSubstream
  datasetSize
  autoIdFlag
  payload
  transferFlag
  storageFlag
  stream
  callback = maybeThrow request
    where
      request =
        PlainProducer.send
          producer
          messageId
          fileName
          metadata
          datasetSubstream
          datasetSize
          autoIdFlag
          payload
          transferFlag
          storageFlag
          stream
          callback
-- | Exception-throwing variant of 'PlainProducer.sendFile'.
--
-- Every argument, including the 'RequestResponse' callback, is forwarded to
-- the underlying call in the same order. Note that the signature takes two
-- 'FileName' values (third and ninth argument) separated by an 'Int' size.
-- The 'Int' returned by the underlying call is passed through; an 'Error'
-- result is rethrown as a 'ProducerException'.
sendFile ::
  Producer ->
  MessageId ->
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  Int ->
  FileName ->
  TransferFlag ->
  StorageFlag ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO Int
sendFile
  producer
  messageId
  fileName
  metadata
  datasetSubstream
  datasetSize
  autoIdFlag
  size
  fileNameToSend
  transferFlag
  storageFlag
  stream
  callback = maybeThrow request
    where
      request =
        PlainProducer.sendFile
          producer
          messageId
          fileName
          metadata
          datasetSubstream
          datasetSize
          autoIdFlag
          size
          fileNameToSend
          transferFlag
          storageFlag
          stream
          callback
-- | Exception-throwing variant of 'PlainProducer.sendStreamFinishedFlag'.
--
-- The stream name, last message id, next stream name and callback are
-- forwarded unchanged. The 'Int' returned by the underlying call is passed
-- through; an 'Error' result is rethrown as a 'ProducerException'.
sendStreamFinishedFlag ::
  Producer ->
  StreamName ->
  MessageId ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO Int
sendStreamFinishedFlag producer stream lastId nextStream callback =
  maybeThrow request
  where
    request =
      PlainProducer.sendStreamFinishedFlag
        producer
        stream
        lastId
        nextStream
        callback
-- | Exception-throwing variant of 'PlainProducer.sendBeamtimeMetadata'.
--
-- The metadata, ingest mode, upsert mode and callback are forwarded
-- unchanged. The 'Int' returned by the underlying call is passed through; an
-- 'Error' result is rethrown as a 'ProducerException'.
sendBeamtimeMetadata ::
  Producer ->
  Metadata ->
  MetadataIngestMode ->
  UpsertMode ->
  (RequestResponse -> IO ()) ->
  IO Int
sendBeamtimeMetadata producer metadata ingestMode upsertMode callback =
  maybeThrow request
  where
    request =
      PlainProducer.sendBeamtimeMetadata
        producer
        metadata
        ingestMode
        upsertMode
        callback
-- | Exception-throwing variant of 'PlainProducer.sendStreamMetadata'.
--
-- The metadata, ingest mode, upsert mode, stream name and callback are
-- forwarded unchanged. The 'Int' returned by the underlying call is passed
-- through; an 'Error' result is rethrown as a 'ProducerException'.
sendStreamMetadata ::
  Producer ->
  Metadata ->
  MetadataIngestMode ->
  UpsertMode ->
  StreamName ->
  (RequestResponse -> IO ()) ->
  IO Int
sendStreamMetadata producer metadata ingestMode upsertMode stream callback =
  maybeThrow request
  where
    request =
      PlainProducer.sendStreamMetadata
        producer
        metadata
        ingestMode
        upsertMode
        stream
        callback
-- | Exception-throwing variant of 'PlainProducer.setCredentials'.
--
-- The 'Int' returned by the underlying call is passed through; an 'Error'
-- result is rethrown as a 'ProducerException'.
setCredentials :: Producer -> SourceCredentials -> IO Int
setCredentials producer credentials = maybeThrow update
  where
    update = PlainProducer.setCredentials producer credentials
-- | Exception-throwing variant of 'PlainProducer.waitRequestsFinished'.
--
-- The producer and timeout are forwarded unchanged. The 'Int' returned by
-- the underlying call is passed through; an 'Error' result is rethrown as a
-- 'ProducerException'.
waitRequestsFinished :: Producer -> NominalDiffTime -> IO Int
waitRequestsFinished producer timeout = maybeThrow waiting
  where
    waiting = PlainProducer.waitRequestsFinished producer timeout