{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls,
FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-}
module Database.Redis.PubSub (
publish,
pubSub,
Message(..),
PubSub(),
subscribe, unsubscribe, psubscribe, punsubscribe,
pubSubForever,
RedisChannel, RedisPChannel, MessageCallback, PMessageCallback,
PubSubController, newPubSubController, currentChannels, currentPChannels,
addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait,
UnregisterCallbacksAction
) where
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Monoid hiding (<>)
#endif
import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM)
import Control.Concurrent.STM
import Control.Exception (throwIO)
import Control.Monad
import Control.Monad.State
import Data.ByteString.Char8 (ByteString)
import Data.List (foldl')
import Data.Maybe (isJust)
import Data.Pool
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import qualified Data.HashMap.Strict as HM
import qualified Database.Redis.Core as Core
import qualified Database.Redis.Connection as Connection
import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Protocol (Reply(..), renderRequest)
import Database.Redis.Types
-- | Bookkeeping state threaded through the 'pubSub' receive loop.
data PubSubState = PubSubState
    { subCnt  :: Int
      -- ^ Count carried by the most recent @Unsubscribed@ reply
      --   (written by 'putSubCnt').
    , pending :: Int
      -- ^ Counter adjusted by 'modifyPending': raised when subscribe
      --   commands are sent, lowered by one per @Subscribed@ reply.
    }
-- | Apply a function to the 'pending' counter of the current state,
--   leaving every other field untouched.
modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m ()
modifyPending f = modify step
  where
    step st = st { pending = f (pending st) }
-- | Overwrite the 'subCnt' field of the current state with the given value.
putSubCnt :: (MonadState PubSubState m) => Int -> m ()
putSubCnt n = modify (\st -> st { subCnt = n })
-- Uninhabited tag types. They are only ever used as the two type
-- parameters of 'Cmd' (see the fields of 'PubSub'), where they select the
-- matching 'Semigroup' and 'Command' instances.

-- | Tag: the command adds subscriptions.
data Subscribe
-- | Tag: the command removes subscriptions.
data Unsubscribe
-- | Tag: the command's arguments are channel names.
data Channel
-- | Tag: the command's arguments are channel patterns.
data Pattern
-- | A bundle of subscription changes: one 'Cmd' per kind of change.
--   Values are built with 'subscribe', 'unsubscribe', 'psubscribe' and
--   'punsubscribe' and combined through the 'Monoid' instance.
data PubSub = PubSub
    { subs    :: Cmd Subscribe Channel
      -- ^ Channels to subscribe to.
    , unsubs  :: Cmd Unsubscribe Channel
      -- ^ Channels to unsubscribe from.
    , psubs   :: Cmd Subscribe Pattern
      -- ^ Patterns to subscribe to.
    , punsubs :: Cmd Unsubscribe Pattern
      -- ^ Patterns to unsubscribe from.
    } deriving (Eq)
-- | Two change sets are combined field by field, using the 'Monoid'
--   instance of each underlying 'Cmd'.
instance Semigroup PubSub where
    (<>) p1 p2 = PubSub
        { subs    = subs p1    `mappend` subs p2
        , unsubs  = unsubs p1  `mappend` unsubs p2
        , psubs   = psubs p1   `mappend` psubs p2
        , punsubs = punsubs p1 `mappend` punsubs p2
        }
-- | The empty change set has 'mempty' in every field; 'mappend' is the
--   'Semigroup' operation.
instance Monoid PubSub where
    mempty  = PubSub mempty mempty mempty mempty
    mappend = (<>)
-- | A single pending command. Neither type parameter appears in a
--   constructor; they only tag the command's kind (subscribe/unsubscribe)
--   and target (channel/pattern).
--
--   Note that 'changes' is a partial field: it is undefined on 'DoNothing'.
data Cmd a b
    = DoNothing
      -- ^ Nothing to send.
    | Cmd { changes :: [ByteString] }
      -- ^ Send the command with these arguments.
    deriving (Eq)
-- | Subscribe commands combine by concatenating their argument lists;
--   'DoNothing' is the identity on either side.
instance Semigroup (Cmd Subscribe a) where
    DoNothing <> rhs       = rhs
    lhs       <> DoNothing = lhs
    Cmd xs    <> Cmd ys    = Cmd (xs ++ ys)
-- | 'DoNothing' is the neutral subscribe command; 'mappend' is the
--   'Semigroup' operation.
instance Monoid (Cmd Subscribe a) where
    mempty  = DoNothing
    mappend = (<>)
-- | Unsubscribe commands combine like subscribe commands, with one
--   difference: an empty argument list is absorbing. Once either side is
--   @Cmd []@ (and the other is not 'DoNothing') the result is @Cmd []@.
--   Presumably this mirrors Redis, where an unsubscribe command without
--   arguments drops every subscription; see the Redis command reference.
--
--   Equation order matters: the 'DoNothing' cases are tried first, so
--   @DoNothing <> Cmd []@ is @Cmd []@ and vice versa.
instance Semigroup (Cmd Unsubscribe a) where
    DoNothing <> rhs       = rhs
    lhs       <> DoNothing = lhs
    Cmd []    <> _         = Cmd []
    _         <> Cmd []    = Cmd []
    Cmd xs    <> Cmd ys    = Cmd (xs ++ ys)
-- | 'DoNothing' is the neutral unsubscribe command; 'mappend' is the
--   'Semigroup' operation.
instance Monoid (Cmd Unsubscribe a) where
    mempty  = DoNothing
    mappend = (<>)
-- | Commands that 'sendCmd' and 'rawSendCmd' know how to put on the wire.
class Command a where
-- | The command word placed in front of the command's arguments when the
--   request is built.
redisCmd :: a -> ByteString
-- | The adjustment 'sendCmd' applies to the pending counter after the
--   command has been sent.
updatePending :: a -> Int -> Int
-- | Send a command from inside the 'pubSub' loop and update the pending
--   counter according to the command's 'updatePending'.
--   'DoNothing' sends nothing and leaves the state alone.
sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis ()
sendCmd DoNothing = return ()
sendCmd cmd       = do
    -- Request = command word followed by its arguments.
    lift (Core.send (redisCmd cmd : changes cmd))
    modifyPending (updatePending cmd)
-- | Number of arguments carried by a command; 'DoNothing' counts as zero.
cmdCount :: Cmd a b -> Int
cmdCount DoNothing  = 0
cmdCount (Cmd args) = length args
-- | Sum of 'cmdCount' over all four commands of a change set.
totalPendingChanges :: PubSub -> Int
totalPendingChanges (PubSub chanSubs chanUnsubs patSubs patUnsubs) =
    cmdCount chanSubs
        + cmdCount chanUnsubs
        + cmdCount patSubs
        + cmdCount patUnsubs
-- | Render a command as a request and write it directly to a pipelining
--   connection. 'DoNothing' writes nothing.
rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO ()
rawSendCmd _    DoNothing = return ()
rawSendCmd conn cmd       = PP.send conn request
  where
    -- Command word followed by its arguments, rendered to bytes.
    request = renderRequest (redisCmd cmd : changes cmd)
-- | Counter adjustment for a command: add the number of arguments it
--   carries. 'DoNothing' leaves the counter unchanged.
plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt DoNothing  = id
plusChangeCnt (Cmd args) = (+ length args)
-- | Channel subscriptions are sent as @SUBSCRIBE@ and raise the pending
--   counter by the number of channels.
instance Command (Cmd Subscribe Channel) where
    redisCmd _    = "SUBSCRIBE"
    updatePending = plusChangeCnt
-- | Pattern subscriptions are sent as @PSUBSCRIBE@ and raise the pending
--   counter by the number of patterns.
instance Command (Cmd Subscribe Pattern) where
    redisCmd _    = "PSUBSCRIBE"
    updatePending = plusChangeCnt
-- | Channel unsubscriptions are sent as @UNSUBSCRIBE@ and never touch the
--   pending counter.
instance Command (Cmd Unsubscribe Channel) where
    redisCmd _      = "UNSUBSCRIBE"
    updatePending _ = id
-- | Pattern unsubscriptions are sent as @PUNSUBSCRIBE@ and never touch the
--   pending counter.
instance Command (Cmd Unsubscribe Pattern) where
    redisCmd _      = "PUNSUBSCRIBE"
    updatePending _ = id
-- | A message handed to the 'pubSub' callback.
--
--   Both constructors share the 'msgChannel' and 'msgMessage' fields;
--   'msgPattern' exists only on 'PMessage' and is therefore a partial
--   selector.
data Message
    = Message
        { msgChannel :: ByteString
        , msgMessage :: ByteString
        }
    | PMessage
        { msgPattern :: ByteString
        , msgChannel :: ByteString
        , msgMessage :: ByteString
        }
    deriving (Show)
-- | Classification of a server reply as consumed by the 'pubSub' loop:
--   'Subscribed' lowers the pending counter by one, 'Unsubscribed' carries
--   a count that the loop stores via 'putSubCnt', and 'Msg' wraps a
--   'Message' that is passed to the user callback.
data PubSubReply = Subscribed | Unsubscribed Int | Msg Message
-- | Issue a @PUBLISH@ request for the given channel and message. The
--   'Integer' result is whatever the server replies; per the Redis
--   command reference this is the number of clients that received the
--   message.
publish
    :: (Core.RedisCtx m f)
    => ByteString -- ^ channel
    -> ByteString -- ^ message
    -> m (f Integer)
publish channel message =
    Core.sendRequest ["PUBLISH", channel, message]
-- | Change set that subscribes to the given channels. An empty list
--   yields 'mempty', i.e. no command at all.
subscribe
    :: [ByteString] -- ^ channels
    -> PubSub
subscribe [] = mempty
subscribe cs = mempty { subs = Cmd cs }
-- | Change set that unsubscribes from the given channels. Unlike
--   'subscribe', an empty list is /not/ collapsed to 'mempty': it produces
--   @Cmd []@, so a bare unsubscribe command is still sent.
unsubscribe
    :: [ByteString] -- ^ channels
    -> PubSub
unsubscribe cs = mempty { unsubs = Cmd cs }
-- | Change set that subscribes to the given patterns. An empty list
--   yields 'mempty', i.e. no command at all.
psubscribe
    :: [ByteString] -- ^ patterns
    -> PubSub
psubscribe [] = mempty
psubscribe ps = mempty { psubs = Cmd ps }
-- | Change set that unsubscribes from the given patterns. Unlike
--   'psubscribe', an empty list is /not/ collapsed to 'mempty': it
--   produces @Cmd []@, so a bare unsubscribe command is still sent.
punsubscribe
    :: [ByteString] -- ^ patterns
    -> PubSub
punsubscribe ps = mempty { punsubs = Cmd ps }
-- | Send the initial subscription changes, then loop over incoming
--   replies, invoking the callback for every message.
--
--   The callback returns a further change set, which is sent before the
--   next reply is read. The loop ends after an @Unsubscribed@ reply whose
--   count is zero while the pending counter is also zero. If the initial
--   change set equals 'mempty', nothing is sent and the function returns
--   immediately.
pubSub
    :: PubSub                 -- ^ Initial subscriptions.
    -> (Message -> IO PubSub) -- ^ Callback invoked for each message.
    -> Core.Redis ()
pubSub initial callback
    | initial == mempty = return ()
    | otherwise         = evalStateT (send initial) (PubSubState 0 0)
  where
    -- Send all four commands of a change set, then go back to receiving.
    send :: PubSub -> StateT PubSubState Core.Redis ()
    send (PubSub chanSubs chanUnsubs patSubs patUnsubs) = do
        sendCmd chanSubs
        sendCmd chanUnsubs
        sendCmd patSubs
        sendCmd patUnsubs
        recv

    -- Read one reply and dispatch on its kind.
    recv :: StateT PubSubState Core.Redis ()
    recv = do
        reply <- lift Core.recv
        case decodeMsg reply of
            Msg msg        -> liftIO (callback msg) >>= send
            Subscribed     -> modifyPending (subtract 1) >> recv
            Unsubscribed n -> do
                putSubCnt n
                stillPending <- gets pending
                -- Stop only once nothing is subscribed and no
                -- subscribe confirmation is outstanding.
                unless (n == 0 && stillPending == 0) recv
-- | Key type of the channel-callback map in 'PubSubController'.
type RedisChannel = ByteString
-- | Key type of the pattern-callback map in 'PubSubController'.
type RedisPChannel = ByteString
-- | Callback registered for a channel; presumably receives the message
--   payload (confirm against the dispatching code).
type MessageCallback = ByteString -> IO ()
-- | Callback registered for a pattern; receives a channel and,
--   presumably, the message payload (confirm against the dispatching code).
type PMessageCallback = RedisChannel -> ByteString -> IO ()
-- | The action returned by 'addChannels'.
type UnregisterCallbacksAction = IO ()
-- | Identifier attached to each registered callback. The 'Num' instance
--   (via @GeneralizedNewtypeDeriving@) allows numeric literals and @(+1)@
--   to be used on handles directly.
newtype UnregisterHandle = UnregisterHandle Integer
    deriving (Eq, Show, Num)
-- | Shared, mutable state describing the callbacks registered per channel
--   and per pattern, plus the queue of subscription changes still to send.
data PubSubController = PubSubController
    { callbacks :: TVar (HM.HashMap RedisChannel [(UnregisterHandle, MessageCallback)])
      -- ^ Callbacks per channel, each tagged with its handle.
    , pcallbacks :: TVar (HM.HashMap RedisPChannel [(UnregisterHandle, PMessageCallback)])
      -- ^ Callbacks per pattern, each tagged with its handle.
    , sendChanges :: TBQueue PubSub
      -- ^ Bounded queue of change sets; 'addChannels' writes to it.
    , pendingCnt :: TVar Int
      -- ^ Counter initialised to zero by 'newPubSubController'.
    , lastUsedCallbackId :: TVar UnregisterHandle
      -- ^ Most recently issued handle; 'addChannels' increments it
      --   before using it.
    }
-- | Create a controller pre-populated with the given callbacks.
--
--   Every initial callback is registered under handle @0@. The change
--   queue is bounded at 10 entries; the pending counter and the last-used
--   handle both start at zero.
--
--   The input lists are turned into maps with 'HM.fromList', so if a
--   channel (or pattern) occurs more than once only its last callback is
--   kept.
newPubSubController
    :: MonadIO m
    => [(RedisChannel, MessageCallback)]   -- ^ initial channel callbacks
    -> [(RedisPChannel, PMessageCallback)] -- ^ initial pattern callbacks
    -> m PubSubController
newPubSubController x y = liftIO $ do
    cbs        <- newTVarIO (seed x)
    pcbs       <- newTVarIO (seed y)
    changeQ    <- newTBQueueIO 10
    pendingVar <- newTVarIO 0
    lastId     <- newTVarIO 0
    return (PubSubController cbs pcbs changeQ pendingVar lastId)
  where
    -- Build the initial map, tagging every callback with handle 0.
    seed :: [(ByteString, cb)] -> HM.HashMap ByteString [(UnregisterHandle, cb)]
    seed = HM.map (\cb -> [(0, cb)]) . HM.fromList
-- | The channels that currently have an entry in the controller's
--   channel-callback map, in unspecified order.
#if __GLASGOW_HASKELL__ < 710
currentChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisChannel]
#else
currentChannels :: MonadIO m => PubSubController -> m [RedisChannel]
#endif
currentChannels ctrl = HM.keys <$> liftIO (readTVarIO (callbacks ctrl))
-- | The patterns that currently have an entry in the controller's
--   pattern-callback map, in unspecified order.
#if __GLASGOW_HASKELL__ < 710
currentPChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisPChannel]
#else
currentPChannels :: MonadIO m => PubSubController -> m [RedisPChannel]
#endif
currentPChannels ctrl = HM.keys <$> liftIO (readTVarIO (pcallbacks ctrl))
addChannels :: MonadIO m => PubSubController
-> [(RedisChannel, MessageCallback)]
-> [(RedisPChannel, PMessageCallback)]
-> m UnregisterCallbacksAction
addChannels :: PubSubController
-> [(ByteString, ByteString -> IO ())]
-> [(ByteString, PMessageCallback)]
-> m (IO ())
addChannels PubSubController
_ [] [] = IO () -> m (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> m (IO ())) -> IO () -> m (IO ())
forall a b. (a -> b) -> a -> b
$ () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
addChannels PubSubController
ctrl [(ByteString, ByteString -> IO ())]
newChans [(ByteString, PMessageCallback)]
newPChans = IO (IO ()) -> m (IO ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IO ()) -> m (IO ())) -> IO (IO ()) -> m (IO ())
forall a b. (a -> b) -> a -> b
$ do
UnregisterHandle
ident <- STM UnregisterHandle -> IO UnregisterHandle
forall a. STM a -> IO a
atomically (STM UnregisterHandle -> IO UnregisterHandle)
-> STM UnregisterHandle -> IO UnregisterHandle
forall a b. (a -> b) -> a -> b
$ do
TVar UnregisterHandle
-> (UnregisterHandle -> UnregisterHandle) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar UnregisterHandle
lastUsedCallbackId PubSubController
ctrl) (UnregisterHandle -> UnregisterHandle -> UnregisterHandle
forall a. Num a => a -> a -> a
+UnregisterHandle
1)
UnregisterHandle
ident <- TVar UnregisterHandle -> STM UnregisterHandle
forall a. TVar a -> STM a
readTVar (TVar UnregisterHandle -> STM UnregisterHandle)
-> TVar UnregisterHandle -> STM UnregisterHandle
forall a b. (a -> b) -> a -> b
$ PubSubController -> TVar UnregisterHandle
lastUsedCallbackId PubSubController
ctrl
HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm <- TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
ctrl
HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm <- TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
ctrl
let newChans' :: [ByteString]
newChans' = [ ByteString
n | (ByteString
n,ByteString -> IO ()
_) <- [(ByteString, ByteString -> IO ())]
newChans, Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm]
newPChans' :: [ByteString]
newPChans' = [ ByteString
n | (ByteString
n, PMessageCallback
_) <- [(ByteString, PMessageCallback)]
newPChans, Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ ByteString
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> Bool
forall k a. (Eq k, Hashable k) => k -> HashMap k a -> Bool
HM.member ByteString
n HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm]
ps :: PubSub
ps = [ByteString] -> PubSub
subscribe [ByteString]
newChans' PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` [ByteString] -> PubSub
psubscribe [ByteString]
newPChans'
TBQueue PubSub -> PubSub -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) PubSub
ps
TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
ctrl) (([(UnregisterHandle, ByteString -> IO ())]
-> [(UnregisterHandle, ByteString -> IO ())]
-> [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> HashMap k v -> HashMap k v -> HashMap k v
HM.unionWith [(UnregisterHandle, ByteString -> IO ())]
-> [(UnregisterHandle, ByteString -> IO ())]
-> [(UnregisterHandle, ByteString -> IO ())]
forall a. [a] -> [a] -> [a]
(++) HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm (((ByteString -> IO ())
-> [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\ByteString -> IO ()
z -> [(UnregisterHandle
ident,ByteString -> IO ()
z)]) (HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> HashMap ByteString (ByteString -> IO ())
-> HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
forall a b. (a -> b) -> a -> b
$ [(ByteString, ByteString -> IO ())]
-> HashMap ByteString (ByteString -> IO ())
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList [(ByteString, ByteString -> IO ())]
newChans))
TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
ctrl) (([(UnregisterHandle, PMessageCallback)]
-> [(UnregisterHandle, PMessageCallback)]
-> [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> HashMap k v -> HashMap k v -> HashMap k v
HM.unionWith [(UnregisterHandle, PMessageCallback)]
-> [(UnregisterHandle, PMessageCallback)]
-> [(UnregisterHandle, PMessageCallback)]
forall a. [a] -> [a] -> [a]
(++) HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm ((PMessageCallback -> [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\PMessageCallback
z -> [(UnregisterHandle
ident,PMessageCallback
z)]) (HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> HashMap ByteString PMessageCallback
-> HashMap ByteString [(UnregisterHandle, PMessageCallback)]
forall a b. (a -> b) -> a -> b
$ [(ByteString, PMessageCallback)]
-> HashMap ByteString PMessageCallback
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList [(ByteString, PMessageCallback)]
newPChans))
TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ PubSub -> Int
totalPendingChanges PubSub
ps)
UnregisterHandle -> STM UnregisterHandle
forall (m :: * -> *) a. Monad m => a -> m a
return UnregisterHandle
ident
IO () -> IO (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (IO () -> IO (IO ())) -> IO () -> IO (IO ())
forall a b. (a -> b) -> a -> b
$ PubSubController
-> [ByteString] -> [ByteString] -> UnregisterHandle -> IO ()
unsubChannels PubSubController
ctrl (((ByteString, ByteString -> IO ()) -> ByteString)
-> [(ByteString, ByteString -> IO ())] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (ByteString, ByteString -> IO ()) -> ByteString
forall a b. (a, b) -> a
fst [(ByteString, ByteString -> IO ())]
newChans) (((ByteString, PMessageCallback) -> ByteString)
-> [(ByteString, PMessageCallback)] -> [ByteString]
forall a b. (a -> b) -> [a] -> [b]
map (ByteString, PMessageCallback) -> ByteString
forall a b. (a, b) -> a
fst [(ByteString, PMessageCallback)]
newPChans) UnregisterHandle
ident
-- | Register callbacks with 'addChannels' and then block until the
-- controller's pending-change counter ('pendingCnt') is no longer positive.
--
-- The counter is shared by the whole controller, so this also waits for
-- pending changes that were queued by other callers.
--
-- Returns the unregister action produced by 'addChannels'. When both lists
-- are empty nothing is registered, nothing is waited for, and a no-op
-- action is returned.
addChannelsAndWait :: MonadIO m => PubSubController
                   -> [(RedisChannel, MessageCallback)]   -- ^ channels and their callbacks
                   -> [(RedisPChannel, PMessageCallback)] -- ^ patterns and their callbacks
                   -> m UnregisterCallbacksAction
addChannelsAndWait _ [] [] = return $ return ()
addChannelsAndWait ctrl newChans newPChans = do
  unreg <- addChannels ctrl newChans newPChans
  -- Block (via STM 'retry') while changes are still outstanding.
  liftIO $ atomically $ do
    r <- readTVar (pendingCnt ctrl)
    when (r > 0) retry
  return unreg
-- | Drop every callback registered for the given channels and patterns.
--
-- In a single STM transaction this:
--
-- * keeps only the names that are currently present in the callback maps,
-- * queues the matching unsubscribe\/punsubscribe change on 'sendChanges',
-- * deletes those keys (and therefore all of their callbacks) from
--   'callbacks' and 'pcallbacks',
-- * adds @totalPendingChanges@ of the queued change to 'pendingCnt'.
--
-- It does not wait for the change to be acknowledged. When both input
-- lists are empty it does nothing.
removeChannels :: MonadIO m => PubSubController
               -> [RedisChannel]  -- ^ channels to remove
               -> [RedisPChannel] -- ^ patterns to remove
               -> m ()
removeChannels _ [] [] = return ()
removeChannels ctrl remChans remPChans = liftIO $ atomically $ do
  cm <- readTVar $ callbacks ctrl
  pm <- readTVar $ pcallbacks ctrl
  let -- Only names we actually hold callbacks for.
      remChans'  = filter (`HM.member` cm) remChans
      remPChans' = filter (`HM.member` pm) remPChans
      -- An empty list is mapped to 'mempty' rather than passed on;
      -- NOTE(review): presumably because an unsubscribe with no names
      -- would mean "everything" -- confirm against 'unsubscribe'.
      ps = (if null remChans' then mempty else unsubscribe remChans')
           `mappend`
           (if null remPChans' then mempty else punsubscribe remPChans')
  writeTBQueue (sendChanges ctrl) ps
  writeTVar (callbacks ctrl) (foldl' (flip HM.delete) cm remChans')
  writeTVar (pcallbacks ctrl) (foldl' (flip HM.delete) pm remPChans')
  -- Strict update so the counter does not accumulate thunks.
  modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)
-- | Remove the callbacks tagged with handle @h@ from the given channels and
-- patterns.
--
-- Only the entries whose handle equals @h@ are removed. A channel or
-- pattern is dropped from the map, and an unsubscribe\/punsubscribe is
-- queued for it, only when no callbacks remain for it afterwards. All of
-- this happens in one STM transaction, which also adds
-- @totalPendingChanges@ of the queued change to 'pendingCnt'.
unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO ()
unsubChannels ctrl chans pchans h = atomically $ do
  cm <- readTVar $ callbacks ctrl
  pm <- readTVar $ pcallbacks ctrl

  -- Restrict to names that are currently registered.
  let remChans  = filter (`HM.member` cm) chans
      remPChans = filter (`HM.member` pm) pchans

  -- Drop entries belonging to @h@; 'Nothing' means no callbacks are left.
  let filterHandle :: Maybe [(UnregisterHandle, a)] -> Maybe [(UnregisterHandle, a)]
      filterHandle Nothing = Nothing
      filterHandle (Just lst) = case filter (\x -> fst x /= h) lst of
        [] -> Nothing
        xs -> Just xs

  -- Update one key: delete it when empty, otherwise store the filtered list.
  -- The signature keeps this polymorphic, as it is used on both maps.
  let removeHandles :: HM.HashMap ByteString [(UnregisterHandle, a)]
                    -> ByteString
                    -> HM.HashMap ByteString [(UnregisterHandle, a)]
      removeHandles m k = case filterHandle (HM.lookup k m) of
        Nothing -> HM.delete k m
        Just v  -> HM.insert k v m

  let cm' = foldl' removeHandles cm remChans
      pm' = foldl' removeHandles pm remPChans

  -- Names that vanished from the maps are the ones to unsubscribe from.
  let remChans'  = filter (\n -> not $ HM.member n cm') remChans
      remPChans' = filter (\n -> not $ HM.member n pm') remPChans
      -- NOTE(review): empty lists are mapped to 'mempty', presumably
      -- because an unsubscribe with no names would mean "everything" --
      -- confirm against 'unsubscribe'.
      ps = (if null remChans' then mempty else unsubscribe remChans')
           `mappend`
           (if null remPChans' then mempty else punsubscribe remPChans')

  writeTBQueue (sendChanges ctrl) ps
  writeTVar (callbacks ctrl) cm'
  writeTVar (pcallbacks ctrl) pm'
  -- Strict update so the counter does not accumulate thunks.
  modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)
-- | Call 'removeChannels' and then block until the controller's
-- pending-change counter ('pendingCnt') is no longer positive.
--
-- The counter is shared by the whole controller, so this also waits for
-- pending changes that were queued by other callers. When both lists are
-- empty it returns immediately without waiting.
removeChannelsAndWait :: MonadIO m => PubSubController
                      -> [RedisChannel]  -- ^ channels to remove
                      -> [RedisPChannel] -- ^ patterns to remove
                      -> m ()
removeChannelsAndWait _ [] [] = return ()
removeChannelsAndWait ctrl remChans remPChans = do
  removeChannels ctrl remChans remPChans
  -- Block (via STM 'retry') while changes are still outstanding.
  liftIO $ atomically $ do
    r <- readTVar (pendingCnt ctrl)
    when (r > 0) retry
-- | Receive loop: read replies from the connection forever and dispatch them.
--
-- * A channel message runs every callback registered for that channel with
--   the message content.
-- * A pattern message runs every callback registered for that pattern with
--   the channel and the message content.
-- * A subscribe or unsubscribe acknowledgement decrements 'pendingCnt' by one.
--
-- Messages with no registered callback are ignored. Callbacks run in this
-- thread, one after another, in list order.
listenThread :: PubSubController -> PP.Connection -> IO ()
listenThread ctrl rawConn = forever $ do
  msg <- PP.recv rawConn
  case decodeMsg msg of
    Msg (Message channel msgCt) -> do
      cm <- readTVarIO (callbacks ctrl)
      maybe (return ()) (mapM_ (\(_, cb) -> cb msgCt)) (HM.lookup channel cm)
    Msg (PMessage pat channel msgCt) -> do
      pm <- readTVarIO (pcallbacks ctrl)
      maybe (return ()) (mapM_ (\(_, cb) -> cb channel msgCt)) (HM.lookup pat pm)
    Subscribed     -> ackPendingChange
    Unsubscribed _ -> ackPendingChange
  where
    -- One acknowledged change; strict so the counter holds no thunks.
    ackPendingChange :: IO ()
    ackPendingChange = atomically $ modifyTVar' (pendingCnt ctrl) (subtract 1)
-- | Send loop: forever take the next queued 'PubSub' change from
-- 'sendChanges' (blocking while the queue is empty) and write its four
-- commands to the connection in the order subscribe, unsubscribe,
-- psubscribe, punsubscribe, followed by an explicit flush.
sendThread :: PubSubController -> PP.Connection -> IO ()
sendThread ctrl rawConn = forever $ do
  -- RecordWildCards brings subs, unsubs, psubs and punsubs into scope.
  PubSub{..} <- atomically $ readTBQueue (sendChanges ctrl)
  rawSendCmd rawConn subs
  rawSendCmd rawConn unsubs
  rawSendCmd rawConn psubs
  rawSendCmd rawConn punsubs
  -- NOTE(review): presumably needed so the commands leave the send buffer
  -- right away -- confirm against 'PP.flush'.
  PP.flush rawConn
pubSubForever :: Connection.Connection
-> PubSubController
-> IO ()
-> IO ()
pubSubForever :: Connection -> PubSubController -> IO () -> IO ()
pubSubForever (Connection.NonClusteredConnection Pool Connection
pool) PubSubController
ctrl IO ()
onInitialLoad = Pool Connection -> (Connection -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Pool a -> (a -> m b) -> m b
withResource Pool Connection
pool ((Connection -> IO ()) -> IO ()) -> (Connection -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Connection
rawConn -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
let loop :: STM ()
loop = TBQueue PubSub -> STM (Maybe PubSub)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) STM (Maybe PubSub) -> (Maybe PubSub -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
\Maybe PubSub
x -> if Maybe PubSub -> Bool
forall a. Maybe a -> Bool
isJust Maybe PubSub
x then STM ()
loop else () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
STM ()
loop
HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm <- TVar (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a. TVar a -> STM a
readTVar (TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]))
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
-> STM
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar
(HashMap ByteString [(UnregisterHandle, ByteString -> IO ())])
callbacks PubSubController
ctrl
HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm <- TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a. TVar a -> STM a
readTVar (TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)]))
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
-> STM (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
forall a b. (a -> b) -> a -> b
$ PubSubController
-> TVar (HashMap ByteString [(UnregisterHandle, PMessageCallback)])
pcallbacks PubSubController
ctrl
let ps :: PubSub
ps = [ByteString] -> PubSub
subscribe (HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys HashMap ByteString [(UnregisterHandle, ByteString -> IO ())]
cm) PubSub -> PubSub -> PubSub
forall a. Monoid a => a -> a -> a
`mappend` [ByteString] -> PubSub
psubscribe (HashMap ByteString [(UnregisterHandle, PMessageCallback)]
-> [ByteString]
forall k v. HashMap k v -> [k]
HM.keys HashMap ByteString [(UnregisterHandle, PMessageCallback)]
pm)
TBQueue PubSub -> PubSub -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue (PubSubController -> TBQueue PubSub
sendChanges PubSubController
ctrl) PubSub
ps
TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) (PubSub -> Int
totalPendingChanges PubSub
ps)
IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (PubSubController -> Connection -> IO ()
listenThread PubSubController
ctrl Connection
rawConn) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
listenT ->
IO () -> (Async () -> IO ()) -> IO ()
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (PubSubController -> Connection -> IO ()
sendThread PubSubController
ctrl Connection
rawConn) ((Async () -> IO ()) -> IO ()) -> (Async () -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Async ()
sendT -> do
Either
(Either (Either SomeException ()) (Either SomeException ())) ()
mret <- STM
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> IO
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
forall a. STM a -> IO a
atomically (STM
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> IO
(Either
(Either (Either SomeException ()) (Either SomeException ())) ()))
-> STM
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> IO
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
forall a b. (a -> b) -> a -> b
$
(Either (Either SomeException ()) (Either SomeException ())
-> Either
(Either (Either SomeException ()) (Either SomeException ())) ()
forall a b. a -> Either a b
Left (Either (Either SomeException ()) (Either SomeException ())
-> Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> STM (Either (Either SomeException ()) (Either SomeException ()))
-> STM
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Async ()
-> Async ()
-> STM (Either (Either SomeException ()) (Either SomeException ()))
forall a b.
Async a
-> Async b
-> STM (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchSTM Async ()
listenT Async ()
sendT))
STM
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> STM
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> STM
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
forall a. STM a -> STM a -> STM a
`orElse`
(()
-> Either
(Either (Either SomeException ()) (Either SomeException ())) ()
forall a b. b -> Either a b
Right (()
-> Either
(Either (Either SomeException ()) (Either SomeException ())) ())
-> STM ()
-> STM
(Either
(Either (Either SomeException ()) (Either SomeException ())) ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar (PubSubController -> TVar Int
pendingCnt PubSubController
ctrl) STM Int -> (Int -> STM ()) -> STM ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
\Int
x -> if Int
x Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 then STM ()
forall a. STM a
retry else () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()))
case Either
(Either (Either SomeException ()) (Either SomeException ())) ()
mret of
Right () -> IO ()
onInitialLoad
Either
(Either (Either SomeException ()) (Either SomeException ())) ()
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Either (Either SomeException ()) (Either SomeException ())
merr <- Async ()
-> Async ()
-> IO (Either (Either SomeException ()) (Either SomeException ()))
forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async ()
listenT Async ()
sendT
case Either (Either SomeException ()) (Either SomeException ())
merr of
(Right (Left SomeException
err)) -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO SomeException
err
(Left (Left SomeException
err)) -> SomeException -> IO ()
forall e a. Exception e => e -> IO a
throwIO SomeException
err
Either (Either SomeException ()) (Either SomeException ())
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
-- Clustered connections: this equation has no pub/sub implementation, so it
-- fails as soon as the resulting action is evaluated. A descriptive 'error'
-- is used instead of a bare 'undefined' so the caller can tell what went
-- wrong; the controller and the initial-load callback are ignored.
pubSubForever (Connection.ClusteredConnection _ _) _ _ =
    error "Hedis: pubSubForever is not supported on clustered connections"
-- | Decode a raw 'Reply' received in pub/sub mode into a 'PubSubReply'.
--
--   The reply must be a multi-bulk reply with at least three elements. The
--   first element is decoded as a 'ByteString' naming the kind of message:
--
--   * @message@: 'Msg' wrapping a 'Message' built from the second and third
--     elements.
--   * @pmessage@: 'Msg' wrapping a 'PMessage' built from the second, third
--     and fourth elements.
--   * @subscribe@ \/ @psubscribe@: 'Subscribed'.
--   * @unsubscribe@ \/ @punsubscribe@: 'Unsubscribed' carrying the count
--     decoded from the third element.
--
--   Any other shape, an unknown kind, or an element that fails to decode is
--   reported via 'errMsg' with the complete offending reply.
decodeMsg :: Reply -> PubSubReply
decodeMsg r@(MultiBulk (Just (r0:r1:r2:rs))) =
    -- Whatever sub-reply failed to decode, always report the whole reply.
    either (const (errMsg r)) id $ do
        kind <- decode r0
        case kind :: ByteString of
            "message"      -> Msg <$> decodeMessage
            "pmessage"     -> Msg <$> decodePMessage
            "subscribe"    -> return Subscribed
            "psubscribe"   -> return Subscribed
            "unsubscribe"  -> Unsubscribed <$> decodeCnt
            "punsubscribe" -> Unsubscribed <$> decodeCnt
            _              -> Left r
  where
    decodeMessage :: Either Reply Message
    decodeMessage = Message <$> decode r1 <*> decode r2

    -- A pmessage needs a fourth element. Match on the tail instead of
    -- calling the partial 'head', so a short reply produces the regular
    -- "expected pub/sub-message" error rather than "Prelude.head: empty list".
    decodePMessage :: Either Reply Message
    decodePMessage = case rs of
        r3:_ -> PMessage <$> decode r1 <*> decode r2 <*> decode r3
        []   -> Left r

    decodeCnt :: Either Reply Int
    decodeCnt = fromInteger <$> decode r2
decodeMsg r = errMsg r
-- | Abort with an 'error' describing a reply that could not be interpreted
--   as a pub/sub message. The message includes the 'show' rendering of the
--   reply. The result type is fully polymorphic because the call never
--   returns normally.
errMsg :: Reply -> a
errMsg unexpected = error (prefix ++ show unexpected)
  where
    prefix :: String
    prefix = "Hedis: expected pub/sub-message but got: "