{-# LANGUAGE BlockArguments    #-}
{-# LANGUAGE NamedFieldPuns    #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}

module Network.MQTT.RPC (call) where

import           Control.Concurrent.STM   (atomically, newTChanIO, readTChan, writeTChan)
import           Control.Monad            (when)
import           Control.Monad.Catch      (bracket, throwM)
import           Control.Monad.IO.Class   (MonadIO (..))
import qualified Data.ByteString.Lazy     as BL
import           Data.Text                (Text)
import qualified Data.Text.Encoding       as TE
import qualified Data.Text.Encoding.Error as TE
import qualified Data.UUID                as UUID
import           Network.MQTT.Client
import           Network.MQTT.Topic
import           System.Random            (randomIO)

-- | Decode a lazy 'BL.ByteString' as UTF-8 'Text'.
--
-- The input is first made strict and then decoded with
-- 'TE.lenientDecode', so malformed byte sequences are replaced rather
-- than raising an exception.
blToText :: BL.ByteString -> Text
blToText = TE.decodeUtf8With TE.lenientDecode . BL.toStrict

-- | Send a message to a topic on an MQTT broker with a random
-- subscription and correlation such that an agent may receive this
-- message and respond over the ephemeral channel.  The response will
-- be returned.
--
-- The request is published at 'QoS2' carrying two properties: a
-- 'PropCorrelationData' holding a freshly generated UUID and a
-- 'PropResponseTopic' of the form @$rpc\/\<uuid\>@ (a second, independent
-- UUID).  The correlation callback and the subscription to the response
-- topic are set up before publishing and are torn down again via
-- 'bracket', whether the call returns or throws.
--
-- Note that this client provides no timeouts or retries.  MQTT will
-- guarantee the request message is delivered to the broker, but if
-- there's nothing to pick it up, there may never be a response.
--
-- Throws 'MQTTException' @"disconnected"@ if the client is observed to
-- be disconnected while waiting for the response.
call :: MonadIO m => MQTTClient -> Topic -> BL.ByteString -> m BL.ByteString
call mc topic req = liftIO do
  r <- newTChanIO
  corr <- BL.fromStrict . UUID.toASCIIBytes <$> randomIO
  subid <- BL.fromStrict . ("$rpc/" <>) . UUID.toASCIIBytes <$> randomIO
  -- The generated topic is expected to always form a valid filter; fail
  -- with a descriptive MQTTException rather than an anonymous
  -- pattern-match failure should that ever not hold.
  filt <- maybe (throwM (MQTTException "invalid RPC response filter")) pure
                (mkFilter (blToText subid))
  go corr subid filt r

  where
    -- Run one request/response exchange with the correlation callback and
    -- the response subscription held for exactly the duration of the call.
    go theID theTopic filt r = bracket reg unreg rt
      where
        -- Acquire: register the correlation callback first, then
        -- subscribe to the response filter.
        reg = do
          atomically $ registerCorrelated mc theID (SimpleCallback cb)
          subscribe mc [(filt, subOptions)] mempty

        -- Release: drop the callback, then unsubscribe.
        unreg _ = do
          atomically $ unregisterCorrelated mc theID
          unsubscribe mc [filt] mempty

        -- Correlated-message callback: hand the payload to the waiter.
        cb _ _ m _ = atomically $ writeTChan r m

        -- Publish the request, then block until a response is queued.
        rt _ = do
          publishq mc topic req False QoS2
            [ PropCorrelationData theID
            , PropResponseTopic theTopic
            ]
          atomically do
            -- The connection check is part of the same transaction as
            -- the read, so a disconnect is reported instead of waiting
            -- on the channel.
            connd <- isConnectedSTM mc
            when (not connd) $ throwM (MQTTException "disconnected")
            readTChan r