module Marvin.Adapter.Slack.RTM
( SlackAdapter, RTM
, SlackUserId, SlackChannelId
, MkSlack
) where
import Control.Concurrent.Async.Lifted (async)
import Control.Concurrent.Chan.Lifted
import Control.Concurrent.MVar.Lifted
import Control.Concurrent.STM (atomically, newTMVar, putTMVar, takeTMVar)
import Control.Exception.Lifted
import Control.Lens hiding ((.=))
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Logger
import Data.Aeson hiding (Error)
import Data.Aeson.Types hiding (Error)
import qualified Data.ByteString.Lazy.Char8 as BS
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import Marvin.Adapter
import Marvin.Adapter.Slack.Common
import Marvin.Adapter.Slack.Types
import Marvin.Interpolate.Text
import Network.URI
import Network.WebSockets
import Network.Wreq
import Text.Read (readMaybe)
import Wuss
runConnectionLoop :: Chan (InternalType RTM) -> MVar Connection -> AdapterM (SlackAdapter RTM) ()
runConnectionLoop eventChan connectionTracker = forever $ do
token <- requireFromAdapterConfig "token"
messageChan <- newChan
void $ async $ forever $ do
msg <- readChan messageChan
case eitherDecode msg >>= parseEither eventParser of
Left e -> logErrorN $(isT "Error parsing json: #{e} original data: #{rawBS msg}")
Right v -> writeChan eventChan v
logDebugN "initializing socket"
r <- liftIO $ post "https://slack.com/api/rtm.start" [ "token" := (token :: T.Text) ]
case eitherDecode (r^.responseBody) of
Left err -> logErrorN $(isT "Error decoding rtm json #{err}")
Right js -> do
let uri = url js
authority = fromMaybe (error "URI lacks authority") (uriAuthority uri)
host = uriUserInfo authority ++ uriRegName authority
path = uriPath uri
portOnErr v = do
logErrorN $(isT "Unreadable port #{v}")
return 443
port <- case uriPort authority of
v@(':':r) -> maybe (portOnErr v) return $ readMaybe r
v -> portOnErr v
logDebugN $(isT "connecting to socket '#{uri}'")
logFn <- askLoggerIO
catch
(liftIO $ runSecureClient host port path $ \conn -> flip runLoggingT logFn $ do
logInfoN "Connection established"
d <- liftIO $ receiveData conn
case eitherDecode d >>= parseEither helloParser of
Right True -> logDebugN "Recieved hello packet"
Left _ -> error $ "Hello packet not readable: " ++ BS.unpack d
_ -> error $ "First packet was not hello packet: " ++ BS.unpack d
putMVar connectionTracker conn
forever $ do
d <- liftIO $ receiveData conn
writeChan messageChan d)
$ \e -> do
void $ takeMVar connectionTracker
logErrorN $(isT "#{e :: ConnectionException}")
senderLoop :: MVar Connection -> AdapterM (SlackAdapter a) ()
senderLoop connectionTracker = do
SlackAdapter{outChannel} <- getAdapter
midTracker <- liftIO $ atomically $ newTMVar (0 :: Int)
forever $ do
(SlackChannelId sid, msg) <- readChan outChannel
mid <- liftIO $ atomically $ do
id <- takeTMVar midTracker
putTMVar midTracker (id + 1)
return id
let encoded = encode $ object
[ "id" .= mid
, "type" .= ("message" :: T.Text)
, "channel" .= sid
, "text" .= msg
]
go 0 = logErrorN "Connection error, quitting retry."
go n =
catch
(do
conn <- readMVar connectionTracker
liftIO $ sendTextData conn encoded)
$ \e -> do
logErrorN $(isT "#{e :: ConnectionException}")
go (n1)
go (3 :: Int)
data RTM
instance MkSlack RTM where
mkAdapterId = "slack-rtm"
initIOConnections inChan = do
connTracker <- newEmptyMVar
async $ runConnectionLoop inChan connTracker
senderLoop connTracker