{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.MQTT.RPC (call) where
import Control.Concurrent.STM (atomically, newTChanIO,
readTChan, writeTChan)
import qualified Control.Exception as E
import qualified Data.ByteString.Lazy as BL
import Control.Monad (when)
import Data.Text (Text)
import qualified Data.Text.Encoding as TE
import qualified Data.UUID as UUID
import Network.MQTT.Client
import System.Random (randomIO)
blToText :: BL.ByteString -> Text
blToText = TE.decodeUtf8 . BL.toStrict
call :: MQTTClient -> Topic -> BL.ByteString -> IO BL.ByteString
call mc topic req = do
r <- newTChanIO
corr <- BL.fromStrict . UUID.toASCIIBytes <$> randomIO
subid <- BL.fromStrict . ("$rpc/" <>) . UUID.toASCIIBytes <$> randomIO
go corr subid r
where go theID theTopic r = E.bracket reg unreg rt
where reg = do
atomically $ registerCorrelated mc theID (SimpleCallback cb)
subscribe mc [(blToText theTopic, subOptions)] mempty
unreg _ = do
atomically $ unregisterCorrelated mc theID
unsubscribe mc [blToText theTopic] mempty
cb _ _ m _ = atomically $ writeTChan r m
rt _ = do
publishq mc topic req False QoS2 [
PropCorrelationData theID,
PropResponseTopic theTopic]
atomically do
connd <- isConnectedSTM mc
when (not connd) $ E.throw (MQTTException "disconnected")
readTChan r