{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls,
FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-}
module Database.Redis.PubSub (
subscribe, unsubscribe, psubscribe, punsubscribe,
RedisChannel, RedisPChannel, MessageCallback, PMessageCallback,
PubSubController, newPubSubController, currentChannels, currentPChannels,
addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait,
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Monoid hiding (<>)
import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM)
import Control.Concurrent.STM
import Control.Exception (throwIO)
import Control.Monad
import Control.Monad.State
import Data.ByteString.Char8 (ByteString)
import Data.List (foldl')
import Data.Maybe (isJust)
import Data.Pool
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
import qualified Data.HashMap.Strict as HM
import qualified Database.Redis.Core as Core
import qualified Database.Redis.Connection as Connection
import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Protocol (Reply(..), renderRequest)
import Database.Redis.Types
data PubSubState = PubSubState { PubSubState -> Int
subCnt, PubSubState -> Int
pending :: Int }
modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m ()
modifyPending :: (Int -> Int) -> m ()
modifyPending Int -> Int
f = (PubSubState -> PubSubState) -> m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify ((PubSubState -> PubSubState) -> m ())
-> (PubSubState -> PubSubState) -> m ()
forall a b. (a -> b) -> a -> b
$ \PubSubState
s -> PubSubState
s{ pending :: Int
pending = Int -> Int
f (PubSubState -> Int
pending PubSubState
s) }
putSubCnt :: (MonadState PubSubState m) => Int -> m ()
putSubCnt :: Int -> m ()
putSubCnt Int
n = (PubSubState -> PubSubState) -> m ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify ((PubSubState -> PubSubState) -> m ())
-> (PubSubState -> PubSubState) -> m ()
forall a b. (a -> b) -> a -> b
$ \PubSubState
s -> PubSubState
s{ subCnt :: Int
subCnt = Int
n }
data Subscribe
data Unsubscribe
data Channel
data Pattern
data PubSub = PubSub
{ PubSub -> Cmd Subscribe Channel
subs :: Cmd Subscribe Channel
, PubSub -> Cmd Unsubscribe Channel
unsubs :: Cmd Unsubscribe Channel
, PubSub -> Cmd Subscribe Pattern
psubs :: Cmd Subscribe Pattern
, PubSub -> Cmd Unsubscribe Pattern
punsubs :: Cmd Unsubscribe Pattern
} deriving (PubSub -> PubSub -> Bool
(PubSub -> PubSub -> Bool)
-> (PubSub -> PubSub -> Bool) -> Eq PubSub
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: PubSub -> PubSub -> Bool
$c/= :: PubSub -> PubSub -> Bool
== :: PubSub -> PubSub -> Bool
$c== :: PubSub -> PubSub -> Bool
instance Semigroup PubSub where
<> :: PubSub -> PubSub -> PubSub
(<>) PubSub
p1 PubSub
p2 = PubSub :: Cmd Subscribe Channel
-> Cmd Unsubscribe Channel
-> Cmd Subscribe Pattern
-> Cmd Unsubscribe Pattern
-> PubSub
PubSub { subs :: Cmd Subscribe Channel
subs = PubSub -> Cmd Subscribe Channel
subs PubSub
p1 Cmd Subscribe Channel
-> Cmd Subscribe Channel -> Cmd Subscribe Channel
forall a. Monoid a => a -> a -> a
`mappend` PubSub -> Cmd Subscribe Channel
subs PubSub
, unsubs :: Cmd Unsubscribe Channel
unsubs = PubSub -> Cmd Unsubscribe Channel
unsubs PubSub
p1 Cmd Unsubscribe Channel
-> Cmd Unsubscribe Channel -> Cmd Unsubscribe Channel
forall a. Monoid a => a -> a -> a
`mappend` PubSub -> Cmd Unsubscribe Channel
unsubs PubSub
, psubs :: Cmd Subscribe Pattern
psubs = PubSub -> Cmd Subscribe Pattern
psubs PubSub
p1 Cmd Subscribe Pattern
-> Cmd Subscribe Pattern -> Cmd Subscribe Pattern
forall a. Monoid a => a -> a -> a
`mappend` PubSub -> Cmd Subscribe Pattern
psubs PubSub
, punsubs :: Cmd Unsubscribe Pattern
punsubs = PubSub -> Cmd Unsubscribe Pattern
punsubs PubSub
p1 Cmd Unsubscribe Pattern
-> Cmd Unsubscribe Pattern -> Cmd Unsubscribe Pattern
forall a. Monoid a => a -> a -> a
`mappend` PubSub -> Cmd Unsubscribe Pattern
punsubs PubSub
instance Monoid PubSub where
mempty :: PubSub
mempty = Cmd Subscribe Channel
-> Cmd Unsubscribe Channel
-> Cmd Subscribe Pattern
-> Cmd Unsubscribe Pattern
-> PubSub
PubSub Cmd Subscribe Channel
forall a. Monoid a => a
mempty Cmd Unsubscribe Channel
forall a. Monoid a => a
mempty Cmd Subscribe Pattern
forall a. Monoid a => a
mempty Cmd Unsubscribe Pattern
forall a. Monoid a => a
mappend :: PubSub -> PubSub -> PubSub
mappend = PubSub -> PubSub -> PubSub
forall a. Semigroup a => a -> a -> a
data Cmd a b = DoNothing | Cmd { Cmd a b -> [ByteString]
changes :: [ByteString] } deriving (Cmd a b -> Cmd a b -> Bool
(Cmd a b -> Cmd a b -> Bool)
-> (Cmd a b -> Cmd a b -> Bool) -> Eq (Cmd a b)
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
forall a b. Cmd a b -> Cmd a b -> Bool
/= :: Cmd a b -> Cmd a b -> Bool
$c/= :: forall a b. Cmd a b -> Cmd a b -> Bool
== :: Cmd a b -> Cmd a b -> Bool
$c== :: forall a b. Cmd a b -> Cmd a b -> Bool
instance Semigroup (Cmd Subscribe a) where
<> :: Cmd Subscribe a -> Cmd Subscribe a -> Cmd Subscribe a
(<>) Cmd Subscribe a
DoNothing Cmd Subscribe a
x = Cmd Subscribe a
(<>) Cmd Subscribe a
x Cmd Subscribe a
DoNothing = Cmd Subscribe a
(<>) (Cmd [ByteString]
xs) (Cmd [ByteString]
ys) = [ByteString] -> Cmd Subscribe a
forall a b. [ByteString] -> Cmd a b
Cmd ([ByteString]
xs [ByteString] -> [ByteString] -> [ByteString]
forall a. [a] -> [a] -> [a]
++ [ByteString]
instance Monoid (Cmd Subscribe a) where
mempty :: Cmd Subscribe a
mempty = Cmd Subscribe a
forall a b. Cmd a b
mappend :: Cmd Subscribe a -> Cmd Subscribe a -> Cmd Subscribe a
mappend = Cmd Subscribe a -> Cmd Subscribe a -> Cmd Subscribe a
forall a. Semigroup a => a -> a -> a
instance Semigroup (Cmd Unsubscribe a) where
<> :: Cmd Unsubscribe a -> Cmd Unsubscribe a -> Cmd Unsubscribe a
(<>) Cmd Unsubscribe a
DoNothing Cmd Unsubscribe a
x = Cmd Unsubscribe a
(<>) Cmd Unsubscribe a
x Cmd Unsubscribe a
DoNothing = Cmd Unsubscribe a
(<>) (Cmd []) Cmd Unsubscribe a
_ = [ByteString] -> Cmd Unsubscribe a
forall a b. [ByteString] -> Cmd a b
Cmd []
(<>) Cmd Unsubscribe a
_ (Cmd []) = [ByteString] -> Cmd Unsubscribe a
forall a b. [ByteString] -> Cmd a b
Cmd []
(<>) (Cmd [ByteString]
xs) (Cmd [ByteString]
ys) = [ByteString] -> Cmd Unsubscribe a
forall a b. [ByteString] -> Cmd a b
Cmd ([ByteString]
xs [ByteString] -> [ByteString] -> [ByteString]
forall a. [a] -> [a] -> [a]
++ [ByteString]
instance Monoid (Cmd Unsubscribe a) where
mempty :: Cmd Unsubscribe a
mempty = Cmd Unsubscribe a
forall a b. Cmd a b
mappend :: Cmd Unsubscribe a -> Cmd Unsubscribe a -> Cmd Unsubscribe a
mappend = Cmd Unsubscribe a -> Cmd Unsubscribe a -> Cmd Unsubscribe a
forall a. Semigroup a => a -> a -> a
class Command a where
redisCmd :: a -> ByteString
updatePending :: a -> Int -> Int
sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis ()
sendCmd :: Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd a b
DoNothing = () -> StateT PubSubState Redis ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
sendCmd Cmd a b
cmd = do
Redis () -> StateT PubSubState Redis ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Redis () -> StateT PubSubState Redis ())
-> Redis () -> StateT PubSubState Redis ()
forall a b. (a -> b) -> a -> b
$ [ByteString] -> Redis ()
forall (m :: * -> *). MonadRedis m => [ByteString] -> m ()
Core.send (Cmd a b -> ByteString
forall a. Command a => a -> ByteString
redisCmd Cmd a b
cmd ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: Cmd a b -> [ByteString]
forall a b. Cmd a b -> [ByteString]
changes Cmd a b
(Int -> Int) -> StateT PubSubState Redis ()
forall (m :: * -> *).
MonadState PubSubState m =>
(Int -> Int) -> m ()
modifyPending (Cmd a b -> Int -> Int
forall a. Command a => a -> Int -> Int
updatePending Cmd a b
cmdCount :: Cmd a b -> Int
cmdCount :: Cmd a b -> Int
cmdCount Cmd a b
DoNothing = Int
cmdCount (Cmd [ByteString]
c) = [ByteString] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ByteString]
totalPendingChanges :: PubSub -> Int
totalPendingChanges :: PubSub -> Int
totalPendingChanges (PubSub{Cmd Unsubscribe Pattern
Cmd Unsubscribe Channel
Cmd Subscribe Pattern
Cmd Subscribe Channel
punsubs :: Cmd Unsubscribe Pattern
psubs :: Cmd Subscribe Pattern
unsubs :: Cmd Unsubscribe Channel
subs :: Cmd Subscribe Channel
punsubs :: PubSub -> Cmd Unsubscribe Pattern
psubs :: PubSub -> Cmd Subscribe Pattern
unsubs :: PubSub -> Cmd Unsubscribe Channel
subs :: PubSub -> Cmd Subscribe Channel
..}) =
Cmd Subscribe Channel -> Int
forall a b. Cmd a b -> Int
cmdCount Cmd Subscribe Channel
subs Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Cmd Unsubscribe Channel -> Int
forall a b. Cmd a b -> Int
cmdCount Cmd Unsubscribe Channel
unsubs Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Cmd Subscribe Pattern -> Int
forall a b. Cmd a b -> Int
cmdCount Cmd Subscribe Pattern
psubs Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Cmd Unsubscribe Pattern -> Int
forall a b. Cmd a b -> Int
cmdCount Cmd Unsubscribe Pattern
rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO ()
rawSendCmd :: Connection -> Cmd a b -> IO ()
rawSendCmd Connection
_ Cmd a b
DoNothing = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
rawSendCmd Connection
conn Cmd a b
cmd = Connection -> ByteString -> IO ()
PP.send Connection
conn (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
renderRequest ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$ Cmd a b -> ByteString
forall a. Command a => a -> ByteString
redisCmd Cmd a b
cmd ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
: Cmd a b -> [ByteString]
forall a b. Cmd a b -> [ByteString]
changes Cmd a b
plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt Cmd a b
DoNothing = Int -> Int
forall a. a -> a
plusChangeCnt (Cmd [ByteString]
cs) = (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ [ByteString] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ByteString]
instance Command (Cmd Subscribe Channel) where
redisCmd :: Cmd Subscribe Channel -> ByteString
redisCmd = ByteString -> Cmd Subscribe Channel -> ByteString
forall a b. a -> b -> a
const ByteString
updatePending :: Cmd Subscribe Channel -> Int -> Int
updatePending = Cmd Subscribe Channel -> Int -> Int
forall a b. Cmd a b -> Int -> Int
instance Command (Cmd Subscribe Pattern) where
redisCmd :: Cmd Subscribe Pattern -> ByteString
redisCmd = ByteString -> Cmd Subscribe Pattern -> ByteString
forall a b. a -> b -> a
const ByteString
updatePending :: Cmd Subscribe Pattern -> Int -> Int
updatePending = Cmd Subscribe Pattern -> Int -> Int
forall a b. Cmd a b -> Int -> Int
instance Command (Cmd Unsubscribe Channel) where
redisCmd :: Cmd Unsubscribe Channel -> ByteString
redisCmd = ByteString -> Cmd Unsubscribe Channel -> ByteString
forall a b. a -> b -> a
const ByteString
updatePending :: Cmd Unsubscribe Channel -> Int -> Int
updatePending = (Int -> Int) -> Cmd Unsubscribe Channel -> Int -> Int
forall a b. a -> b -> a
const Int -> Int
forall a. a -> a
instance Command (Cmd Unsubscribe Pattern) where
redisCmd :: Cmd Unsubscribe Pattern -> ByteString
redisCmd = ByteString -> Cmd Unsubscribe Pattern -> ByteString
forall a b. a -> b -> a
const ByteString
updatePending :: Cmd Unsubscribe Pattern -> Int -> Int
updatePending = (Int -> Int) -> Cmd Unsubscribe Pattern -> Int -> Int
forall a b. a -> b -> a
const Int -> Int
forall a. a -> a
data Message = Message { Message -> ByteString
msgChannel, Message -> ByteString
msgMessage :: ByteString}
| PMessage { Message -> ByteString
msgPattern, msgChannel, msgMessage :: ByteString}
deriving (Int -> Message -> ShowS
[Message] -> ShowS
Message -> String
(Int -> Message -> ShowS)
-> (Message -> String) -> ([Message] -> ShowS) -> Show Message
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Message] -> ShowS
$cshowList :: [Message] -> ShowS
show :: Message -> String
$cshow :: Message -> String
showsPrec :: Int -> Message -> ShowS
$cshowsPrec :: Int -> Message -> ShowS
data PubSubReply = Subscribed | Unsubscribed Int | Msg Message
:: (Core.RedisCtx m f)
=> ByteString
-> ByteString
-> m (f Integer)
publish :: ByteString -> ByteString -> m (f Integer)
publish ByteString
channel ByteString
message =
[ByteString] -> m (f Integer)
forall (m :: * -> *) (f :: * -> *) a.
(RedisCtx m f, RedisResult a) =>
[ByteString] -> m (f a)
Core.sendRequest [ByteString
"PUBLISH", ByteString
channel, ByteString
:: [ByteString]
-> PubSub
subscribe :: [ByteString] -> PubSub
subscribe [] = PubSub
forall a. Monoid a => a
subscribe [ByteString]
cs = PubSub
forall a. Monoid a => a
mempty{ subs :: Cmd Subscribe Channel
subs = [ByteString] -> Cmd Subscribe Channel
forall a b. [ByteString] -> Cmd a b
Cmd [ByteString]
cs }
:: [ByteString]
-> PubSub
unsubscribe :: [ByteString] -> PubSub
unsubscribe [ByteString]
cs = PubSub
forall a. Monoid a => a
mempty{ unsubs :: Cmd Unsubscribe Channel
unsubs = [ByteString] -> Cmd Unsubscribe Channel
forall a b. [ByteString] -> Cmd a b
Cmd [ByteString]
cs }
:: [ByteString]
-> PubSub
psubscribe :: [ByteString] -> PubSub
psubscribe [] = PubSub
forall a. Monoid a => a
psubscribe [ByteString]
ps = PubSub
forall a. Monoid a => a
mempty{ psubs :: Cmd Subscribe Pattern
psubs = [ByteString] -> Cmd Subscribe Pattern
forall a b. [ByteString] -> Cmd a b
Cmd [ByteString]
ps }
:: [ByteString]
-> PubSub
punsubscribe :: [ByteString] -> PubSub
punsubscribe [ByteString]
ps = PubSub
forall a. Monoid a => a
mempty{ punsubs :: Cmd Unsubscribe Pattern
punsubs = [ByteString] -> Cmd Unsubscribe Pattern
forall a b. [ByteString] -> Cmd a b
Cmd [ByteString]
ps }
:: PubSub
-> (Message -> IO PubSub)
-> Core.Redis ()
pubSub :: PubSub -> (Message -> IO PubSub) -> Redis ()
pubSub PubSub
initial Message -> IO PubSub
| PubSub
initial PubSub -> PubSub -> Bool
forall a. Eq a => a -> a -> Bool
== PubSub
forall a. Monoid a => a
mempty = () -> Redis ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
| Bool
otherwise = StateT PubSubState Redis () -> PubSubState -> Redis ()
forall (m :: * -> *) s a. Monad m => StateT s m a -> s -> m a
evalStateT (PubSub -> StateT PubSubState Redis ()
send PubSub
initial) (Int -> Int -> PubSubState
PubSubState Int
0 Int
send :: PubSub -> StateT PubSubState Core.Redis ()
send :: PubSub -> StateT PubSubState Redis ()
send PubSub{Cmd Unsubscribe Pattern
Cmd Unsubscribe Channel
Cmd Subscribe Pattern
Cmd Subscribe Channel
punsubs :: Cmd Unsubscribe Pattern
psubs :: Cmd Subscribe Pattern
unsubs :: Cmd Unsubscribe Channel
subs :: Cmd Subscribe Channel
punsubs :: PubSub -> Cmd Unsubscribe Pattern
psubs :: PubSub -> Cmd Subscribe Pattern
unsubs :: PubSub -> Cmd Unsubscribe Channel
subs :: PubSub -> Cmd Subscribe Channel
..} = do
Cmd Subscribe Channel -> StateT PubSubState Redis ()
forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd Subscribe Channel
Cmd Unsubscribe Channel -> StateT PubSubState Redis ()
forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd Unsubscribe Channel
Cmd Subscribe Pattern -> StateT PubSubState Redis ()
forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd Subscribe Pattern
Cmd Unsubscribe Pattern -> StateT PubSubState Redis ()
forall a b.
Command (Cmd a b) =>
Cmd a b -> StateT PubSubState Redis ()
sendCmd Cmd Unsubscribe Pattern
StateT PubSubState Redis ()
recv :: StateT PubSubState Core.Redis ()
recv :: StateT PubSubState Redis ()
recv = do
reply <- Redis Reply -> StateT PubSubState Redis Reply
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift Redis Reply
forall (m :: * -> *). MonadRedis m => m Reply
case Reply -> PubSubReply
decodeMsg Reply
reply of
Msg Message
msg -> IO PubSub -> StateT PubSubState Redis PubSub
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Message -> IO PubSub
callback Message
msg) StateT PubSubState Redis PubSub
-> (PubSub -> StateT PubSubState Redis ())
-> StateT PubSubState Redis ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= PubSub -> StateT PubSubState Redis ()
Subscribed -> (Int -> Int) -> StateT PubSubState Redis ()
forall (m :: * -> *).
MonadState PubSubState m =>
(Int -> Int) -> m ()
modifyPending (Int -> Int -> Int
forall a. Num a => a -> a -> a
subtract Int
1) StateT PubSubState Redis ()
-> StateT PubSubState Redis () -> StateT PubSubState Redis ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> StateT PubSubState Redis ()
Unsubscribed Int
n -> do
Int -> StateT PubSubState Redis ()
forall (m :: * -> *). MonadState PubSubState m => Int -> m ()
putSubCnt Int
pending :: Int
subCnt :: Int
pending :: PubSubState -> Int
subCnt :: PubSubState -> Int
..} <- StateT PubSubState Redis PubSubState
forall s (m :: * -> *). MonadState s m => m s
Bool -> StateT PubSubState Redis () -> StateT PubSubState Redis ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Int
subCnt Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 Bool -> Bool -> Bool
&& Int
pending Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0) StateT PubSubState Redis ()
type RedisChannel = ByteString
type RedisPChannel = ByteString
type MessageCallback = ByteString -> IO ()
type PMessageCallback = RedisChannel -> ByteString -> IO ()
type UnregisterCallbacksAction = IO ()
newtype UnregisterHandle = UnregisterHandle Integer
deriving (UnregisterHandle -> UnregisterHandle -> Bool
(UnregisterHandle -> UnregisterHandle -> Bool)
-> (UnregisterHandle -> UnregisterHandle -> Bool)
-> Eq UnregisterHandle
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: UnregisterHandle -> UnregisterHandle -> Bool
$c/= :: UnregisterHandle -> UnregisterHandle -> Bool
== :: UnregisterHandle -> UnregisterHandle -> Bool
$c== :: UnregisterHandle -> UnregisterHandle -> Bool
Eq, Int -> UnregisterHandle -> ShowS
[UnregisterHandle] -> ShowS
UnregisterHandle -> String
(Int -> UnregisterHandle -> ShowS)
-> (UnregisterHandle -> String)
-> ([UnregisterHandle] -> ShowS)
-> Show UnregisterHandle
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UnregisterHandle] -> ShowS
$cshowList :: [UnregisterHandle] -> ShowS
show :: UnregisterHandle -> String
$cshow :: UnregisterHandle -> String
showsPrec :: Int -> UnregisterHandle -> ShowS
$cshowsPrec :: Int -> UnregisterHandle -> ShowS
Show, Integer -> UnregisterHandle
UnregisterHandle -> UnregisterHandle
UnregisterHandle -> UnregisterHandle -> UnregisterHandle
(UnregisterHandle -> UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle)
-> (UnregisterHandle -> UnregisterHandle)
-> (Integer -> UnregisterHandle)
-> Num UnregisterHandle
forall a.
(a -> a -> a)
-> (a -> a -> a)
-> (a -> a -> a)
-> (a -> a)
-> (a -> a)
-> (a -> a)
-> (Integer -> a)
-> Num a
fromInteger :: Integer -> UnregisterHandle
$cfromInteger :: Integer -> UnregisterHandle
signum :: UnregisterHandle -> UnregisterHandle
$csignum :: UnregisterHandle -> UnregisterHandle
abs :: UnregisterHandle -> UnregisterHandle
$cabs :: UnregisterHandle -> UnregisterHandle
negate :: UnregisterHandle -> UnregisterHandle
$cnegate :: UnregisterHandle -> UnregisterHandle
* :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
$c* :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
- :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
$c- :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
+ :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
$c+ :: UnregisterHandle -> UnregisterHandle -> UnregisterHandle
data PubSubController = PubSubController
{ PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks :: TVar (HM.HashMap RedisChannel [(UnregisterHandle, MessageCallback)])
, PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks :: TVar (HM.HashMap RedisPChannel [(UnregisterHandle, PMessageCallback)])
, PubSubController -> TBQueue PubSub
sendChanges :: TBQueue PubSub
, PubSubController -> TVar Int
pendingCnt :: TVar Int
, PubSubController -> TVar UnregisterHandle
lastUsedCallbackId :: TVar UnregisterHandle
newPubSubController :: MonadIO m => [(RedisChannel, MessageCallback)]
-> [(RedisPChannel, PMessageCallback)]
-> m PubSubController
newPubSubController :: [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)] -> m PubSubController
newPubSubController [(ByteString, ByteString -> IO ())]
x [(ByteString, PMessageCallback)]
y = IO PubSubController -> m PubSubController
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO PubSubController -> m PubSubController)
-> IO PubSubController -> m PubSubController
forall a b. (a -> b) -> a -> b
$ do
TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
cbs <- HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
forall a. a -> IO (TVar a)
newTVarIO (((ByteString -> IO ())
-> [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall v1 v2 k. (v1 -> v2) -> HashMap k v1 -> HashMap k v2
HM.map (\ByteString -> IO ()
z -> [(UnregisterHandle
0,ByteString -> IO ()
z)]) (HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall a b. (a -> b) -> a -> b
$ [(ByteString, ByteString -> IO ())]
-> HashMap ByteString (ByteString -> IO ())
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList [(ByteString, ByteString -> IO ())]
TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcbs <- HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> IO
(TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
forall a. a -> IO (TVar a)
newTVarIO ((PMessageCallback -> [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall v1 v2 k. (v1 -> v2) -> HashMap k v1 -> HashMap k v2
HM.map (\PMessageCallback
z -> [(UnregisterHandle
z)]) (HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall a b. (a -> b) -> a -> b
$ [(ByteString, PMessageCallback)]
-> HashMap ByteString PMessageCallback
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList [(ByteString, PMessageCallback)]
TBQueue PubSub
c <- Natural -> IO (TBQueue PubSub)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
TVar Int
pending <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
TVar UnregisterHandle
lastId <- UnregisterHandle -> IO (TVar UnregisterHandle)
forall a. a -> IO (TVar a)
newTVarIO UnregisterHandle
PubSubController -> IO PubSubController
forall (m :: * -> *) a. Monad m => a -> m a
return (PubSubController -> IO PubSubController)
-> PubSubController -> IO PubSubController
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> TBQueue PubSub
-> TVar Int
-> TVar UnregisterHandle
-> PubSubController
PubSubController TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
cbs TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcbs TBQueue PubSub
c TVar Int
pending TVar UnregisterHandle
#if __GLASGOW_HASKELL__ < 710
currentChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisChannel]
currentChannels :: MonadIO m => PubSubController -> m [RedisChannel]
currentChannels :: PubSubController -> m [ByteString]
currentChannels PubSubController
ctrl = HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString])
-> m (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> m [ByteString]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (IO (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> m (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> m (HashMap
ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> m (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
#if __GLASGOW_HASKELL__ < 710
currentPChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisPChannel]
currentPChannels :: MonadIO m => PubSubController -> m [RedisPChannel]
currentPChannels :: PubSubController -> m [ByteString]
currentPChannels PubSubController
ctrl = HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys (HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> [ByteString])
-> m (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> m [ByteString]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> m (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> m (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> m (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
addChannels :: MonadIO m => PubSubController
-> [(RedisChannel, MessageCallback)]
-> [(RedisPChannel, PMessageCallback)]
-> m UnregisterCallbacksAction
addChannels :: PubSubController
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
addChannels PubSubController
_ [] [] = IO () -> m (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> m (IO ())) -> IO () -> m (IO ())
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
addChannels PubSubController
ctrl [(ByteString, ByteString -> IO ())]
newChans [(ByteString, PMessageCallback)]
newPChans = IO (IO ()) -> m (IO ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IO ()) -> m (IO ())) -> IO (IO ()) -> m (IO ())
forall a b. (a -> b) -> a -> b
$ do
ident <- STM UnregisterHandle -> IO UnregisterHandle
forall a. STM a -> IO a
atomically (STM UnregisterHandle -> IO UnregisterHandle)
-> STM UnregisterHandle -> IO UnregisterHandle
forall a b. (a -> b) -> a -> b
$ do
TVar UnregisterHandle
-> (UnregisterHandle -> UnregisterHandle) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar UnregisterHandle
lastUsedCallbackId PubSubController
ctrl) (UnregisterHandle -> UnregisterHandle -> UnregisterHandle
forall a. Num a => a -> a -> a
ident <- TVar UnregisterHandle -> STM UnregisterHandle
forall a. TVar a -> STM a
readTVar (TVar UnregisterHandle -> STM UnregisterHandle)
-> TVar UnregisterHandle -> STM UnregisterHandle
forall a b. (a -> b) -> a -> b
$ PubSubController -> TVar UnregisterHandle
lastUsedCallbackId PubSubController
HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm <- TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm <- TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
let newChans' :: [ByteString]
newChans' = [ ByteString
n | (ByteString
n,ByteString -> IO ()
_) <- [(ByteString, ByteString -> IO ())]
newChans, Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
newPChans' :: [ByteString]
newPChans' = [ ByteString
n | (ByteString
n, PMessageCallback
_) <- [(ByteString, PMessageCallback)]
newPChans, Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, PMessageCallback)]
ps :: PubSub
ps = [ByteString] -> PubSub
subscribe [ByteString]
newChans' PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` [ByteString] -> PubSub
psubscribe [ByteString]
TBQueue PubSub -> PubSub -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) PubSub
TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
ctrl) (([(UnregisterHandle, ByteString -> IO ())]
-> [(UnregisterHandle, ByteString -> IO ())]
-> [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> HashMap k v -> HashMap k v -> HashMap k v
HM.unionWith [(UnregisterHandle, ByteString -> IO ())]
-> [(UnregisterHandle, ByteString -> IO ())]
-> [(UnregisterHandle, ByteString -> IO ())]
forall a. [a] -> [a] -> [a]
(++) HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm (((ByteString -> IO ())
-> [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\ByteString -> IO ()
z -> [(UnregisterHandle
ident,ByteString -> IO ()
z)]) (HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall a b. (a -> b) -> a -> b
$ [(ByteString, ByteString -> IO ())]
-> HashMap ByteString (ByteString -> IO ())
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList [(ByteString, ByteString -> IO ())]
TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
ctrl) (([(UnregisterHandle, PMessageCallback)]
-> [(UnregisterHandle, PMessageCallback)]
-> [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> HashMap k v -> HashMap k v -> HashMap k v
HM.unionWith [(UnregisterHandle, PMessageCallback)]
-> [(UnregisterHandle, PMessageCallback)]
-> [(UnregisterHandle, PMessageCallback)]
forall a. [a] -> [a] -> [a]
(++) HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm ((PMessageCallback -> [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\PMessageCallback
z -> [(UnregisterHandle
z)]) (HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall a b. (a -> b) -> a -> b
$ [(ByteString, PMessageCallback)]
-> HashMap ByteString PMessageCallback
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList [(ByteString, PMessageCallback)]
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ PubSub -> Int
totalPendingChanges PubSub
UnregisterHandle -> STM UnregisterHandle
forall (m :: * -> *) a. Monad m => a -> m a
return UnregisterHandle
IO () -> IO (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> IO (IO ())) -> IO () -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ PubSubController
-> [ByteString] -> [ByteString] -> UnregisterHandle -> IO ()
unsubChannels PubSubController
ctrl (((ByteString, ByteString -> IO ()) -> ByteString)
-> [(ByteString, ByteString -> IO ())] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (ByteString, ByteString -> IO ()) -> ByteString
forall a b. (a, b) -> a
fst [(ByteString, ByteString -> IO ())]
newChans) (((ByteString, PMessageCallback) -> ByteString)
-> [(ByteString, PMessageCallback)] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (ByteString, PMessageCallback) -> ByteString
forall a b. (a, b) -> a
fst [(ByteString, PMessageCallback)]
newPChans) UnregisterHandle
addChannelsAndWait :: MonadIO m => PubSubController
-> [(RedisChannel, MessageCallback)]
-> [(RedisPChannel, PMessageCallback)]
-> m UnregisterCallbacksAction
addChannelsAndWait :: PubSubController
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
addChannelsAndWait PubSubController
_ [] [] = IO () -> m (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> m (IO ())) -> IO () -> m (IO ())
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
addChannelsAndWait PubSubController
ctrl [(ByteString, ByteString -> IO ())]
newChans [(ByteString, PMessageCallback)]
newPChans = do
IO ()
unreg <- PubSubController
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
forall (m :: * -> *).
MonadIO m =>
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
addChannels PubSubController
ctrl [(ByteString, ByteString -> IO ())]
newChans [(ByteString, PMessageCallback)]
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
r <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar (PubSubController -> TVar Int
pendingCnt PubSubController
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
r Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) STM ()
forall a. STM a
IO () -> m (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return IO ()
removeChannels :: MonadIO m => PubSubController
-> [RedisChannel]
-> [RedisPChannel]
-> m ()
removeChannels :: PubSubController -> [ByteString] -> [ByteString] -> m ()
removeChannels PubSubController
_ [] [] = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
removeChannels PubSubController
ctrl [ByteString]
remChans [ByteString]
remPChans = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm <- TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm <- TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
let remChans' :: [ByteString]
remChans' = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm) [ByteString]
remPChans' :: [ByteString]
remPChans' = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm) [ByteString]
ps :: PubSub
ps = (if [ByteString] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ByteString]
remChans' then PubSub
forall a. Monoid a => a
mempty else [ByteString] -> PubSub
unsubscribe [ByteString]
PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` (if [ByteString] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ByteString]
remPChans' then PubSub
forall a. Monoid a => a
mempty else [ByteString] -> PubSub
punsubscribe [ByteString]
TBQueue PubSub -> PubSub -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) PubSub
TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
ctrl) ((HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' ((ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete) HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm [ByteString]
TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
ctrl) ((HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> [ByteString]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' ((ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete) HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm [ByteString]
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ PubSub -> Int
totalPendingChanges PubSub
unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO ()
unsubChannels :: PubSubController
-> [ByteString] -> [ByteString] -> UnregisterHandle -> IO ()
unsubChannels PubSubController
ctrl [ByteString]
chans [ByteString]
pchans UnregisterHandle
h = IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm <- TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm <- TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
let remChans :: [ByteString]
remChans = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm) [ByteString]
remPChans :: [ByteString]
remPChans = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm) [ByteString]
let filterHandle :: Maybe [(UnregisterHandle,a)] -> Maybe [(UnregisterHandle,a)]
filterHandle :: Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
filterHandle Maybe [(UnregisterHandle, a)]
Nothing = Maybe [(UnregisterHandle, a)]
forall a. Maybe a
filterHandle (Just [(UnregisterHandle, a)]
lst) = case ((UnregisterHandle, a) -> Bool)
-> [(UnregisterHandle, a)] -> [(UnregisterHandle, a)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(UnregisterHandle, a)
x -> (UnregisterHandle, a) -> UnregisterHandle
forall a b. (a, b) -> a
fst (UnregisterHandle, a)
x UnregisterHandle -> UnregisterHandle -> Bool
forall a. Eq a => a -> a -> Bool
/= UnregisterHandle
h) [(UnregisterHandle, a)]
lst of
[] -> Maybe [(UnregisterHandle, a)]
forall a. Maybe a
[(UnregisterHandle, a)]
xs -> [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
forall a. a -> Maybe a
Just [(UnregisterHandle, a)]
let removeHandles :: HM.HashMap ByteString [(UnregisterHandle,a)]
-> ByteString
-> HM.HashMap ByteString [(UnregisterHandle,a)]
removeHandles :: HashMap ByteString [(UnregisterHandle, a)]
-> ByteString -> HashMap ByteString [(UnregisterHandle, a)]
removeHandles HashMap ByteString [(UnregisterHandle, a)]
m ByteString
k = case Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
forall a.
Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
filterHandle (ByteString
-> HashMap ByteString [(UnregisterHandle, a)]
-> Maybe [(UnregisterHandle, a)]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup ByteString
k HashMap ByteString [(UnregisterHandle, a)]
m) of
Maybe [(UnregisterHandle, a)]
Nothing -> ByteString
-> HashMap ByteString [(UnregisterHandle, a)]
-> HashMap ByteString [(UnregisterHandle, a)]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> HashMap k v
HM.delete ByteString
k HashMap ByteString [(UnregisterHandle, a)]
Just [(UnregisterHandle, a)]
v -> ByteString
-> [(UnregisterHandle, a)]
-> HashMap ByteString [(UnregisterHandle, a)]
-> HashMap ByteString [(UnregisterHandle, a)]
forall k v.
(Eq k, Hashable k) =>
k -> v -> HashMap k v -> HashMap k v
HM.insert ByteString
k [(UnregisterHandle, a)]
v HashMap ByteString [(UnregisterHandle, a)]
let cm' :: HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm' = (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall a.
HashMap ByteString [(UnregisterHandle, a)]
-> ByteString -> HashMap ByteString [(UnregisterHandle, a)]
removeHandles HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm [ByteString]
pm' :: HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm' = (HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> [ByteString]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall a.
HashMap ByteString [(UnregisterHandle, a)]
-> ByteString -> HashMap ByteString [(UnregisterHandle, a)]
removeHandles HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm [ByteString]
let remChans' :: [ByteString]
remChans' = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm') [ByteString]
remPChans' :: [ByteString]
remPChans' = (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
n -> Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm') [ByteString]
ps :: PubSub
ps = (if [ByteString] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ByteString]
remChans' then PubSub
forall a. Monoid a => a
mempty else [ByteString] -> PubSub
unsubscribe [ByteString]
PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` (if [ByteString] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ByteString]
remPChans' then PubSub
forall a. Monoid a => a
mempty else [ByteString] -> PubSub
punsubscribe [ByteString]
TBQueue PubSub -> PubSub -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) PubSub
TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
ctrl) HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
ctrl) HashMap ByteString [(UnregisterHandle, PMessageCallback)]
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ PubSub -> Int
totalPendingChanges PubSub
() -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
removeChannelsAndWait :: MonadIO m => PubSubController
-> [RedisChannel]
-> [RedisPChannel]
-> m ()
removeChannelsAndWait :: PubSubController -> [ByteString] -> [ByteString] -> m ()
removeChannelsAndWait PubSubController
_ [] [] = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
removeChannelsAndWait PubSubController
ctrl [ByteString]
remChans [ByteString]
remPChans = do
PubSubController -> [ByteString] -> [ByteString] -> m ()
forall (m :: * -> *).
MonadIO m =>
PubSubController -> [ByteString] -> [ByteString] -> m ()
removeChannels PubSubController
ctrl [ByteString]
remChans [ByteString]
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
r <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar (PubSubController -> TVar Int
pendingCnt PubSubController
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
r Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) STM ()
forall a. STM a
listenThread :: PubSubController -> PP.Connection -> IO ()
listenThread :: PubSubController -> Connection -> IO ()
listenThread PubSubController
ctrl Connection
rawConn = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
msg <- Connection -> IO Reply
PP.recv Connection
case Reply -> PubSubReply
decodeMsg Reply
msg of
Msg (Message ByteString
channel ByteString
msgCt) -> do
HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm <- STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> IO
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
case ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> Maybe [(UnregisterHandle, ByteString -> IO ())]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup ByteString
channel HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm of
Maybe [(UnregisterHandle, ByteString -> IO ())]
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just [(UnregisterHandle, ByteString -> IO ())]
c -> ((UnregisterHandle, ByteString -> IO ()) -> IO ())
-> [(UnregisterHandle, ByteString -> IO ())] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(UnregisterHandle
_,ByteString -> IO ()
x) -> ByteString -> IO ()
x ByteString
msgCt) [(UnregisterHandle, ByteString -> IO ())]
Msg (PMessage ByteString
pattern ByteString
channel ByteString
msgCt) -> do
HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm <- STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. STM a -> IO a
atomically (STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> IO (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
case ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> Maybe [(UnregisterHandle, PMessageCallback)]
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup ByteString
pattern HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm of
Maybe [(UnregisterHandle, PMessageCallback)]
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just [(UnregisterHandle, PMessageCallback)]
c -> ((UnregisterHandle, PMessageCallback) -> IO ())
-> [(UnregisterHandle, PMessageCallback)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(UnregisterHandle
x) -> PMessageCallback
x ByteString
channel ByteString
msgCt) [(UnregisterHandle, PMessageCallback)]
Subscribed -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (\Int
x -> Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
Unsubscribed Int
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (\Int
x -> Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
sendThread :: PubSubController -> PP.Connection -> IO ()
sendThread :: PubSubController -> Connection -> IO ()
sendThread PubSubController
ctrl Connection
rawConn = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
PubSub{Cmd Unsubscribe Pattern
Cmd Unsubscribe Channel
Cmd Subscribe Pattern
Cmd Subscribe Channel
punsubs :: Cmd Unsubscribe Pattern
psubs :: Cmd Subscribe Pattern
unsubs :: Cmd Unsubscribe Channel
subs :: Cmd Subscribe Channel
punsubs :: PubSub -> Cmd Unsubscribe Pattern
psubs :: PubSub -> Cmd Subscribe Pattern
unsubs :: PubSub -> Cmd Unsubscribe Channel
subs :: PubSub -> Cmd Subscribe Channel
..} <- STM PubSub -> IO PubSub
forall a. STM a -> IO a
atomically (STM PubSub -> IO PubSub) -> STM PubSub -> IO PubSub
forall a b. (a -> b) -> a -> b
$ TBQueue PubSub -> STM PubSub
forall a. TBQueue a -> STM a
readTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
Connection -> Cmd Subscribe Channel -> IO ()
forall a b. Command (Cmd a b) => Connection -> Cmd a b -> IO ()
rawSendCmd Connection
rawConn Cmd Subscribe Channel
Connection -> Cmd Unsubscribe Channel -> IO ()
forall a b. Command (Cmd a b) => Connection -> Cmd a b -> IO ()
rawSendCmd Connection
rawConn Cmd Unsubscribe Channel
Connection -> Cmd Subscribe Pattern -> IO ()
forall a b. Command (Cmd a b) => Connection -> Cmd a b -> IO ()
rawSendCmd Connection
rawConn Cmd Subscribe Pattern
Connection -> Cmd Unsubscribe Pattern -> IO ()
forall a b. Command (Cmd a b) => Connection -> Cmd a b -> IO ()
rawSendCmd Connection
rawConn Cmd Unsubscribe Pattern
Connection -> IO ()
PP.flush Connection
pubSubForever :: Connection.Connection
-> PubSubController
-> IO ()
-> IO ()
pubSubForever :: Connection -> PubSubController -> IO () -> IO ()
pubSubForever (Connection.NonClusteredConnection Pool Connection
pool) PubSubController
ctrl IO ()
onInitialLoad = Pool Connection -> (Connection -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool Connection
pool ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
rawConn -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
let loop :: STM ()
loop = TBQueue PubSub -> STM (Maybe PubSub)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) STM (Maybe PubSub) -> (Maybe PubSub -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
\Maybe PubSub
x -> if Maybe PubSub -> Bool
forall a. Maybe a -> Bool
isJust Maybe PubSub
x then STM ()
loop else () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
STM ()
HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm <- TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm <- TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
let ps :: PubSub
ps = [ByteString] -> PubSub
subscribe (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm) PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` [ByteString] -> PubSub
psubscribe (HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys HashMap ByteString [(UnregisterHandle, PMessageCallback)]
TBQueue PubSub -> PubSub -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) PubSub
TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (PubSub -> Int
totalPendingChanges PubSub
IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (PubSubController -> Connection -> IO ()
listenThread PubSubController
ctrl Connection
rawConn) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
listenT ->
IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (PubSubController -> Connection -> IO ()
sendThread PubSubController
ctrl Connection
rawConn) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
sendT -> do
(Either (Either SomeException ()) (Either SomeException ())) ()
mret <- STM
(Either (Either SomeException ()) (Either SomeException ())) ())
-> IO
(Either (Either SomeException ()) (Either SomeException ())) ())
forall a. STM a -> IO a
atomically (STM
(Either (Either SomeException ()) (Either SomeException ())) ())
-> IO
(Either (Either SomeException ()) (Either SomeException ())) ()))
-> STM
(Either (Either SomeException ()) (Either SomeException ())) ())
-> IO
(Either (Either SomeException ()) (Either SomeException ())) ())
forall a b. (a -> b) -> a -> b
(Either (Either SomeException ()) (Either SomeException ())
-> Either
(Either (Either SomeException ()) (Either SomeException ())) ()
forall a b. a -> Either a b
Left (Either (Either SomeException ()) (Either SomeException ())
-> Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> STM (Either (Either SomeException ()) (Either SomeException ()))
-> STM
(Either (Either SomeException ()) (Either SomeException ())) ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Async ()
-> Async ()
-> STM (Either (Either SomeException ()) (Either SomeException ()))
forall a b.
Async a
-> Async b
-> STM (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchSTM Async ()
listenT Async ()
(Either (Either SomeException ()) (Either SomeException ())) ())
-> STM
(Either (Either SomeException ()) (Either SomeException ())) ())
-> STM
(Either (Either SomeException ()) (Either SomeException ())) ())
forall a. STM a -> STM a -> STM a
-> Either
(Either (Either SomeException ()) (Either SomeException ())) ()
forall a b. b -> Either a b
Right (()
-> Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> STM ()
-> STM
(Either (Either SomeException ()) (Either SomeException ())) ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) STM Int -> (Int -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
x -> if Int
x Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 then STM ()
forall a. STM a
retry else () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()))
case Either
(Either (Either SomeException ()) (Either SomeException ())) ()
mret of
Right () -> IO ()
(Either (Either SomeException ()) (Either SomeException ())) ()
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Either (Either SomeException ()) (Either SomeException ())
merr <- Async ()
-> Async ()
-> IO (Either (Either SomeException ()) (Either SomeException ()))
forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async ()
listenT Async ()
case Either (Either SomeException ()) (Either SomeException ())
merr of
(Right (Left SomeException
err)) -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO SomeException
(Left (Left SomeException
err)) -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO SomeException
Either (Either SomeException ()) (Either SomeException ())
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
pubSubForever (Connection.ClusteredConnection MVar ShardMap
_ Pool Connection
_) PubSubController
_ IO ()
_ = IO ()
forall a. HasCallStack => a
decodeMsg :: Reply -> PubSubReply
decodeMsg :: Reply -> PubSubReply
decodeMsg r :: Reply
r@(MultiBulk (Just (Reply
rs))) = (Reply -> PubSubReply)
-> (PubSubReply -> PubSubReply)
-> Either Reply PubSubReply
-> PubSubReply
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (Reply -> Reply -> PubSubReply
forall a. Reply -> a
errMsg Reply
r) PubSubReply -> PubSubReply
forall a. a -> a
id (Either Reply PubSubReply -> PubSubReply)
-> Either Reply PubSubReply -> PubSubReply
forall a b. (a -> b) -> a -> b
$ do
kind <- Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
case ByteString
kind :: ByteString of
"message" -> Message -> PubSubReply
Msg (Message -> PubSubReply)
-> Either Reply Message -> Either Reply PubSubReply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply Message
"pmessage" -> Message -> PubSubReply
Msg (Message -> PubSubReply)
-> Either Reply Message -> Either Reply PubSubReply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply Message
"subscribe" -> PubSubReply -> Either Reply PubSubReply
forall (m :: * -> *) a. Monad m => a -> m a
return PubSubReply
"psubscribe" -> PubSubReply -> Either Reply PubSubReply
forall (m :: * -> *) a. Monad m => a -> m a
return PubSubReply
"unsubscribe" -> Int -> PubSubReply
Unsubscribed (Int -> PubSubReply)
-> Either Reply Int -> Either Reply PubSubReply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply Int
"punsubscribe" -> Int -> PubSubReply
Unsubscribed (Int -> PubSubReply)
-> Either Reply Int -> Either Reply PubSubReply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either Reply Int
_ -> Reply -> Either Reply PubSubReply
forall a. Reply -> a
errMsg Reply
decodeMessage :: Either Reply Message
decodeMessage = ByteString -> ByteString -> Message
Message (ByteString -> ByteString -> Message)
-> Either Reply ByteString -> Either Reply (ByteString -> Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r1 Either Reply (ByteString -> Message)
-> Either Reply ByteString -> Either Reply Message
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
decodePMessage :: Either Reply Message
decodePMessage = ByteString -> ByteString -> ByteString -> Message
PMessage (ByteString -> ByteString -> ByteString -> Message)
-> Either Reply ByteString
-> Either Reply (ByteString -> ByteString -> Message)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r1 Either Reply (ByteString -> ByteString -> Message)
-> Either Reply ByteString -> Either Reply (ByteString -> Message)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
r2 Either Reply (ByteString -> Message)
-> Either Reply ByteString -> Either Reply Message
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Reply -> Either Reply ByteString
forall a. RedisResult a => Reply -> Either Reply a
decode ([Reply] -> Reply
forall a. [a] -> a
head [Reply]
decodeCnt :: Either Reply Int
decodeCnt = Integer -> Int
forall a. Num a => Integer -> a
fromInteger (Integer -> Int) -> Either Reply Integer -> Either Reply Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Reply -> Either Reply Integer
forall a. RedisResult a => Reply -> Either Reply a
decode Reply
decodeMsg Reply
r = Reply -> PubSubReply
forall a. Reply -> a
errMsg Reply
errMsg :: Reply -> a
errMsg :: Reply -> a
errMsg Reply
r = String -> a
forall a. HasCallStack => String -> a
error (String -> a) -> String -> a
forall a b. (a -> b) -> a -> b
$ String
"Hedis: expected pub/sub-message but got: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Reply -> String
forall a. Show a => a -> String
show Reply