{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -Wno-missing-import-lists #-}

-- |
-- Description : High-level interface for all producer-related functions, using exceptions instead of @Either@
--
-- To implement an ASAP:O producer, you should only need this interface.
-- It exposes no memory-management functions (like free) or pointers, and
-- is thus safe to use.
--
-- = Simple Example
--
-- Here's some code for a simple producer that connects and sends a message with id "1".
--
-- >>> :seti -XOverloadedStrings
-- >>> :{
--  import Asapo.Producer
--  import Control.Applicative (Applicative ((<*>)))
--  import Control.Monad(void)
--  import Data.Either (Either (Left, Right))
--  import Data.Function (($))
--  import Data.Functor ((<$>))
--  import Data.Semigroup (Semigroup ((<>)))
--  import Data.Text (Text, pack)
--  import Data.Text.Encoding (encodeUtf8)
--  import qualified Data.Text.IO as TIO
--  import Data.Time.Clock (secondsToNominalDiffTime)
--  import Data.Word (Word64)
--  import System.IO (IO)
--  import Text.Show (Show (show))
--  import Prelude ()
--  main :: IO ()
--  main =
--    withProducer
--      (Endpoint "localhost:8400")
--      (ProcessingThreads 1)
--      TcpHandler
--      ( SourceCredentials
--          { sourceType = RawSource,
--            instanceId = InstanceId "test_instance",
--            pipelineStep = PipelineStep "pipeline_step_1",
--            beamtime = Beamtime "asapo_test",
--            beamline = Beamline "auto",
--            dataSource = DataSource "asapo_source",
--            token = Token "sometoken"
--          }
--      )
--      (secondsToNominalDiffTime 10) $ \producer -> do
--        TIO.putStrLn "connected, sending data"
--        let responseHandler :: RequestResponse -> IO ()
--            responseHandler requestResponse =
--              TIO.putStrLn $ "in response handler, payload "
--                <> responsePayload requestResponse
--                <> ", error "
--                <> pack (show (responseError requestResponse))
--        send
--          producer
--          (MessageId 1)
--          (FileName "raw/default/1.txt")
--          (Metadata "{\"test\": 3.0}")
--          (DatasetSubstream 0)
--          (DatasetSize 0)
--          NoAutoId
--          (encodeUtf8 "test")
--          DataAndMetadata
--          FilesystemAndDatabase
--          (StreamName "default")
--          responseHandler
--        void $ waitRequestsFinished producer (secondsToNominalDiffTime 10)
-- :}
module Asapo.Producer
  ( -- * Types
    ProducerException (..),
    Endpoint (..),
    ProcessingThreads (..),
    RequestHandlerType (..),
    Error (..),
    Metadata (..),
    SourceCredentials (..),
    MessageId (..),
    DeletionFlags (..),
    Producer,
    LogLevel (..),
    FileName (..),
    PipelineStep (..),
    Beamtime (Beamtime),
    DataSource (DataSource),
    Beamline (Beamline),
    StreamInfo (..),
    SourceType (..),
    StreamName (..),
    InstanceId (..),
    Token (..),
    DatasetSubstream (..),
    DatasetSize (..),
    VersionInfo (..),
    UpsertMode (..),
    MetadataIngestMode (..),
    AutoIdFlag (..),
    TransferFlag (..),
    StorageFlag (..),
    RequestResponse (..),
    Opcode (..),
    GenericRequestHeader (..),

    -- * Initialization
    withProducer,

    -- * Getters
    getVersionInfo,
    getStreamInfo,
    getLastStream,
    getBeamtimeMeta,
    getStreamMeta,
    getRequestsQueueSize,
    getRequestsQueueVolumeMb,

    -- * Modifiers
    send,
    sendFile,
    sendStreamFinishedFlag,
    sendBeamtimeMetadata,
    sendStreamMetadata,
    deleteStream,
    setLogLevel,
    enableLocalLog,
    enableRemoteLog,
    setCredentials,
    setRequestsQueueLimits,
    messageIdFromInt,
    waitRequestsFinished,
  )
where

import Asapo.Either.Common
  ( Beamline (Beamline),
    Beamtime (Beamtime),
    DataSource (DataSource),
    InstanceId (..),
    MessageId (..),
    PipelineStep (..),
    SourceCredentials (..),
    SourceType (..),
    StreamInfo (..),
    StreamName (..),
    Token (..),
    messageIdFromInt,
  )
import Asapo.Either.Producer
  ( AutoIdFlag,
    DatasetSize,
    DatasetSubstream,
    DeletionFlags,
    Endpoint,
    Error (Error),
    FileName,
    GenericRequestHeader (..),
    LogLevel (..),
    Metadata,
    MetadataIngestMode,
    Opcode (..),
    ProcessingThreads,
    Producer,
    RequestHandlerType,
    RequestResponse,
    StorageFlag,
    TransferFlag,
    UpsertMode,
    VersionInfo,
    enableLocalLog,
    enableRemoteLog,
    getRequestsQueueSize,
    getRequestsQueueVolumeMb,
    setLogLevel,
    setRequestsQueueLimits,
  )
import qualified Asapo.Either.Producer as PlainProducer
import Control.Applicative (pure)
import Control.Exception (Exception, throw)
import Control.Monad (Monad)
import qualified Data.ByteString as BS
import Data.Either (Either (Left, Right))
import Data.Int (Int)
import Data.Maybe (Maybe)
import Data.Text (Text)
import Data.Time (NominalDiffTime)
import System.IO (IO)
import Text.Show (Show)
import Prelude ()

-- | Exception type used by this module to report failures.
--
-- The wrappers in this module convert the 'Error' value reported by the
-- underlying "Asapo.Either.Producer" functions into this exception; the
-- wrapped 'Text' is the message carried by that 'Error'.
newtype ProducerException = ProducerException Text deriving (Show)

instance Exception ProducerException

-- | Create a producer and do something with it. This is the main entrypoint into the producer.
--
-- All arguments are forwarded unchanged to @PlainProducer.withProducer@.
-- If that function reports an 'Error', its message is rethrown as a
-- 'ProducerException' instead of being handed to an error callback.
withProducer ::
  forall a.
  Endpoint ->
  ProcessingThreads ->
  RequestHandlerType ->
  SourceCredentials ->
  -- | timeout
  NominalDiffTime ->
  -- | action to run with the producer
  (Producer -> IO a) ->
  IO a
withProducer endpoint processingThreads handlerType sourceCredentials timeout onSuccess =
  PlainProducer.withProducer
    endpoint
    processingThreads
    handlerType
    sourceCredentials
    timeout
    onError
    onSuccess
  where
    -- Error callback for the Either-based API: convert the reported error
    -- into an exception carrying the same message.
    onError :: Error -> IO a
    onError (Error message) = throw (ProducerException message)

-- | Run an action that reports failure as @Left@ and convert that failure
-- into a thrown 'ProducerException' carrying the error message. A @Right@
-- result is returned unchanged.
--
-- 'throw' is used here because this helper is polymorphic in the monad.
maybeThrow :: (Monad m) => m (Either Error b) -> m b
maybeThrow f = do
  result <- f
  case result of
    Left (Error message) -> throw (ProducerException message)
    Right v -> pure v

-- | Retrieve producer version info.
--
-- Throws a 'ProducerException' if @PlainProducer.getVersionInfo@ reports an 'Error'.
getVersionInfo :: Producer -> IO VersionInfo
getVersionInfo producer = maybeThrow (PlainProducer.getVersionInfo producer)

-- | Retrieve info for a single stream.
--
-- Throws a 'ProducerException' if @PlainProducer.getStreamInfo@ reports an 'Error'.
getStreamInfo ::
  Producer ->
  StreamName ->
  -- | Timeout
  NominalDiffTime ->
  IO StreamInfo
getStreamInfo producer stream timeout =
  maybeThrow (PlainProducer.getStreamInfo producer stream timeout)

-- | Retrieve info for the latest stream.
--
-- Throws a 'ProducerException' if @PlainProducer.getLastStream@ reports an 'Error'.
getLastStream ::
  Producer ->
  -- | Timeout
  NominalDiffTime ->
  IO StreamInfo
getLastStream producer timeout =
  maybeThrow (PlainProducer.getLastStream producer timeout)

-- | Retrieve metadata for the given stream (which might be missing, in which case @Nothing@ is returned).
--
-- Throws a 'ProducerException' if @PlainProducer.getStreamMeta@ reports an 'Error'.
getStreamMeta ::
  Producer ->
  StreamName ->
  -- | timeout
  NominalDiffTime ->
  IO (Maybe Text)
getStreamMeta producer stream timeout =
  maybeThrow (PlainProducer.getStreamMeta producer stream timeout)

-- | Retrieve the beamtime metadata (which might be missing, in which case @Nothing@ is returned).
--
-- Throws a 'ProducerException' if @PlainProducer.getBeamtimeMeta@ reports an 'Error'.
getBeamtimeMeta ::
  Producer ->
  -- | timeout
  NominalDiffTime ->
  IO (Maybe Text)
getBeamtimeMeta producer timeout =
  maybeThrow (PlainProducer.getBeamtimeMeta producer timeout)

-- | Delete the given stream.
--
-- The arguments are forwarded unchanged to @PlainProducer.deleteStream@ and
-- its 'Int' result is returned as-is. Throws a 'ProducerException' if that
-- function reports an 'Error'.
deleteStream ::
  Producer ->
  StreamName ->
  -- | timeout
  NominalDiffTime ->
  [DeletionFlags] ->
  IO Int
deleteStream producer stream timeout deletionFlags =
  maybeThrow (PlainProducer.deleteStream producer stream timeout deletionFlags)

-- | Send a message containing raw data. Due to newtype and enum usage, all parameters should be self-explanatory.
--
-- The arguments are forwarded unchanged to @PlainProducer.send@ and its
-- 'Int' result is returned as-is. Throws a 'ProducerException' if that
-- function reports an 'Error'.
send ::
  Producer ->
  MessageId ->
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  -- | Actual data to send
  BS.ByteString ->
  TransferFlag ->
  StorageFlag ->
  StreamName ->
  -- | Callback receiving the 'RequestResponse'
  (RequestResponse -> IO ()) ->
  IO Int
send producer messageId fileName metadata datasetSubstream datasetSize autoIdFlag data' transferFlag storageFlag stream callback =
  maybeThrow
    ( PlainProducer.send
        producer
        messageId
        fileName
        metadata
        datasetSubstream
        datasetSize
        autoIdFlag
        data'
        transferFlag
        storageFlag
        stream
        callback
    )

-- | Send a message containing a file. Due to newtype and enum usage, all parameters should be self-explanatory.
--
-- The arguments are forwarded unchanged to @PlainProducer.sendFile@ and its
-- 'Int' result is returned as-is. Throws a 'ProducerException' if that
-- function reports an 'Error'.
sendFile ::
  Producer ->
  MessageId ->
  -- | File name to put into the message header
  FileName ->
  Metadata ->
  DatasetSubstream ->
  DatasetSize ->
  AutoIdFlag ->
  -- | Size
  Int ->
  -- | File to actually send
  FileName ->
  TransferFlag ->
  StorageFlag ->
  StreamName ->
  -- | Callback receiving the 'RequestResponse'
  (RequestResponse -> IO ()) ->
  IO Int
sendFile producer messageId fileName meta datasetSubstream datasetSize autoIdFlag size fileNameToSend transferFlag storageFlag stream callback =
  maybeThrow
    ( PlainProducer.sendFile
        producer
        messageId
        fileName
        meta
        datasetSubstream
        datasetSize
        autoIdFlag
        size
        fileNameToSend
        transferFlag
        storageFlag
        stream
        callback
    )

-- | As the title says, send the "stream finished" flag.
--
-- The arguments are forwarded unchanged to
-- @PlainProducer.sendStreamFinishedFlag@ and its 'Int' result is returned
-- as-is. Throws a 'ProducerException' if that function reports an 'Error'.
sendStreamFinishedFlag ::
  Producer ->
  -- | stream
  StreamName ->
  -- | last ID
  MessageId ->
  -- | next stream
  StreamName ->
  -- | Callback receiving the 'RequestResponse'
  (RequestResponse -> IO ()) ->
  IO Int
sendStreamFinishedFlag producer stream lastId nextStream callback =
  maybeThrow
    ( PlainProducer.sendStreamFinishedFlag
        producer
        stream
        lastId
        nextStream
        callback
    )

-- | Send or extend beamtime metadata.
--
-- The arguments are forwarded unchanged to
-- @PlainProducer.sendBeamtimeMetadata@ and its 'Int' result is returned
-- as-is. Throws a 'ProducerException' if that function reports an 'Error'.
sendBeamtimeMetadata ::
  Producer ->
  Metadata ->
  MetadataIngestMode ->
  UpsertMode ->
  -- | Callback receiving the 'RequestResponse'
  (RequestResponse -> IO ()) ->
  IO Int
sendBeamtimeMetadata producer metadata ingestMode upsertMode callback =
  maybeThrow
    ( PlainProducer.sendBeamtimeMetadata
        producer
        metadata
        ingestMode
        upsertMode
        callback
    )

-- | Send or extend stream metadata.
--
-- The arguments are forwarded unchanged to
-- @PlainProducer.sendStreamMetadata@ and its 'Int' result is returned
-- as-is. Throws a 'ProducerException' if that function reports an 'Error'.
sendStreamMetadata ::
  Producer ->
  Metadata ->
  MetadataIngestMode ->
  UpsertMode ->
  StreamName ->
  -- | Callback receiving the 'RequestResponse'
  (RequestResponse -> IO ()) ->
  IO Int
sendStreamMetadata producer metadata ingestMode upsertMode stream callback =
  maybeThrow
    ( PlainProducer.sendStreamMetadata
        producer
        metadata
        ingestMode
        upsertMode
        stream
        callback
    )

-- | Set a different set of credentials.
--
-- The 'Int' result of @PlainProducer.setCredentials@ is returned as-is.
-- Throws a 'ProducerException' if that function reports an 'Error'.
setCredentials :: Producer -> SourceCredentials -> IO Int
setCredentials producer creds =
  maybeThrow (PlainProducer.setCredentials producer creds)

-- | Wait for all outstanding requests to finish.
--
-- The 'Int' result of @PlainProducer.waitRequestsFinished@ is returned
-- as-is. Throws a 'ProducerException' if that function reports an 'Error'.
waitRequestsFinished ::
  Producer ->
  -- | timeout
  NominalDiffTime ->
  IO Int
waitRequestsFinished producer timeout =
  maybeThrow (PlainProducer.waitRequestsFinished producer timeout)