{-# LANGUAGE LambdaCase, OverloadedStrings #-}

module Pulsar.Consumer where

import           Control.Monad                  ( forever )
import qualified Control.Monad.Catch           as E
import           Control.Monad.Managed
import           Lens.Family
import qualified Proto.PulsarApi_Fields        as F
import qualified Pulsar.Core                   as C
import           Pulsar.Connection
import           Pulsar.Internal.Logger         ( logResponse )
import           Pulsar.Protocol.Frame          ( Payload(..)
                                                , Response(..)
                                                )
import           Pulsar.Types
import           UnliftIO.Chan
import           UnliftIO.Concurrent            ( forkIO
                                                , killThread
                                                )

{- | An abstract 'Consumer' able to 'fetch' messages and 'ack'nowledge them.

The record is parameterised over the monad @m@ in which both operations run.
-}
data Consumer m = Consumer
  { fetch :: m Message   -- ^ Fetches a single message. Blocks if no messages are available.
  , ack :: MsgId -> m () -- ^ Acknowledges a single message, identified by its 'MsgId'.
  }

{- | Create a new 'Consumer' by supplying a 'PulsarCtx' (returned by 'Pulsar.connect'), a 'Topic' and a 'SubscriptionName'.

The consumer is handed out as a managed resource:

* On acquisition, 'C.lookup', 'C.newSubscriber' and 'C.flow' are issued for
  the given topic and subscription, and a background thread is forked that
  forwards incoming messages to an internal channel read by 'fetch'.

* On release, 'C.closeConsumer' is issued and the background thread is killed.
  The thread is killed even if closing the consumer throws.
-}
newConsumer
  :: (MonadManaged m, MonadIO f)
  => PulsarCtx
  -> Topic
  -> SubscriptionName
  -> m (Consumer f)
newConsumer (Ctx conn app) topic sub = do
  -- Channel carrying raw responses for this consumer.
  chan  <- newChan
  cid   <- mkConsumerId chan app
  -- Channel carrying decoded messages, drained by 'fetch'.
  fchan <- newChan
  using $ Consumer (readChan fchan) (acker cid) <$ managed
    (E.bracket
      (mkSubscriber chan cid >> forkIO (fetcher chan fchan))
      -- 'E.finally' guarantees the fetcher thread does not outlive the
      -- consumer when obtaining a request id or closing the consumer fails.
      (\i -> (newReq >>= \r -> C.closeConsumer conn chan r cid)
        `E.finally` killThread i
      )
    )
 where
  -- Loops forever: reads a response from @chan@ and, when it is a
  -- 'PayloadResponse' whose command carries a message, logs the command and
  -- writes the resulting 'Message' to @fc@. Every other response is dropped.
  fetcher chan fc = liftIO . forever $ readChan chan >>= \case
    PayloadResponse cmd _ p -> case cmd ^. F.maybe'message of
      Just msg ->
        let msgId = msg ^. F.messageId
            -- A missing payload is represented as an empty body.
            pm    = Message (MsgId msgId) $ maybe "" (\(Payload x) -> x) p
        in  logResponse cmd >> writeChan fc pm
      Nothing -> return ()
    _ -> return ()
  -- Obtains a request id from the shared application state.
  newReq = mkRequestId app
  -- Acknowledges the message with the given id on behalf of consumer @cid@.
  acker cid (MsgId mid) = liftIO $ C.ack conn cid mid
  -- Runs lookup, subscribe and flow, in that order, using a new request id
  -- for each of the first two commands.
  mkSubscriber chan cid = do
    req1 <- newReq
    C.lookup conn chan req1 topic
    req2 <- newReq
    C.newSubscriber conn chan req2 cid topic sub
    C.flow conn cid