{-# LANGUAGE BlockArguments    #-}
{-# LANGUAGE NamedFieldPuns    #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}

module Network.MQTT.RPC (call) where

import           Control.Concurrent.STM (atomically, newTChanIO, readTChan,
                                         writeTChan)
import           Control.Monad          (when)
import           Control.Monad.Catch    (bracket, throwM)
import           Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString.Lazy   as BL
import           Data.Text              (Text)
import qualified Data.Text.Encoding     as TE
import qualified Data.UUID              as UUID
import           Network.MQTT.Client
import           System.Random          (randomIO)

blToText :: BL.ByteString -> Text
blToText = TE.decodeUtf8 . BL.toStrict

-- | Send a message to a topic on an MQTT broker with a random
-- subscription and correlation such that an agent may receive this
-- message and respond over the ephemeral channel.  The response will
-- be returned.
--
-- Note that this client provides no timeouts or retries.  MQTT will
-- guarantee the request message is delivered to the broker, but if
-- there's nothing to pick it up, there may never be a response.
call :: MonadIO m => MQTTClient -> Topic -> BL.ByteString -> m BL.ByteString
call mc topic req = liftIO do
  r <- newTChanIO
  corr <- BL.fromStrict . UUID.toASCIIBytes <$> randomIO
  subid <- BL.fromStrict . ("$rpc/" <>) . UUID.toASCIIBytes <$> randomIO
  go corr subid r

  where go theID theTopic r = bracket reg unreg rt
          where reg = do
                  atomically $ registerCorrelated mc theID (SimpleCallback cb)
                  subscribe mc [(blToText theTopic, subOptions)] mempty
                unreg _ = do
                  atomically $ unregisterCorrelated mc theID
                  unsubscribe mc [blToText theTopic] mempty
                cb _ _ m _ = atomically $ writeTChan r m
                rt _ = do
                  publishq mc topic req False QoS2 [
                    PropCorrelationData theID,
                    PropResponseTopic theTopic]
                  atomically do
                    connd <- isConnectedSTM mc
                    when (not connd) $ throwM (MQTTException "disconnected")
                    readTChan r