{-# LANGUAGE LambdaCase, OverloadedStrings #-}
module Pulsar.Consumer where
import Control.Monad ( forever )
import qualified Control.Monad.Catch as E
import Control.Monad.Managed
import Lens.Family
import qualified Proto.PulsarApi_Fields as F
import qualified Pulsar.Core as C
import Pulsar.Connection
import Pulsar.Internal.Logger ( logResponse )
import Pulsar.Protocol.Frame ( Payload(..)
, Response(..)
)
import Pulsar.Types
import UnliftIO.Chan
import UnliftIO.Concurrent ( forkIO
, killThread
)
-- | A handle for consuming messages, parameterised over the monad @m@ in
-- which its operations run.
data Consumer m = Consumer
  { fetch :: m Message
    -- ^ Action that yields a 'Message'.
  , ack :: MsgId -> m ()
    -- ^ Action that acknowledges the message with the given 'MsgId'.
  }
-- | Create a 'Consumer' on the given topic and subscription.
--
-- A response channel and a consumer id are created first. The subscription
-- is then set up (see @mkSubscriber@ below) and a background thread is forked
-- that forwards incoming messages to an internal channel, from which 'fetch'
-- reads. 'ack' sends an acknowledgement through "Pulsar.Core".
--
-- Setup and teardown are paired with 'E.bracket' inside a 'Managed' resource:
-- on release the consumer is closed via 'C.closeConsumer' and the background
-- thread is killed.
newConsumer
  :: (MonadManaged m, MonadIO f)
  => PulsarCtx
  -> Topic
  -> SubscriptionName
  -> m (Consumer f)
newConsumer (Ctx conn app) topic sub = do
  chan  <- newChan
  cid   <- mkConsumerId chan app
  fchan <- newChan
  using $ Consumer (readChan fchan) (acker cid) <$ managed
    (E.bracket
      (mkSubscriber chan cid >> forkIO (fetcher chan fchan))
      -- Kill the fetcher thread even when requesting a new id or closing the
      -- consumer throws; otherwise the forked thread would outlive the
      -- resource. The exception, if any, still propagates.
      (\tid ->
        (newReq >>= \req -> C.closeConsumer conn chan req cid)
          `E.finally` killThread tid
      )
    )
 where
  -- Loop forever reading responses from @rchan@. A payload response whose
  -- command carries a message is logged and written to @mchan@ as a
  -- 'Message'; a missing payload becomes the empty string. Every other
  -- response is dropped.
  fetcher rchan mchan = liftIO . forever $ readChan rchan >>= \case
    PayloadResponse cmd _ payload -> case cmd ^. F.maybe'message of
      Just msg ->
        let msgId = msg ^. F.messageId
            body  = maybe "" (\(Payload bytes) -> bytes) payload
        in  logResponse cmd >> writeChan mchan (Message (MsgId msgId) body)
      Nothing -> return ()
    _ -> return ()

  -- Obtain a fresh request id from the shared application state.
  newReq = mkRequestId app

  -- Acknowledge a message id on behalf of the given consumer.
  acker consumerId (MsgId mid) = liftIO $ C.ack conn consumerId mid

  -- Run the subscription sequence: 'C.lookup' for the topic, then
  -- 'C.newSubscriber', then 'C.flow'. The first two each use a fresh
  -- request id.
  mkSubscriber rchan consumerId = do
    req1 <- newReq
    C.lookup conn rchan req1 topic
    req2 <- newReq
    C.newSubscriber conn rchan req2 consumerId topic sub
    C.flow conn consumerId