{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls, FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-} module Database.Redis.PubSub ( publish, -- ** Subscribing to channels -- $pubsubexpl -- *** Single-thread Pub/Sub pubSub, Message(..), PubSub(), subscribe, unsubscribe, psubscribe, punsubscribe, -- *** Continuous Pub/Sub message controller pubSubForever, RedisChannel, RedisPChannel, MessageCallback, PMessageCallback, PubSubController, newPubSubController, currentChannels, currentPChannels, addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait, UnregisterCallbacksAction ) where #if __GLASGOW_HASKELL__ < 710 import Control.Applicative import Data.Monoid hiding (<>) #endif import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM) import Control.Concurrent.STM import Control.Exception (throwIO) import Control.Monad import Control.Monad.State import Data.ByteString.Char8 (ByteString) import Data.List (foldl') import Data.Maybe (isJust) import Data.Pool import Data.Semigroup (Semigroup(..)) import qualified Data.HashMap.Strict as HM import qualified Database.Redis.Core as Core import qualified Database.Redis.ProtocolPipelining as PP import Database.Redis.Protocol (Reply(..), renderRequest) import Database.Redis.Types -- |While in PubSub mode, we keep track of the number of current subscriptions -- (as reported by Redis replies) and the number of messages we expect to -- receive after a SUBSCRIBE or PSUBSCRIBE command. We can safely leave the -- PubSub mode when both these numbers are zero. data PubSubState = PubSubState { subCnt, pending :: Int } modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m () modifyPending f = modify $ \s -> s{ pending = f (pending s) } putSubCnt :: (MonadState PubSubState m) => Int -> m () putSubCnt n = modify $ \s -> s{ subCnt = n } data Subscribe data Unsubscribe data Channel data Pattern -- |Encapsulates subscription changes. Use 'subscribe', 'unsubscribe', -- 'psubscribe', 'punsubscribe' or 'mempty' to construct a value. Combine -- values by using the 'Monoid' interface, i.e. 'mappend' and 'mconcat'. data PubSub = PubSub { subs :: Cmd Subscribe Channel , unsubs :: Cmd Unsubscribe Channel , psubs :: Cmd Subscribe Pattern , punsubs :: Cmd Unsubscribe Pattern } deriving (Eq) instance Semigroup PubSub where (<>) p1 p2 = PubSub { subs = subs p1 `mappend` subs p2 , unsubs = unsubs p1 `mappend` unsubs p2 , psubs = psubs p1 `mappend` psubs p2 , punsubs = punsubs p1 `mappend` punsubs p2 } instance Monoid PubSub where mempty = PubSub mempty mempty mempty mempty mappend = (<>) data Cmd a b = DoNothing | Cmd { changes :: [ByteString] } deriving (Eq) instance Semigroup (Cmd Subscribe a) where (<>) DoNothing x = x (<>) x DoNothing = x (<>) (Cmd xs) (Cmd ys) = Cmd (xs ++ ys) instance Monoid (Cmd Subscribe a) where mempty = DoNothing mappend = (<>) instance Semigroup (Cmd Unsubscribe a) where (<>) DoNothing x = x (<>) x DoNothing = x -- empty subscription list => unsubscribe all channels and patterns (<>) (Cmd []) _ = Cmd [] (<>) _ (Cmd []) = Cmd [] (<>) (Cmd xs) (Cmd ys) = Cmd (xs ++ ys) instance Monoid (Cmd Unsubscribe a) where mempty = DoNothing mappend = (<>) class Command a where redisCmd :: a -> ByteString updatePending :: a -> Int -> Int sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis () sendCmd DoNothing = return () sendCmd cmd = do lift $ Core.send (redisCmd cmd : changes cmd) modifyPending (updatePending cmd) cmdCount :: Cmd a b -> Int cmdCount DoNothing = 0 cmdCount (Cmd c) = length c totalPendingChanges :: PubSub -> Int totalPendingChanges (PubSub{..}) = cmdCount subs + cmdCount unsubs + cmdCount psubs + cmdCount punsubs rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO () rawSendCmd _ DoNothing = return () rawSendCmd conn cmd = PP.send conn $ renderRequest $ redisCmd cmd : changes cmd plusChangeCnt :: Cmd a b -> Int -> Int plusChangeCnt DoNothing = id plusChangeCnt (Cmd cs) = (+ length cs) instance Command (Cmd Subscribe Channel) where redisCmd = const "SUBSCRIBE" updatePending = plusChangeCnt instance Command (Cmd Subscribe Pattern) where redisCmd = const "PSUBSCRIBE" updatePending = plusChangeCnt instance Command (Cmd Unsubscribe Channel) where redisCmd = const "UNSUBSCRIBE" updatePending = const id instance Command (Cmd Unsubscribe Pattern) where redisCmd = const "PUNSUBSCRIBE" updatePending = const id data Message = Message { msgChannel, msgMessage :: ByteString} | PMessage { msgPattern, msgChannel, msgMessage :: ByteString} deriving (Show) data PubSubReply = Subscribed | Unsubscribed Int | Msg Message ------------------------------------------------------------------------------ -- Public Interface -- -- |Post a message to a channel (<http://redis.io/commands/publish>). publish :: (Core.RedisCtx m f) => ByteString -- ^ channel -> ByteString -- ^ message -> m (f Integer) publish channel message = Core.sendRequest ["PUBLISH", channel, message] -- |Listen for messages published to the given channels -- (<http://redis.io/commands/subscribe>). subscribe :: [ByteString] -- ^ channel -> PubSub subscribe [] = mempty subscribe cs = mempty{ subs = Cmd cs } -- |Stop listening for messages posted to the given channels -- (<http://redis.io/commands/unsubscribe>). unsubscribe :: [ByteString] -- ^ channel -> PubSub unsubscribe cs = mempty{ unsubs = Cmd cs } -- |Listen for messages published to channels matching the given patterns -- (<http://redis.io/commands/psubscribe>). psubscribe :: [ByteString] -- ^ pattern -> PubSub psubscribe [] = mempty psubscribe ps = mempty{ psubs = Cmd ps } -- |Stop listening for messages posted to channels matching the given patterns -- (<http://redis.io/commands/punsubscribe>). punsubscribe :: [ByteString] -- ^ pattern -> PubSub punsubscribe ps = mempty{ punsubs = Cmd ps } -- |Listens to published messages on subscribed channels and channels matching -- the subscribed patterns. For documentation on the semantics of Redis -- Pub\/Sub see <http://redis.io/topics/pubsub>. -- -- The given callback function is called for each received message. -- Subscription changes are triggered by the returned 'PubSub'. To keep -- subscriptions unchanged, the callback can return 'mempty'. -- -- Example: Subscribe to the \"news\" channel indefinitely. -- -- @ -- pubSub (subscribe [\"news\"]) $ \\msg -> do -- putStrLn $ \"Message from \" ++ show (msgChannel msg) -- return mempty -- @ -- -- Example: Receive a single message from the \"chat\" channel. -- -- @ -- pubSub (subscribe [\"chat\"]) $ \\msg -> do -- putStrLn $ \"Message from \" ++ show (msgChannel msg) -- return $ unsubscribe [\"chat\"] -- @ -- -- It should be noted that Redis Pub\/Sub by its nature is asynchronous -- so returning `unsubscribe` does not mean that callback won't be able -- to receive any further messages. And to guarantee that you won't -- won't process messages after unsubscription and won't unsubscribe -- from the same channel more than once you need to use `IORef` or -- something similar -- pubSub :: PubSub -- ^ Initial subscriptions. -> (Message -> IO PubSub) -- ^ Callback function. -> Core.Redis () pubSub initial callback | initial == mempty = return () | otherwise = evalStateT (send initial) (PubSubState 0 0) where send :: PubSub -> StateT PubSubState Core.Redis () send PubSub{..} = do sendCmd subs sendCmd unsubs sendCmd psubs sendCmd punsubs recv recv :: StateT PubSubState Core.Redis () recv = do reply <- lift Core.recv case decodeMsg reply of Msg msg -> liftIO (callback msg) >>= send Subscribed -> modifyPending (subtract 1) >> recv Unsubscribed n -> do putSubCnt n PubSubState{..} <- get unless (subCnt == 0 && pending == 0) recv -- | A Redis channel name type RedisChannel = ByteString -- | A Redis pattern channel name type RedisPChannel = ByteString -- | A handler for a message from a subscribed channel. -- The callback is passed the message content. -- -- Messages are processed synchronously in the receiving thread, so if the callback -- takes a long time it will block other callbacks and other messages from being -- received. If you need to move long-running work to a different thread, we suggest -- you use 'TBQueue' with a reasonable bound, so that if messages are arriving faster -- than you can process them, you do eventually block. -- -- If the callback throws an exception, the exception will be thrown from 'pubSubForever' -- which will cause the entire Redis connection for all subscriptions to be closed. -- As long as you call 'pubSubForever' in a loop you will reconnect to your subscribed -- channels, but you should probably add an exception handler to each callback to -- prevent this. type MessageCallback = ByteString -> IO () -- | A handler for a message from a psubscribed channel. -- The callback is passed the channel the message was sent on plus the message content. -- -- Similar to 'MessageCallback', callbacks are executed synchronously and any exceptions -- are rethrown from 'pubSubForever'. type PMessageCallback = RedisChannel -> ByteString -> IO () -- | An action that when executed will unregister the callbacks. It is returned from 'addChannels' -- or 'addChannelsAndWait' and typically you would use it in 'bracket' to guarantee that you -- unsubscribe from channels. For example, if you are using websockets to distribute messages to -- clients, you could use something such as: -- -- > websocketConn <- Network.WebSockets.acceptRequest pending -- > let mycallback msg = Network.WebSockets.sendTextData websocketConn msg -- > bracket (addChannelsAndWait ctrl [("hello", mycallback)] []) id $ const $ do -- > {- loop here calling Network.WebSockets.receiveData -} type UnregisterCallbacksAction = IO () newtype UnregisterHandle = UnregisterHandle Integer deriving (Eq, Show, Num) -- | A controller that stores a set of channels, pattern channels, and callbacks. -- It allows you to manage Pub/Sub subscriptions and pattern subscriptions and alter them at -- any time throughout the life of your program. -- You should typically create the controller at the start of your program and then store it -- through the life of your program, using 'addChannels' and 'removeChannels' to update the -- current subscriptions. data PubSubController = PubSubController { callbacks :: TVar (HM.HashMap RedisChannel [(UnregisterHandle, MessageCallback)]) , pcallbacks :: TVar (HM.HashMap RedisPChannel [(UnregisterHandle, PMessageCallback)]) , sendChanges :: TBQueue PubSub , pendingCnt :: TVar Int , lastUsedCallbackId :: TVar UnregisterHandle } -- | Create a new 'PubSubController'. Note that this does not subscribe to any channels, it just -- creates the controller. The subscriptions will happen once 'pubSubForever' is called. newPubSubController :: MonadIO m => [(RedisChannel, MessageCallback)] -- ^ the initial subscriptions -> [(RedisPChannel, PMessageCallback)] -- ^ the initial pattern subscriptions -> m PubSubController newPubSubController x y = liftIO $ do cbs <- newTVarIO (HM.map (\z -> [(0,z)]) $ HM.fromList x) pcbs <- newTVarIO (HM.map (\z -> [(0,z)]) $ HM.fromList y) c <- newTBQueueIO 10 pending <- newTVarIO 0 lastId <- newTVarIO 0 return $ PubSubController cbs pcbs c pending lastId -- | Get the list of current channels in the 'PubSubController'. WARNING! This might not -- exactly reflect the subscribed channels in the Redis server, because there is a delay -- between adding or removing a channel in the 'PubSubController' and when Redis receives -- and processes the subscription change request. #if __GLASGOW_HASKELL__ < 710 currentChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisChannel] #else currentChannels :: MonadIO m => PubSubController -> m [RedisChannel] #endif currentChannels ctrl = HM.keys <$> (liftIO $ atomically $ readTVar $ callbacks ctrl) -- | Get the list of current pattern channels in the 'PubSubController'. WARNING! This might not -- exactly reflect the subscribed channels in the Redis server, because there is a delay -- between adding or removing a channel in the 'PubSubController' and when Redis receives -- and processes the subscription change request. #if __GLASGOW_HASKELL__ < 710 currentPChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisPChannel] #else currentPChannels :: MonadIO m => PubSubController -> m [RedisPChannel] #endif currentPChannels ctrl = HM.keys <$> (liftIO $ atomically $ readTVar $ pcallbacks ctrl) -- | Add channels into the 'PubSubController', and if there is an active 'pubSubForever', send the subscribe -- and psubscribe commands to Redis. The 'addChannels' function is thread-safe. This function -- does not wait for Redis to acknowledge that the channels have actually been subscribed; use -- 'addChannelsAndWait' for that. -- -- You can subscribe to the same channel or pattern channel multiple times; the 'PubSubController' keeps -- a list of callbacks and executes each callback in response to a message. -- -- The return value is an action 'UnregisterCallbacksAction' which will unregister the callbacks, -- which should typically used with 'bracket'. addChannels :: MonadIO m => PubSubController -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to pattern subscribe to -> m UnregisterCallbacksAction addChannels _ [] [] = return $ return () addChannels ctrl newChans newPChans = liftIO $ do ident <- atomically $ do modifyTVar (lastUsedCallbackId ctrl) (+1) ident <- readTVar $ lastUsedCallbackId ctrl cm <- readTVar $ callbacks ctrl pm <- readTVar $ pcallbacks ctrl let newChans' = [ n | (n,_) <- newChans, not $ HM.member n cm] newPChans' = [ n | (n, _) <- newPChans, not $ HM.member n pm] ps = subscribe newChans' `mappend` psubscribe newPChans' writeTBQueue (sendChanges ctrl) ps writeTVar (callbacks ctrl) (HM.unionWith (++) cm (fmap (\z -> [(ident,z)]) $ HM.fromList newChans)) writeTVar (pcallbacks ctrl) (HM.unionWith (++) pm (fmap (\z -> [(ident,z)]) $ HM.fromList newPChans)) modifyTVar (pendingCnt ctrl) (+ totalPendingChanges ps) return ident return $ unsubChannels ctrl (map fst newChans) (map fst newPChans) ident -- | Call 'addChannels' and then wait for Redis to acknowledge that the channels are actually subscribed. -- -- Note that this function waits for all pending subscription change requests, so if you for example call -- 'addChannelsAndWait' from multiple threads simultaneously, they all will wait for all pending -- subscription changes to be acknowledged by Redis (this is due to the fact that we just track the total -- number of pending change requests sent to Redis and just wait until that count reaches zero). -- -- This also correctly waits if the network connection dies during the subscription change. Say that the -- network connection dies right after we send a subscription change to Redis. 'pubSubForever' will throw -- 'ConnectionLost' and 'addChannelsAndWait' will continue to wait. Once you recall 'pubSubForever' -- with the same 'PubSubController', 'pubSubForever' will open a new connection, send subscription commands -- for all channels in the 'PubSubController' (which include the ones we are waiting for), -- and wait for the responses from Redis. Only once we receive the response from Redis that it has subscribed -- to all channels in 'PubSubController' will 'addChannelsAndWait' unblock and return. addChannelsAndWait :: MonadIO m => PubSubController -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to psubscribe to -> m UnregisterCallbacksAction addChannelsAndWait _ [] [] = return $ return () addChannelsAndWait ctrl newChans newPChans = do unreg <- addChannels ctrl newChans newPChans liftIO $ atomically $ do r <- readTVar (pendingCnt ctrl) when (r > 0) retry return unreg -- | Remove channels from the 'PubSubController', and if there is an active 'pubSubForever', send the -- unsubscribe commands to Redis. Note that as soon as this function returns, no more callbacks will be -- executed even if more messages arrive during the period when we request to unsubscribe from the channel -- and Redis actually processes the unsubscribe request. This function is thread-safe. -- -- If you remove all channels, the connection in 'pubSubForever' to redis will stay open and waiting for -- any new channels from a call to 'addChannels'. If you really want to close the connection, -- use 'Control.Concurrent.killThread' or 'Control.Concurrent.Async.cancel' to kill the thread running -- 'pubSubForever'. removeChannels :: MonadIO m => PubSubController -> [RedisChannel] -> [RedisPChannel] -> m () removeChannels _ [] [] = return () removeChannels ctrl remChans remPChans = liftIO $ atomically $ do cm <- readTVar $ callbacks ctrl pm <- readTVar $ pcallbacks ctrl let remChans' = filter (\n -> HM.member n cm) remChans remPChans' = filter (\n -> HM.member n pm) remPChans ps = (if null remChans' then mempty else unsubscribe remChans') `mappend` (if null remPChans' then mempty else punsubscribe remPChans') writeTBQueue (sendChanges ctrl) ps writeTVar (callbacks ctrl) (foldl' (flip HM.delete) cm remChans') writeTVar (pcallbacks ctrl) (foldl' (flip HM.delete) pm remPChans') modifyTVar (pendingCnt ctrl) (+ totalPendingChanges ps) -- | Internal function to unsubscribe only from those channels matching the given handle. unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO () unsubChannels ctrl chans pchans h = liftIO $ atomically $ do cm <- readTVar $ callbacks ctrl pm <- readTVar $ pcallbacks ctrl -- only worry about channels that exist let remChans = filter (\n -> HM.member n cm) chans remPChans = filter (\n -> HM.member n pm) pchans -- helper functions to filter out handlers that match let filterHandle :: Maybe [(UnregisterHandle,a)] -> Maybe [(UnregisterHandle,a)] filterHandle Nothing = Nothing filterHandle (Just lst) = case filter (\x -> fst x /= h) lst of [] -> Nothing xs -> Just xs let removeHandles :: HM.HashMap ByteString [(UnregisterHandle,a)] -> ByteString -> HM.HashMap ByteString [(UnregisterHandle,a)] removeHandles m k = case filterHandle (HM.lookup k m) of -- recent versions of unordered-containers have alter Nothing -> HM.delete k m Just v -> HM.insert k v m -- maps after taking out channels matching the handle let cm' = foldl' removeHandles cm remChans pm' = foldl' removeHandles pm remPChans -- the channels to unsubscribe are those that no longer exist in cm' and pm' let remChans' = filter (\n -> not $ HM.member n cm') remChans remPChans' = filter (\n -> not $ HM.member n pm') remPChans ps = (if null remChans' then mempty else unsubscribe remChans') `mappend` (if null remPChans' then mempty else punsubscribe remPChans') -- do the unsubscribe writeTBQueue (sendChanges ctrl) ps writeTVar (callbacks ctrl) cm' writeTVar (pcallbacks ctrl) pm' modifyTVar (pendingCnt ctrl) (+ totalPendingChanges ps) return () -- | Call 'removeChannels' and then wait for all pending subscription change requests to be acknowledged -- by Redis. This uses the same waiting logic as 'addChannelsAndWait'. Since 'removeChannels' immediately -- notifies the 'PubSubController' to start discarding messages, you likely don't need this function and -- can just use 'removeChannels'. removeChannelsAndWait :: MonadIO m => PubSubController -> [RedisChannel] -> [RedisPChannel] -> m () removeChannelsAndWait _ [] [] = return () removeChannelsAndWait ctrl remChans remPChans = do removeChannels ctrl remChans remPChans liftIO $ atomically $ do r <- readTVar (pendingCnt ctrl) when (r > 0) retry -- | Internal thread which listens for messages and executes callbacks. -- This is the only thread which ever receives data from the underlying -- connection. listenThread :: PubSubController -> PP.Connection -> IO () listenThread ctrl rawConn = forever $ do msg <- PP.recv rawConn case decodeMsg msg of Msg (Message channel msgCt) -> do cm <- atomically $ readTVar (callbacks ctrl) case HM.lookup channel cm of Nothing -> return () Just c -> mapM_ (\(_,x) -> x msgCt) c Msg (PMessage pattern channel msgCt) -> do pm <- atomically $ readTVar (pcallbacks ctrl) case HM.lookup pattern pm of Nothing -> return () Just c -> mapM_ (\(_,x) -> x channel msgCt) c Subscribed -> atomically $ modifyTVar (pendingCnt ctrl) (\x -> x - 1) Unsubscribed _ -> atomically $ modifyTVar (pendingCnt ctrl) (\x -> x - 1) -- | Internal thread which sends subscription change requests. -- This is the only thread which ever sends data on the underlying -- connection. sendThread :: PubSubController -> PP.Connection -> IO () sendThread ctrl rawConn = forever $ do PubSub{..} <- atomically $ readTBQueue (sendChanges ctrl) rawSendCmd rawConn subs rawSendCmd rawConn unsubs rawSendCmd rawConn psubs rawSendCmd rawConn punsubs -- normally, the socket is flushed during 'recv', but -- 'recv' could currently be blocking on a message. PP.flush rawConn -- | Open a connection to the Redis server, register to all channels in the 'PubSubController', -- and process messages and subscription change requests forever. The only way this will ever -- exit is if there is an exception from the network code or an unhandled exception -- in a 'MessageCallback' or 'PMessageCallback'. For example, if the network connection to Redis -- dies, 'pubSubForever' will throw a 'ConnectionLost'. When such an exception is -- thrown, you can recall 'pubSubForever' with the same 'PubSubController' which will open a -- new connection and resubscribe to all the channels which are tracked in the 'PubSubController'. -- -- The general pattern is therefore during program startup create a 'PubSubController' and fork -- a thread which calls 'pubSubForever' in a loop (using an exponential backoff algorithm -- such as the <https://hackage.haskell.org/package/retry retry> package to not hammer the Redis -- server if it does die). For example, -- -- @ -- myhandler :: ByteString -> IO () -- myhandler msg = putStrLn $ unpack $ decodeUtf8 msg -- -- onInitialComplete :: IO () -- onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed" -- -- main :: IO () -- main = do -- conn <- connect defaultConnectInfo -- pubSubCtrl <- newPubSubController [("mychannel", myhandler)] [] -- forkIO $ forever $ -- pubSubForever conn pubSubCtrl onInitialComplete -- \`catch\` (\\(e :: SomeException) -> do -- putStrLn $ "Got error: " ++ show e -- threadDelay $ 50*1000) -- TODO: use exponential backoff -- -- {- elsewhere in your program, use pubSubCtrl to change subscriptions -} -- @ -- -- At most one active 'pubSubForever' can be running against a single 'PubSubController' at any time. If -- two active calls to 'pubSubForever' share a single 'PubSubController' there will be deadlocks. If -- you do want to process messages using multiple connections to Redis, you can create more than one -- 'PubSubController'. For example, create one PubSubController for each 'Control.Concurrent.getNumCapabilities' -- and then create a Haskell thread bound to each capability each calling 'pubSubForever' in a loop. -- This will create one network connection per controller/capability and allow you to -- register separate channels and callbacks for each controller, spreading the load across the capabilities. pubSubForever :: Core.Connection -- ^ The connection pool -> PubSubController -- ^ The controller which keeps track of all subscriptions and handlers -> IO () -- ^ This action is executed once Redis acknowledges that all the subscriptions in -- the controller are now subscribed. You can use this after an exception (such as -- 'ConnectionLost') to signal that all subscriptions are now reactivated. -> IO () pubSubForever (Core.Conn pool) ctrl onInitialLoad = withResource pool $ \rawConn -> do -- get initial subscriptions and write them into the queue. atomically $ do let loop = tryReadTBQueue (sendChanges ctrl) >>= \x -> if isJust x then loop else return () loop cm <- readTVar $ callbacks ctrl pm <- readTVar $ pcallbacks ctrl let ps = subscribe (HM.keys cm) `mappend` psubscribe (HM.keys pm) writeTBQueue (sendChanges ctrl) ps writeTVar (pendingCnt ctrl) (totalPendingChanges ps) withAsync (listenThread ctrl rawConn) $ \listenT -> withAsync (sendThread ctrl rawConn) $ \sendT -> do -- wait for initial subscription count to go to zero or for threads to fail mret <- atomically $ (Left <$> (waitEitherCatchSTM listenT sendT)) `orElse` (Right <$> (readTVar (pendingCnt ctrl) >>= \x -> if x > 0 then retry else return ())) case mret of Right () -> onInitialLoad _ -> return () -- if there is an error, waitEitherCatch below will also see it -- wait for threads to end with error merr <- waitEitherCatch listenT sendT case merr of (Right (Left err)) -> throwIO err (Left (Left err)) -> throwIO err _ -> return () -- should never happen, since threads exit only with an error ------------------------------------------------------------------------------ -- Helpers -- decodeMsg :: Reply -> PubSubReply decodeMsg r@(MultiBulk (Just (r0:r1:r2:rs))) = either (errMsg r) id $ do kind <- decode r0 case kind :: ByteString of "message" -> Msg <$> decodeMessage "pmessage" -> Msg <$> decodePMessage "subscribe" -> return Subscribed "psubscribe" -> return Subscribed "unsubscribe" -> Unsubscribed <$> decodeCnt "punsubscribe" -> Unsubscribed <$> decodeCnt _ -> errMsg r where decodeMessage = Message <$> decode r1 <*> decode r2 decodePMessage = PMessage <$> decode r1 <*> decode r2 <*> decode (head rs) decodeCnt = fromInteger <$> decode r2 decodeMsg r = errMsg r errMsg :: Reply -> a errMsg r = error $ "Hedis: expected pub/sub-message but got: " ++ show r -- $pubsubexpl -- There are two Pub/Sub implementations. First, there is a single-threaded implementation 'pubSub' -- which is simpler to use but has the restriction that subscription changes can only be made in -- response to a message. Secondly, there is a more complicated Pub/Sub controller 'pubSubForever' -- that uses concurrency to support changing subscriptions at any time but requires more setup. -- You should only use one or the other. In addition, no types or utility functions (that are part -- of the public API) are shared, so functions or types in one of the following sections cannot -- be used for the other. In particular, be aware that they use different utility functions to subscribe -- and unsubscribe to channels.