{-# LANGUAGE GeneralizedNewtypeDeriving, LambdaCase, OverloadedStrings #-}

module Pulsar.Connection where

import           Control.Monad                  ( forever )
import           Control.Monad.Catch            ( MonadThrow )
import           Control.Monad.Managed
import qualified Data.Binary                   as B
import           Data.Foldable                  ( traverse_ )
import           Data.IORef
import           Lens.Family
import qualified Network.Socket                as NS
import qualified Network.Socket.ByteString.Lazy
                                               as SBL
import           Proto.PulsarApi                ( BaseCommand
                                                , MessageMetadata
                                                )
import qualified Proto.PulsarApi_Fields        as F
import           Pulsar.Internal.Logger
import           Pulsar.Internal.TCPClient      ( acquireSocket )
import qualified Pulsar.Protocol.Commands      as P
import           Pulsar.Protocol.Decoder        ( decodeBaseCommand )
import           Pulsar.Protocol.Encoder        ( encodeBaseCommand )
import           Pulsar.Protocol.Frame          ( Payload
                                                , Response(..)
                                                , frameMaxSize
                                                , getCommand
                                                )
import           System.Timeout                 ( timeout )
import           UnliftIO.Async                 ( concurrently_ )
import           UnliftIO.Chan
import           UnliftIO.Concurrent            ( forkIO
                                                , killThread
                                                , threadDelay
                                                )
import           UnliftIO.Exception             ( bracket
                                                , throwIO
                                                )

{- | Wrapper around the network socket of an established connection. -}
newtype Connection = Conn NS.Socket

{- | Request identifier: a 64-bit word with numeric literals and arithmetic
available through the derived 'Num' instance. -}
newtype ReqId = ReqId B.Word64 deriving (Num, Show)
{- | Sequence identifier: a 64-bit word with numeric literals and arithmetic
available through the derived 'Num' instance. -}
newtype SeqId = SeqId B.Word64 deriving (Num, Show)
{- | Producer identifier: a 64-bit word with numeric literals and arithmetic
available through the derived 'Num' instance. -}
newtype ProducerId = PId B.Word64 deriving (Num, Show)
{- | Consumer identifier: a 64-bit word with numeric literals and arithmetic
available through the derived 'Num' instance. -}
newtype ConsumerId = CId B.Word64 deriving (Num, Show)

{- | Client-side state kept in an 'IORef': the registered consumer and producer
channels plus the counters used to hand out fresh identifiers. -}
data AppState = AppState
  { appConsumers :: [(ConsumerId, Chan Response)]
    -- ^ a list of consumer identifiers associated with a communication channel
  , appConsumerId :: ConsumerId
    -- ^ an incremental counter to assign unique consumer ids
  , appProducers :: [(ProducerId, Chan Response)]
    -- ^ a list of producer identifiers associated with a communication channel
  , appProducerId :: ProducerId
    -- ^ an incremental counter to assign unique producer ids
  , appRequestId :: ReqId
    -- ^ an incremental counter to assign unique request ids for all commands
  }

{- | Registers @chan@ as a consumer channel and returns the consumer id assigned to it.

The id handed back is the current value of the consumer counter; the counter is
then incremented for the next caller. The channel is stored under the same id
that is returned, so the entry in 'appConsumers' and the caller's id agree.
The whole update is one atomic, strict modification of the 'IORef'.
-}
mkConsumerId :: MonadIO m => Chan Response -> IORef AppState -> m ConsumerId
mkConsumerId chan ref = liftIO $ atomicModifyIORef' ref register
 where
  register st =
    let cid = appConsumerId st
    in  ( st { appConsumers  = (cid, chan) : appConsumers st
             , appConsumerId = cid + 1
             }
        , cid
        )

{- | Registers @chan@ as a producer channel and returns the producer id assigned to it.

The id handed back is the current value of the producer counter; the counter is
then incremented for the next caller. The channel is stored under the same id
that is returned, so the entry in 'appProducers' and the caller's id agree.
The whole update is one atomic, strict modification of the 'IORef'.
-}
mkProducerId :: MonadIO m => Chan Response -> IORef AppState -> m ProducerId
mkProducerId chan ref = liftIO $ atomicModifyIORef' ref register
 where
  register st =
    let pid = appProducerId st
    in  ( st { appProducers  = (pid, chan) : appProducers st
             , appProducerId = pid + 1
             }
        , pid
        )

{- | Returns the current request id and atomically bumps the counter by one,
so that every call observes a different id. -}
mkRequestId :: MonadIO m => IORef AppState -> m ReqId
mkRequestId ref = liftIO $ atomicModifyIORef' ref next
 where
  next st =
    let req = appRequestId st in (st { appRequestId = req + 1 }, req)

{- | Connection details: host and port. -}
data ConnectData = ConnData
    { connHost :: NS.HostName
    , connPort :: NS.ServiceName
    } deriving Show

{- | Internal Pulsar context. You will never need to access its content (not exported) but might need to take it as argument. -}
data PulsarCtx = Ctx
  { ctxConn :: Connection      -- ^ the underlying connection (socket wrapper)
  , ctxState :: IORef AppState -- ^ shared, mutable client state
  }

{- | Default connection data: "127.0.0.1:6650" -}
defaultConnectData :: ConnectData
defaultConnectData = ConnData { connHost = "127.0.0.1", connPort = "6650" }

{- | Starts a Pulsar connection with the supplied 'ConnectData'.

Steps, in order:

1. acquire a socket for the given host and port;
2. send the CONNECT command and read one response; if that response is not a
   CONNECTED command an 'IOError' (@"Could not connect"@) is thrown;
3. create a fresh 'AppState' and a channel used to hand PONG commands to the
   keep-alive loop;
4. fork a background thread that runs the response dispatcher and the keep-alive
   loop concurrently. The thread is registered as a managed resource and is
   killed when the managed scope ends.
-}
connect
  :: (MonadThrow m, MonadIO m, MonadManaged m) => ConnectData -> m PulsarCtx
connect (ConnData h p) = do
  socket <- acquireSocket h p
  sendSimpleCmd socket P.connect
  resp <- receive socket
  case getCommand resp ^. F.maybe'connected of
    Just _  -> logResponse resp
    Nothing -> throwIO $ userError "Could not connect"
  app   <- initAppState
  kchan <- newChan
  let ctx        = Ctx (Conn socket) app
      dispatcher = recvDispatch socket app kchan
      task       = concurrently_ dispatcher (keepAlive socket kchan)
  -- The context is returned as soon as the worker thread has been forked;
  -- 'bracket' guarantees the thread is killed on release.
  using $ ctx <$ managed (bracket (forkIO task) killThread)

{- | Creates the initial state: no consumers, no producers, and all three id
counters starting at zero. -}
initAppState :: MonadIO m => m (IORef AppState)
initAppState = liftIO . newIORef $ AppState [] 0 [] 0 0

{- | Receive loop: reads responses from the socket forever and routes them.

A PONG command is forwarded to @chan@ (the channel read by 'keepAlive'). Any
other response is written to every consumer channel and every producer channel
currently registered in the 'AppState'; the state is re-read on each iteration,
so channels registered later are picked up.
-}
recvDispatch
  :: MonadIO m => NS.Socket -> IORef AppState -> Chan BaseCommand -> m ()
recvDispatch s ref chan = forever $ do
  resp                   <- receive s
  (AppState cs _ ps _ _) <- liftIO $ readIORef ref
  case getCommand resp ^. F.maybe'pong of
    Just _  -> writeChan chan (getCommand resp)
    Nothing -> traverse_ (`writeChan` resp) ((snd <$> cs) ++ (snd <$> ps))

{- Keep-alive loop: wait 29 seconds, emit a PING, then wait up to 2 seconds for a
   PONG to arrive on the given channel. If no PONG is received in time, an
   'IOError' ("Keep Alive interruption") is thrown, which ends the loop. -}
keepAlive :: MonadIO m => NS.Socket -> Chan BaseCommand -> m ()
keepAlive s chan = forever $ do
  threadDelay (29 * 1000000) -- microseconds
  logRequest P.ping
  sendSimpleCmd s P.ping
  liftIO $ timeout (2 * 1000000) (readChan chan) >>= \case
    Just cmd -> logResponse cmd
    Nothing  -> throwIO (userError "Keep Alive interruption")

{- | Encodes a command without metadata or payload and writes all of it to the socket. -}
sendSimpleCmd :: MonadIO m => NS.Socket -> BaseCommand -> m ()
sendSimpleCmd s cmd =
  liftIO . SBL.sendAll s $ encodeBaseCommand Nothing Nothing cmd

{- | Encodes a command together with its message metadata and an optional
payload, and writes all of it to the socket. -}
sendPayloadCmd
  :: MonadIO m
  => NS.Socket
  -> BaseCommand
  -> MessageMetadata
  -> Maybe Payload
  -> m ()
sendPayloadCmd s cmd meta payload =
  liftIO . SBL.sendAll s $ encodeBaseCommand (Just meta) payload cmd

{- | Reads up to 'frameMaxSize' bytes from the socket and decodes them as a response.

A decoding failure is raised via 'fail' in 'IO' with the message prefixed by
@"Decoding error: "@. When the decoded command is a PING, a PONG is sent back
on the same socket before the response is returned; the PING itself is still
returned to the caller.
-}
receive :: MonadIO m => NS.Socket -> m Response
receive s = liftIO $ do
  msg <- SBL.recv s $ fromIntegral frameMaxSize
  case decodeBaseCommand msg of
    Left  e    -> fail $ "Decoding error: " <> e
    Right resp -> do
      case getCommand resp ^. F.maybe'ping of
        Just _ -> do
          logResponse $ getCommand resp
          logRequest P.pong
          sendSimpleCmd s P.pong
        Nothing -> pure ()
      pure resp