module Database.Redis.PubSub (
publish,
pubSub,
Message(..),
PubSub(),
subscribe, unsubscribe,
psubscribe, punsubscribe,
) where
import Control.Applicative
import Control.Monad.Writer
import Data.ByteString.Char8 (ByteString)
import Data.Maybe
import qualified Database.Redis.Core as Core
import Database.Redis.Reply (Reply(..))
import Database.Redis.Types
newtype PubSub = PubSub [[ByteString]]
instance Monoid PubSub where
mempty = PubSub []
mappend (PubSub p) (PubSub p') = PubSub (p ++ p')
data Message = Message { msgChannel, msgMessage :: ByteString}
| PMessage { msgPattern, msgChannel, msgMessage :: ByteString}
deriving (Show)
publish
:: ByteString
-> ByteString
-> Core.Redis (Either Reply Integer)
publish channel message =
Core.sendRequest ["PUBLISH", channel, message]
subscribe
:: [ByteString]
-> PubSub
subscribe = pubSubAction "SUBSCRIBE"
unsubscribe
:: [ByteString]
-> PubSub
unsubscribe = pubSubAction "UNSUBSCRIBE"
psubscribe
:: [ByteString]
-> PubSub
psubscribe = pubSubAction "PSUBSCRIBE"
punsubscribe
:: [ByteString]
-> PubSub
punsubscribe = pubSubAction "PUNSUBSCRIBE"
pubSub
:: PubSub
-> (Message -> IO PubSub)
-> Core.Redis ()
pubSub p callback = send p 0
where
send (PubSub cmds) pending = do
mapM_ Core.send cmds
recv (pending + length cmds)
recv pending = do
reply <- Core.recv
case decodeMsg reply of
Left cnt -> let pending' = pending 1
in unless (cnt == 0 && pending' == 0) $
send mempty pending'
Right msg -> do act <- liftIO $ callback msg
send act pending
pubSubAction :: ByteString -> [ByteString] -> PubSub
pubSubAction cmd chans = PubSub [cmd : chans]
decodeMsg :: Reply -> Either Integer Message
decodeMsg r@(MultiBulk (Just (r0:r1:r2:rs))) = either (errMsg r) id $ do
kind <- decode r0
case kind :: ByteString of
"message" -> Right <$> decodeMessage
"pmessage" -> Right <$> decodePMessage
_ -> Left <$> decode r2
where
decodeMessage = Message <$> decode r1 <*> decode r2
decodePMessage = PMessage <$> decode r1 <*> decode r2
<*> decode (head rs)
decodeMsg r = errMsg r
errMsg :: Reply -> a
errMsg r = error $ "Hedis: expected pub/sub-message but got: " ++ show r