{- Defines a high-level Pulsar producer for the end user -}
module Pulsar.Producer where

import qualified Control.Monad.Catch           as E
import           Control.Monad.Managed
import           Data.IORef
import           Data.Text                      ( Text )
import qualified Pulsar.Core                   as C
import           Pulsar.Connection
import           Pulsar.Types
import           UnliftIO.Chan

{- | An abstract 'Producer' able to 'produce' messages of type 'PulsarMessage'.

The type parameter @m@ is the monad in which 'produce' runs.
-}
newtype Producer m = Producer
  { produce :: PulsarMessage -> m () -- ^ Produces a single message.
  }

{- | State kept for a single producer; 'newProducer' stores it in an 'IORef'.

Both fields are strict so that repeatedly rebuilding the record (as 'mkSeqId'
does on every message) cannot leave unevaluated thunks inside it.
-}
data ProducerState = ProducerState
  { stSeqId :: !SeqId -- ^ an incremental message sequence counter
  , stName  :: !Text  -- ^ a unique name
  }

{- | Atomically advance the sequence counter held in the given state reference.

Returns the sequence id that was current /before/ the increment; the reference
is left holding that id plus one. The producer name is carried over unchanged.
-}
mkSeqId :: MonadIO m => IORef ProducerState -> m SeqId
mkSeqId ref = liftIO $ atomicModifyIORef' ref advance
 where
  -- Pair the updated state with the pre-increment id. The strict modify
  -- variant only forces the new state to its constructor, so the incremented
  -- counter is forced explicitly to keep thunks out of the reference.
  advance (ProducerState current name) =
    let next = current + 1
    in  next `seq` (ProducerState next name, current)

{- | Create a new 'Producer' by supplying a 'PulsarCtx' (returned by 'Pulsar.connect') and a 'Topic'.

Setup allocates a response channel and a producer id, then calls 'C.lookup'
followed by 'C.newProducer' for the topic, each with its own request id. The
name returned by 'C.newProducer' is stored in the producer's state together
with a sequence counter starting at 0.

The resulting 'Producer' is handed out as a managed resource: its release
action obtains another request id and calls 'C.closeProducer'.
-}
newProducer
  :: (MonadManaged m, MonadIO f) => PulsarCtx -> Topic -> m (Producer f)
newProducer (Ctx conn app) topic = do
  chan  <- newChan
  pid   <- mkProducerId chan app
  pname <- liftIO $ mkProducer chan pid
  pst   <- liftIO $ newIORef (ProducerState 0 pname)
  -- The acquire step is pure: the broker-side setup already happened above.
  -- The bracket exists only to attach the close request as the release action.
  using $ managed
    (E.bracket (pure $ Producer (dispatch chan pid pst))
               (const $ newReq >>= \r -> C.closeProducer conn chan r pid)
    )
 where
  -- Obtain a request id via 'mkRequestId' from the context's state reference.
  newReq = mkRequestId app
  -- Send one message: take the next sequence id, then hand it to 'C.send'.
  dispatch chan pid pst msg = do
    sid <- mkSeqId pst
    liftIO $ C.send conn chan pid sid msg
  -- Run 'C.lookup' for the topic, then 'C.newProducer'; each call gets its
  -- own request id. Yields the 'Text' returned by 'C.newProducer'.
  mkProducer chan pid = do
    req1 <- newReq
    C.lookup conn chan req1 topic
    req2 <- newReq
    C.newProducer conn chan req2 pid topic