{-# LANGUAGE GeneralizedNewtypeDeriving, LambdaCase, OverloadedStrings #-}
module Pulsar.Connection where
import Control.Monad ( forever )
import Control.Monad.Catch ( MonadThrow )
import Control.Monad.Managed
import qualified Data.Binary as B
import Data.Foldable ( traverse_ )
import Data.IORef
import Lens.Family
import qualified Network.Socket as NS
import qualified Network.Socket.ByteString.Lazy
as SBL
import Proto.PulsarApi ( BaseCommand
, MessageMetadata
)
import qualified Proto.PulsarApi_Fields as F
import Pulsar.Internal.Logger
import Pulsar.Internal.TCPClient ( acquireSocket )
import qualified Pulsar.Protocol.Commands as P
import Pulsar.Protocol.Decoder ( decodeBaseCommand )
import Pulsar.Protocol.Encoder ( encodeBaseCommand )
import Pulsar.Protocol.Frame ( Payload
, Response(..)
, frameMaxSize
, getCommand
)
import System.Timeout ( timeout )
import UnliftIO.Async ( concurrently_ )
import UnliftIO.Chan
import UnliftIO.Concurrent ( forkIO
, killThread
, threadDelay
)
import UnliftIO.Exception ( bracket
, throwIO
)
-- | Handle to an open connection; wraps the underlying 'NS.Socket'.
newtype Connection = Conn NS.Socket
-- | Request identifier: a 'B.Word64' wrapper. The 'Num' instance is reused
-- from the underlying word, so numeric literals and @+@ work directly on ids.
newtype ReqId = ReqId B.Word64 deriving (Num, Show)
-- | Sequence identifier: a 'B.Word64' wrapper. The 'Num' instance is reused
-- from the underlying word, so numeric literals and @+@ work directly on ids.
newtype SeqId = SeqId B.Word64 deriving (Num, Show)
-- | Producer identifier: a 'B.Word64' wrapper built with the 'PId'
-- constructor. The 'Num' instance is reused from the underlying word, so
-- numeric literals and @+@ work directly on ids.
newtype ProducerId = PId B.Word64 deriving (Num, Show)
-- | Consumer identifier: a 'B.Word64' wrapper built with the 'CId'
-- constructor. The 'Num' instance is reused from the underlying word, so
-- numeric literals and @+@ work directly on ids.
newtype ConsumerId = CId B.Word64 deriving (Num, Show)
-- | Mutable client state shared (through an 'IORef') between the id
-- generators and the receive dispatcher.
data AppState = AppState
  { appConsumers  :: [(ConsumerId, Chan Response)]
    -- ^ Registered consumers, each paired with the channel that receives
    -- the responses dispatched to it.
  , appConsumerId :: ConsumerId
    -- ^ Next consumer id to hand out.
  , appProducers  :: [(ProducerId, Chan Response)]
    -- ^ Registered producers, each paired with the channel that receives
    -- the responses dispatched to it.
  , appProducerId :: ProducerId
    -- ^ Next producer id to hand out.
  , appRequestId  :: ReqId
    -- ^ Next request id to hand out.
  }
-- | Allocate a fresh 'ConsumerId' and register @chan@ as the response channel
-- for it.
--
-- Returns the current value of the consumer-id counter and advances the
-- counter by one. The channel is stored under the very id that is returned,
-- so a later lookup by that id finds the right channel.
--
-- The update is a single atomic step on the 'IORef'; the strict
-- 'atomicModifyIORef'' is used so the new state is forced instead of piling
-- up as a chain of thunks inside the reference.
mkConsumerId :: MonadIO m => Chan Response -> IORef AppState -> m ConsumerId
mkConsumerId chan ref = liftIO $ atomicModifyIORef' ref register
 where
  register st =
    let cid = appConsumerId st
        st' = st { appConsumers  = (cid, chan) : appConsumers st
                 , appConsumerId = cid + 1
                 }
    in  (st', cid)
-- | Allocate a fresh 'ProducerId' and register @chan@ as the response channel
-- for it.
--
-- Returns the current value of the producer-id counter and advances the
-- counter by one. The channel is stored under the very id that is returned,
-- so a later lookup by that id finds the right channel.
--
-- The update is a single atomic step on the 'IORef'; the strict
-- 'atomicModifyIORef'' is used so the new state is forced instead of piling
-- up as a chain of thunks inside the reference.
mkProducerId :: MonadIO m => Chan Response -> IORef AppState -> m ProducerId
mkProducerId chan ref = liftIO $ atomicModifyIORef' ref register
 where
  register st =
    let pid = appProducerId st
        st' = st { appProducers  = (pid, chan) : appProducers st
                 , appProducerId = pid + 1
                 }
    in  (st', pid)
-- | Allocate a fresh 'ReqId'.
--
-- Returns the current value of the request-id counter and advances the
-- counter by one, atomically. The strict 'atomicModifyIORef'' keeps the
-- stored state evaluated rather than accumulating thunks.
mkRequestId :: MonadIO m => IORef AppState -> m ReqId
mkRequestId ref = liftIO $ atomicModifyIORef' ref next
 where
  next st =
    let req = appRequestId st
    in  (st { appRequestId = req + 1 }, req)
-- | Address of the endpoint to connect to.
data ConnectData = ConnData
  { connHost :: NS.HostName
    -- ^ Host name or address passed to the socket layer.
  , connPort :: NS.ServiceName
    -- ^ Service name or port number, as a string.
  } deriving Show
-- | Everything a client needs after a successful 'connect': the open
-- connection and the shared mutable state.
data PulsarCtx = Ctx
  { ctxConn  :: Connection
    -- ^ The established connection.
  , ctxState :: IORef AppState
    -- ^ Shared id counters and registered consumer/producer channels.
  }
-- | Default connection settings: host @127.0.0.1@, port @6650@.
defaultConnectData :: ConnectData
defaultConnectData = ConnData { connHost = "127.0.0.1", connPort = "6650" }
-- | Open a connection and start its background workers.
--
-- Steps, in order:
--
-- 1. acquire a socket for the given host and port;
-- 2. send the connect command and read one response;
-- 3. fail with @userError "Could not connect"@ unless that response carries
--    a @connected@ command, otherwise log it;
-- 4. create the shared 'AppState' and the channel used to hand pongs to the
--    keep-alive loop;
-- 5. fork a thread running the receive dispatcher and the keep-alive loop
--    concurrently; the thread is killed when the managed scope ends.
connect
  :: (MonadThrow m, MonadIO m, MonadManaged m) => ConnectData -> m PulsarCtx
connect (ConnData h p) = do
  socket <- acquireSocket h p
  liftIO $ sendSimpleCmd socket P.connect
  resp <- receive socket
  case getCommand resp ^. F.maybe'connected of
    Just _  -> logResponse resp
    Nothing -> throwIO $ userError "Could not connect"
  app   <- liftIO initAppState
  kchan <- liftIO newChan
  let ctx        = Ctx (Conn socket) app
      dispatcher = recvDispatch socket app kchan
      task       = concurrently_ dispatcher (keepAlive socket kchan)
  -- NOTE(review): the worker runs in a plain forked thread, so an exception
  -- raised inside it (e.g. a keep-alive timeout) is not re-thrown to the
  -- caller of 'connect' -- confirm this is intended.
  using $ ctx <$ managed (bracket (forkIO task) killThread)
-- | Create the initial shared state: no registered consumers or producers,
-- and all three id counters starting at zero.
initAppState :: MonadIO m => m (IORef AppState)
initAppState = liftIO . newIORef $ AppState
  { appConsumers  = []
  , appConsumerId = 0
  , appProducers  = []
  , appProducerId = 0
  , appRequestId  = 0
  }
-- | Receive loop: read responses from the socket forever and route them.
--
-- A response carrying a @pong@ command is forwarded (as a bare command) to
-- the keep-alive channel. Every other response is written to the channel of
-- every registered consumer and every registered producer. The registrations
-- are re-read from the 'IORef' on each iteration, so channels added after
-- the loop started are picked up.
recvDispatch
  :: MonadIO m => NS.Socket -> IORef AppState -> Chan BaseCommand -> m ()
recvDispatch s ref chan = forever $ do
  resp <- receive s
  st   <- liftIO $ readIORef ref
  let cmd         = getCommand resp
      subscribers = map snd (appConsumers st) ++ map snd (appProducers st)
  case cmd ^. F.maybe'pong of
    Just _  -> writeChan chan cmd
    Nothing -> traverse_ (`writeChan` resp) subscribers
-- | Keep-alive loop: every 29 seconds log and send a ping, then wait up to
-- 2 seconds for a command to arrive on @chan@ (the dispatcher forwards pongs
-- there). The received command is logged; if nothing arrives in time the
-- loop aborts with @userError "Keep Alive interruption"@.
keepAlive :: MonadIO m => NS.Socket -> Chan BaseCommand -> m ()
keepAlive s chan = forever $ do
  threadDelay pingIntervalMicros
  logRequest P.ping
  sendSimpleCmd s P.ping
  liftIO $ timeout pongTimeoutMicros (readChan chan) >>= \case
    Just cmd -> logResponse cmd
    Nothing  -> throwIO (userError "Keep Alive interruption")
 where
  -- Both durations are in microseconds, as expected by 'threadDelay' and
  -- 'timeout'.
  pingIntervalMicros = 29 * 1000000
  pongTimeoutMicros  = 2 * 1000000
-- | Encode a command without metadata or payload and write all of the
-- resulting bytes to the socket.
sendSimpleCmd :: MonadIO m => NS.Socket -> BaseCommand -> m ()
sendSimpleCmd s cmd =
  liftIO . SBL.sendAll s $ encodeBaseCommand Nothing Nothing cmd
-- | Encode a command together with its message metadata and an optional
-- payload, and write all of the resulting bytes to the socket.
sendPayloadCmd
  :: MonadIO m
  => NS.Socket
  -> BaseCommand
  -> MessageMetadata
  -> Maybe Payload
  -> m ()
sendPayloadCmd s cmd meta payload =
  liftIO . SBL.sendAll s $ encodeBaseCommand (Just meta) payload cmd
-- | Read one response from the socket.
--
-- Performs a single @recv@ of at most 'frameMaxSize' bytes and decodes the
-- result. A decoding failure is raised via 'fail' with the message
-- @"Decoding error: …"@. When the decoded command is a @ping@, it is logged
-- and a @pong@ is logged and sent back before the response is returned; the
-- ping response itself is still returned to the caller.
--
-- NOTE(review): this relies on one @recv@ call yielding exactly one complete
-- frame -- confirm against the framing guarantees of the transport.
receive :: MonadIO m => NS.Socket -> m Response
receive s = liftIO $ do
  msg <- SBL.recv s (fromIntegral frameMaxSize)
  case decodeBaseCommand msg of
    Left  e    -> fail $ "Decoding error: " <> e
    Right resp -> do
      case getCommand resp ^. F.maybe'ping of
        Just _ -> do
          logResponse $ getCommand resp
          logRequest P.pong
          sendSimpleCmd s P.pong
        Nothing -> pure ()
      pure resp