{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.MQTT.RPC (call) where
import Control.Concurrent.STM (atomically, newTChanIO, readTChan, writeTChan)
import Control.Monad (when)
import Control.Monad.Catch (bracket, throwM)
import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString.Lazy as BL
import Data.Text (Text)
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TE
import qualified Data.UUID as UUID
import Network.MQTT.Client
import Network.MQTT.Topic
import System.Random (randomIO)
blToText :: BL.ByteString -> Text
blToText :: ByteString -> Text
blToText = OnDecodeError -> ByteString -> Text
TE.decodeUtf8With OnDecodeError
TE.lenientDecode (ByteString -> Text)
-> (ByteString -> ByteString) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> ByteString
BL.toStrict
call :: MonadIO m => MQTTClient -> Topic -> BL.ByteString -> m BL.ByteString
call :: MQTTClient -> Topic -> ByteString -> m ByteString
call MQTTClient
mc Topic
topic ByteString
req = IO ByteString -> m ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO do
TChan ByteString
r <- IO (TChan ByteString)
forall a. IO (TChan a)
newTChanIO
ByteString
corr <- ByteString -> ByteString
BL.fromStrict (ByteString -> ByteString)
-> (UUID -> ByteString) -> UUID -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. UUID -> ByteString
UUID.toASCIIBytes (UUID -> ByteString) -> IO UUID -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
ByteString
subid <- ByteString -> ByteString
BL.fromStrict (ByteString -> ByteString)
-> (UUID -> ByteString) -> UUID -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString
"$rpc/" ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<>) (ByteString -> ByteString)
-> (UUID -> ByteString) -> UUID -> ByteString
forall b c a. (b -> c) -> (a -> b) -> a -> c
. UUID -> ByteString
UUID.toASCIIBytes (UUID -> ByteString) -> IO UUID -> IO ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO UUID
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
Just Filter
filt <- Maybe Filter -> IO (Maybe Filter)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Text -> Maybe Filter
mkFilter (Text -> Maybe Filter)
-> (ByteString -> Text) -> ByteString -> Maybe Filter
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
blToText (ByteString -> Maybe Filter) -> ByteString -> Maybe Filter
forall a b. (a -> b) -> a -> b
$ ByteString
subid)
ByteString
-> ByteString -> Filter -> TChan ByteString -> IO ByteString
go ByteString
corr ByteString
subid Filter
filt TChan ByteString
r
where go :: ByteString
-> ByteString -> Filter -> TChan ByteString -> IO ByteString
go ByteString
theID ByteString
theTopic Filter
filt TChan ByteString
r = IO ([Either SubErr QoS], [Property])
-> (([Either SubErr QoS], [Property])
-> IO ([UnsubStatus], [Property]))
-> (([Either SubErr QoS], [Property]) -> IO ByteString)
-> IO ByteString
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket IO ([Either SubErr QoS], [Property])
reg ([Either SubErr QoS], [Property]) -> IO ([UnsubStatus], [Property])
forall p. p -> IO ([UnsubStatus], [Property])
unreg ([Either SubErr QoS], [Property]) -> IO ByteString
forall p. p -> IO ByteString
rt
where reg :: IO ([Either SubErr QoS], [Property])
reg = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient -> ByteString -> MessageCallback -> STM ()
registerCorrelated MQTTClient
mc ByteString
theID ((MQTTClient -> Topic -> ByteString -> [Property] -> IO ())
-> MessageCallback
SimpleCallback MQTTClient -> Topic -> ByteString -> [Property] -> IO ()
forall p p p. p -> p -> ByteString -> p -> IO ()
cb)
MQTTClient
-> [(Filter, SubOptions)]
-> [Property]
-> IO ([Either SubErr QoS], [Property])
subscribe MQTTClient
mc [(Filter
filt, SubOptions
subOptions)] [Property]
forall a. Monoid a => a
mempty
unreg :: p -> IO ([UnsubStatus], [Property])
unreg p
_ = do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ MQTTClient -> ByteString -> STM ()
unregisterCorrelated MQTTClient
mc ByteString
theID
MQTTClient
-> [Filter] -> [Property] -> IO ([UnsubStatus], [Property])
unsubscribe MQTTClient
mc [Filter
filt] [Property]
forall a. Monoid a => a
mempty
cb :: p -> p -> ByteString -> p -> IO ()
cb p
_ p
_ ByteString
m p
_ = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TChan ByteString -> ByteString -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan ByteString
r ByteString
m
rt :: p -> IO ByteString
rt p
_ = do
MQTTClient
-> Topic -> ByteString -> Bool -> QoS -> [Property] -> IO ()
publishq MQTTClient
mc Topic
topic ByteString
req Bool
False QoS
QoS2 [
ByteString -> Property
PropCorrelationData ByteString
theID,
ByteString -> Property
PropResponseTopic ByteString
theTopic]
STM ByteString -> IO ByteString
forall a. STM a -> IO a
atomically do
Bool
connd <- MQTTClient -> STM Bool
isConnectedSTM MQTTClient
mc
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
connd) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ MQTTException -> STM ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (String -> MQTTException
MQTTException String
"disconnected")
TChan ByteString -> STM ByteString
forall a. TChan a -> STM a
readTChan TChan ByteString
r