{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, EmptyDataDecls,
    FlexibleInstances, FlexibleContexts, GeneralizedNewtypeDeriving #-}

module Database.Redis.PubSub (
    publish,

    -- ** Subscribing to channels
    -- $pubsubexpl

    -- *** Single-thread Pub/Sub
    pubSub,
    Message(..),
    PubSub(),
    subscribe, unsubscribe, psubscribe, punsubscribe,
    -- *** Continuous Pub/Sub message controller
    pubSubForever,
    RedisChannel, RedisPChannel, MessageCallback, PMessageCallback,
    PubSubController, newPubSubController, currentChannels, currentPChannels,
    addChannels, addChannelsAndWait, removeChannels, removeChannelsAndWait,
    UnregisterCallbacksAction
) where

#if __GLASGOW_HASKELL__ < 710
import Control.Applicative
import Data.Monoid
#endif
import Control.Concurrent.Async (withAsync, waitEitherCatch, waitEitherCatchSTM)
import Control.Concurrent.STM
import Control.Exception (throwIO)
import Control.Monad
import Control.Monad.State
import Data.ByteString.Char8 (ByteString)
import Data.List (foldl')
import Data.Maybe (isJust)
import Data.Pool
import qualified Data.HashMap.Strict as HM
import qualified Database.Redis.Core as Core
import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Protocol (Reply(..), renderRequest)
import Database.Redis.Types

-- |Bookkeeping for the single-threaded 'pubSub' loop. The loop may leave
--  PubSub mode once both counters are zero.
data PubSubState = PubSubState
    { subCnt  :: Int
      -- ^ Subscription count taken from the most recent unsubscribe reply.
    , pending :: Int
      -- ^ Number of SUBSCRIBE\/PSUBSCRIBE acknowledgements still expected.
    }

-- |Apply a function to the 'pending' counter, leaving 'subCnt' untouched.
modifyPending :: (MonadState PubSubState m) => (Int -> Int) -> m ()
modifyPending f = modify adjust
  where
    adjust st = st { pending = f (pending st) }

-- |Overwrite the 'subCnt' counter, leaving 'pending' untouched.
putSubCnt :: (MonadState PubSubState m) => Int -> m ()
putSubCnt n = modify overwrite
  where
    overwrite st = st { subCnt = n }

-- Uninhabited tag types. They are only ever used as the phantom parameters of
-- 'Cmd': the first parameter is 'Subscribe' or 'Unsubscribe', the second is
-- 'Channel' or 'Pattern'. Together they select the 'Monoid' and 'Command'
-- instance of a command.
data Subscribe
data Unsubscribe
data Channel
data Pattern

-- |Encapsulates subscription changes. Use 'subscribe', 'unsubscribe',
--  'psubscribe', 'punsubscribe' or 'mempty' to construct a value. Combine
--  values by using the 'Monoid' interface, i.e. 'mappend' and 'mconcat'.
--
--  Each field holds the pending change for one of the four Redis commands.
data PubSub = PubSub
    { subs    :: Cmd Subscribe Channel
      -- ^ Channels to SUBSCRIBE to.
    , unsubs  :: Cmd Unsubscribe Channel
      -- ^ Channels to UNSUBSCRIBE from.
    , psubs   :: Cmd Subscribe Pattern
      -- ^ Patterns to PSUBSCRIBE to.
    , punsubs :: Cmd Unsubscribe Pattern
      -- ^ Patterns to PUNSUBSCRIBE from.
    } deriving (Eq)

-- |Field-wise combination: each of the four commands is merged with its
--  counterpart using that command's own 'Monoid' instance.
instance Monoid PubSub where
    mempty      = PubSub mempty mempty mempty mempty
    mappend a b = PubSub
        (subs a    `mappend` subs b)
        (unsubs a  `mappend` unsubs b)
        (psubs a   `mappend` psubs b)
        (punsubs a `mappend` punsubs b)

-- |A single (un)subscription command. 'DoNothing' means the command is not
--  sent at all; @Cmd xs@ carries the channel or pattern names passed as
--  arguments. The type parameters are phantom tags ('Subscribe' or
--  'Unsubscribe', 'Channel' or 'Pattern').
data Cmd a b = DoNothing | Cmd { changes :: [ByteString] } deriving (Eq)

-- |Subscriptions accumulate: 'DoNothing' is the identity and two argument
--  lists are concatenated.
instance Monoid (Cmd Subscribe a) where
    mempty      = DoNothing
    mappend l r = case (l, r) of
        (DoNothing, _)   -> r
        (_, DoNothing)   -> l
        (Cmd xs, Cmd ys) -> Cmd (xs ++ ys)
    
-- |Unsubscriptions accumulate like subscriptions, except that an empty
--  argument list means \"unsubscribe from everything\" and therefore absorbs
--  any other non-'DoNothing' command.
instance Monoid (Cmd Unsubscribe a) where
    mempty      = DoNothing
    mappend l r = case (l, r) of
        (DoNothing, _)   -> r
        (_, DoNothing)   -> l
        -- empty subscription list => unsubscribe all channels and patterns
        (Cmd [], _)      -> Cmd []
        (_, Cmd [])      -> Cmd []
        (Cmd xs, Cmd ys) -> Cmd (xs ++ ys)


-- |Commands that can be sent while in Pub\/Sub mode.
class Command a where
    -- |The Redis command name placed in front of the command's arguments.
    redisCmd      :: a -> ByteString
    -- |How sending this command changes the count of pending
    --  acknowledgements tracked by the single-threaded 'pubSub' loop.
    updatePending :: a -> Int -> Int

-- |Send a command on the current connection and update the pending counter
--  accordingly. 'DoNothing' sends nothing and leaves the state unchanged.
sendCmd :: (Command (Cmd a b)) => Cmd a b -> StateT PubSubState Core.Redis ()
sendCmd cmd = case cmd of
    DoNothing -> return ()
    Cmd args  -> do
        lift $ Core.send (redisCmd cmd : args)
        modifyPending (updatePending cmd)

-- |Number of arguments carried by a command; zero for 'DoNothing'.
cmdCount :: Cmd a b -> Int
cmdCount cmd = case cmd of
    DoNothing -> 0
    Cmd args  -> length args

-- |Total number of channel and pattern arguments across all four commands of
--  a 'PubSub' value.
totalPendingChanges :: PubSub -> Int
totalPendingChanges ps = sum
  [ cmdCount (subs ps)
  , cmdCount (unsubs ps)
  , cmdCount (psubs ps)
  , cmdCount (punsubs ps)
  ]

-- |Render a command and send it directly on a raw connection. 'DoNothing'
--  sends nothing.
rawSendCmd :: (Command (Cmd a b)) => PP.Connection -> Cmd a b -> IO ()
rawSendCmd conn cmd = case cmd of
    DoNothing -> return ()
    Cmd args  -> PP.send conn (renderRequest (redisCmd cmd : args))

-- |Add the number of arguments carried by the command to a counter.
plusChangeCnt :: Cmd a b -> Int -> Int
plusChangeCnt cmd n = case cmd of
    DoNothing -> n
    Cmd args  -> n + length args

-- |SUBSCRIBE: one more pending acknowledgement per requested channel.
instance Command (Cmd Subscribe Channel) where
    redisCmd _        = "SUBSCRIBE"
    updatePending cmd = plusChangeCnt cmd

-- |PSUBSCRIBE: one more pending acknowledgement per requested pattern.
instance Command (Cmd Subscribe Pattern) where
    redisCmd _        = "PSUBSCRIBE"
    updatePending cmd = plusChangeCnt cmd

-- |UNSUBSCRIBE: leaves the pending counter unchanged.
instance Command (Cmd Unsubscribe Channel) where
    redisCmd _      = "UNSUBSCRIBE"
    updatePending _ = id

-- |PUNSUBSCRIBE: leaves the pending counter unchanged.
instance Command (Cmd Unsubscribe Pattern) where
    redisCmd _      = "PUNSUBSCRIBE"
    updatePending _ = id


-- |A message delivered to the 'pubSub' callback.
--
--  'Message' is built from a @message@ reply and carries the channel and the
--  payload. 'PMessage' is built from a @pmessage@ reply and additionally
--  carries the pattern that matched the channel.
data Message = Message  { msgChannel, msgMessage :: ByteString}
             | PMessage { msgPattern, msgChannel, msgMessage :: ByteString}
    deriving (Show)

-- |The decoded form of a reply received while in Pub\/Sub mode.
--  'Subscribed' stands for a @subscribe@ or @psubscribe@ acknowledgement,
--  'Unsubscribed' for an @unsubscribe@ or @punsubscribe@ acknowledgement
--  together with the count decoded from the reply, and 'Msg' for a published
--  message.
data PubSubReply = Subscribed | Unsubscribed Int | Msg Message


------------------------------------------------------------------------------
-- Public Interface
--

-- |Publish a message on a channel (<http://redis.io/commands/publish>).
publish
    :: (Core.RedisCtx m f)
    => ByteString -- ^ channel
    -> ByteString -- ^ message
    -> m (f Integer)
publish chan payload = Core.sendRequest request
  where
    request = ["PUBLISH", chan, payload]

-- |Subscribe to the given channels
--  (<http://redis.io/commands/subscribe>). An empty list yields 'mempty',
--  i.e. no command is sent.
subscribe
    :: [ByteString] -- ^ channel
    -> PubSub
subscribe cs
    | null cs   = mempty
    | otherwise = mempty{ subs = Cmd cs }

-- |Unsubscribe from the given channels
--  (<http://redis.io/commands/unsubscribe>). The command is built even for
--  an empty list, which the 'Monoid' instance treats as \"unsubscribe from
--  all channels\".
unsubscribe
    :: [ByteString] -- ^ channel
    -> PubSub
unsubscribe cs = PubSub
    { subs    = mempty
    , unsubs  = Cmd cs
    , psubs   = mempty
    , punsubs = mempty
    }

-- |Subscribe to all channels matching the given patterns
--  (<http://redis.io/commands/psubscribe>). An empty list yields 'mempty',
--  i.e. no command is sent.
psubscribe
    :: [ByteString] -- ^ pattern
    -> PubSub
psubscribe ps
    | null ps   = mempty
    | otherwise = mempty{ psubs = Cmd ps }

-- |Unsubscribe from the given patterns
--  (<http://redis.io/commands/punsubscribe>). The command is built even for
--  an empty list, which the 'Monoid' instance treats as \"unsubscribe from
--  all patterns\".
punsubscribe
    :: [ByteString] -- ^ pattern
    -> PubSub
punsubscribe ps = PubSub
    { subs    = mempty
    , unsubs  = mempty
    , psubs   = mempty
    , punsubs = Cmd ps
    }

-- |Listens to published messages on subscribed channels and channels matching
--  the subscribed patterns. For documentation on the semantics of Redis
--  Pub\/Sub see <http://redis.io/topics/pubsub>.
--
--  The given callback function is called for each received message.
--  Subscription changes are triggered by the returned 'PubSub'. To keep
--  subscriptions unchanged, the callback can return 'mempty'.
--
--  If the initial subscriptions are 'mempty', nothing is sent and the action
--  returns immediately. Otherwise the loop ends once an unsubscribe reply
--  reports zero remaining subscriptions and no subscribe acknowledgements
--  are outstanding.
--
--  Example: Subscribe to the \"news\" channel indefinitely.
--
--  @
--  pubSub (subscribe [\"news\"]) $ \\msg -> do
--      putStrLn $ \"Message from \" ++ show (msgChannel msg)
--      return mempty
--  @
--
--  Example: Receive a single message from the \"chat\" channel.
--
--  @
--  pubSub (subscribe [\"chat\"]) $ \\msg -> do
--      putStrLn $ \"Message from \" ++ show (msgChannel msg)
--      return $ unsubscribe [\"chat\"]
--  @
--
-- Note that Redis Pub\/Sub is asynchronous by nature, so returning
-- 'unsubscribe' does not guarantee that the callback will not receive
-- further messages. To make sure that you do not process messages after
-- unsubscribing, and that you do not unsubscribe from the same channel more
-- than once, track that state yourself, e.g. in an @IORef@.
--
pubSub
    :: PubSub                 -- ^ Initial subscriptions.
    -> (Message -> IO PubSub) -- ^ Callback function.
    -> Core.Redis ()
pubSub initial callback =
    unless (initial == mempty) $
        evalStateT (applyChanges initial) (PubSubState 0 0)
  where
    -- Send all four commands of a change set, then go back to receiving.
    applyChanges :: PubSub -> StateT PubSubState Core.Redis ()
    applyChanges changeSet = do
        sendCmd (subs changeSet)
        sendCmd (unsubs changeSet)
        sendCmd (psubs changeSet)
        sendCmd (punsubs changeSet)
        awaitReply

    -- Receive one reply and dispatch on its kind.
    awaitReply :: StateT PubSubState Core.Redis ()
    awaitReply = do
        reply <- lift Core.recv
        case decodeMsg reply of
            Msg msg        -> do
                next <- liftIO (callback msg)
                applyChanges next
            Subscribed     -> do
                modifyPending (subtract 1)
                awaitReply
            Unsubscribed n -> do
                putSubCnt n
                st <- get
                let finished = subCnt st == 0 && pending st == 0
                unless finished awaitReply

-- | A Redis channel name, as used with 'addChannels' and 'removeChannels'.
type RedisChannel = ByteString

-- | A Redis channel pattern, as used for pattern subscriptions with
-- 'addChannels' and 'removeChannels'.
type RedisPChannel = ByteString

-- | A handler for a message from a subscribed channel.
-- The callback is passed the message content.
--
-- Messages are processed synchronously in the receiving thread, so if the callback
-- takes a long time it will block other callbacks and other messages from being
-- received.  If you need to move long-running work to a different thread, we suggest
-- you use 'TBQueue' with a reasonable bound, so that if messages are arriving faster
-- than you can process them, you do eventually block.
--
-- If the callback throws an exception, the exception will be thrown from 'pubSubForever',
-- which will cause the entire Redis connection for all subscriptions to be closed.
-- As long as you call 'pubSubForever' in a loop you will reconnect to your subscribed
-- channels, but you should probably add an exception handler to each callback to
-- prevent this.
type MessageCallback = ByteString -> IO ()

-- | A handler for a message from a pattern-subscribed channel.
-- The callback is passed the channel the message was sent on, followed by the
-- message content.
--
-- Similar to 'MessageCallback', callbacks are executed synchronously and any exceptions
-- are rethrown from 'pubSubForever'.
type PMessageCallback = RedisChannel -> ByteString -> IO ()

-- | An action that, when executed, unregisters the callbacks.  It is returned from 'addChannels'
-- and 'addChannelsAndWait', and typically you would use it with 'bracket' to guarantee that you
-- unsubscribe from channels.  For example, if you are using websockets to distribute messages to
-- clients, you could use something such as:
--
-- > websocketConn <- Network.WebSockets.acceptRequest pending
-- > let mycallback msg = Network.WebSockets.sendTextData websocketConn msg
-- > bracket (addChannelsAndWait ctrl [("hello", mycallback)] []) id $ const $ do
-- >   {- loop here calling Network.WebSockets.receiveData -}
type UnregisterCallbacksAction = IO ()

-- | Identifier attached to every registered callback so that the callbacks
-- added by one 'addChannels' call can later be removed as a group.  The 'Num'
-- instance is derived so that handles can be written as literals and
-- incremented.
newtype UnregisterHandle = UnregisterHandle Integer
  deriving (Eq, Show, Num)

-- | A controller that stores a set of channels, pattern channels, and callbacks.
-- It allows you to manage Pub/Sub subscriptions and pattern subscriptions and alter them at
-- any time throughout the life of your program.
-- You should typically create the controller at the start of your program and then store it
-- through the life of your program, using 'addChannels' and 'removeChannels' to update the
-- current subscriptions.
data PubSubController = PubSubController
  { callbacks :: TVar (HM.HashMap RedisChannel [(UnregisterHandle, MessageCallback)])
    -- ^ Callbacks per subscribed channel, each tagged with the handle it was registered under.
  , pcallbacks :: TVar (HM.HashMap RedisPChannel [(UnregisterHandle, PMessageCallback)])
    -- ^ Callbacks per subscribed pattern, each tagged with the handle it was registered under.
  , sendChanges :: TBQueue PubSub
    -- ^ Bounded queue of subscription changes waiting to be sent by the sending thread.
  , pendingCnt :: TVar Int
    -- ^ Number of queued or sent subscription changes not yet acknowledged by Redis.
  , lastUsedCallbackId :: TVar UnregisterHandle
    -- ^ The handle most recently handed out by 'addChannels'.
  }

-- | Create a new 'PubSubController'.  Note that this does not subscribe to any channels, it just
-- creates the controller.  The subscriptions will happen once 'pubSubForever' is called.
--
-- If the same channel (or pattern) appears more than once in the initial lists, all of its
-- callbacks are registered, in the order given.  All initial callbacks are registered under
-- handle @0@; handles issued later by 'addChannels' start at @1@.
newPubSubController :: MonadIO m => [(RedisChannel, MessageCallback)] -- ^ the initial subscriptions
                                 -> [(RedisPChannel, PMessageCallback)] -- ^ the initial pattern subscriptions
                                 -> m PubSubController
newPubSubController x y = liftIO $ do
    cbs <- newTVarIO (initialHandlers x)
    pcbs <- newTVarIO (initialHandlers y)
    -- bounded queue of subscription changes waiting for the sending thread
    c <- newTBQueueIO 10
    pendingVar <- newTVarIO 0
    lastId <- newTVarIO 0
    return $ PubSubController cbs pcbs c pendingVar lastId
  where
    -- Group callbacks by channel.  'HM.fromListWith' passes the newer value
    -- first, so '(++)' is flipped to keep the callbacks in list order.
    initialHandlers :: [(ByteString, cb)] -> HM.HashMap ByteString [(UnregisterHandle, cb)]
    initialHandlers chans =
        HM.fromListWith (flip (++)) [ (n, [(0, cb)]) | (n, cb) <- chans ]

-- | List the channels currently registered in the 'PubSubController'.  WARNING! This might not
-- exactly reflect the channels subscribed on the Redis server, because there is a delay
-- between adding or removing a channel in the 'PubSubController' and when Redis receives
-- and processes the subscription change request.
#if __GLASGOW_HASKELL__ < 710
currentChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisChannel]
#else
currentChannels :: MonadIO m => PubSubController -> m [RedisChannel]
#endif
currentChannels ctrl = fmap HM.keys (liftIO (readTVarIO (callbacks ctrl)))

-- | List the patterns currently registered in the 'PubSubController'.  WARNING! This might not
-- exactly reflect the patterns subscribed on the Redis server, because there is a delay
-- between adding or removing a pattern in the 'PubSubController' and when Redis receives
-- and processes the subscription change request.
#if __GLASGOW_HASKELL__ < 710
currentPChannels :: (MonadIO m, Functor m) => PubSubController -> m [RedisPChannel]
#else
currentPChannels :: MonadIO m => PubSubController -> m [RedisPChannel]
#endif
currentPChannels ctrl = fmap HM.keys (liftIO (readTVarIO (pcallbacks ctrl)))

-- | Add channels into the 'PubSubController', and if there is an active 'pubSubForever', send the subscribe
-- and psubscribe commands to Redis.  The 'addChannels' function is thread-safe.  This function
-- does not wait for Redis to acknowledge that the channels have actually been subscribed; use
-- 'addChannelsAndWait' for that.
--
-- You can subscribe to the same channel or pattern channel multiple times; the 'PubSubController' keeps
-- a list of callbacks and executes each callback in response to a message.  This also holds when the
-- same channel appears more than once within a single call.
--
-- The return value is an action 'UnregisterCallbacksAction' which will unregister the callbacks,
-- which should typically be used with 'bracket'.
addChannels :: MonadIO m => PubSubController
                         -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to
                         -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to pattern subscribe to
                         -> m UnregisterCallbacksAction
addChannels _ [] [] = return $ return ()
addChannels ctrl newChans newPChans = liftIO $ do
    ident <- atomically $ do
      -- allocate a fresh handle shared by every callback of this call
      modifyTVar' (lastUsedCallbackId ctrl) (+1)
      ident <- readTVar $ lastUsedCallbackId ctrl
      cm <- readTVar $ callbacks ctrl
      pm <- readTVar $ pcallbacks ctrl
      -- only channels without any registered callback need a (p)subscribe command
      let newChans' = [ n | (n,_) <- newChans, not $ HM.member n cm]
          newPChans' = [ n | (n, _) <- newPChans, not $ HM.member n pm]
          ps = subscribe newChans' `mappend` psubscribe newPChans'
          -- Group the new callbacks by channel.  'HM.fromListWith' passes the
          -- newer value first, so '(++)' is flipped to keep list order.
          tagged :: [(ByteString, cb)] -> HM.HashMap ByteString [(UnregisterHandle, cb)]
          tagged pairs = HM.fromListWith (flip (++)) [ (n, [(ident, cb)]) | (n, cb) <- pairs ]
      writeTBQueue (sendChanges ctrl) ps
      writeTVar (callbacks ctrl) (HM.unionWith (++) cm (tagged newChans))
      writeTVar (pcallbacks ctrl) (HM.unionWith (++) pm (tagged newPChans))
      -- strict update: the counter may go unread for a long time, and a lazy
      -- update would build an ever-growing chain of thunks
      modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)
      return ident
    return $ unsubChannels ctrl (map fst newChans) (map fst newPChans) ident

-- | Call 'addChannels' and then block until Redis has acknowledged that the channels are subscribed.
--
-- Only the total number of pending subscription changes is tracked, so this function waits until
-- that total reaches zero.  If, for example, 'addChannelsAndWait' is called from several threads
-- simultaneously, each call waits for all pending subscription changes to be acknowledged, not
-- only its own.
--
-- This also waits correctly if the network connection dies during the subscription change.  Say that the
-- network connection dies right after we send a subscription change to Redis.  'pubSubForever' will throw
-- 'ConnectionLost' and 'addChannelsAndWait' will continue to wait.  Once you call 'pubSubForever' again
-- with the same 'PubSubController', it will open a new connection, send subscription commands
-- for all channels in the 'PubSubController' (which include the ones we are waiting for),
-- and wait for the responses from Redis.  Only once Redis has acknowledged the subscription
-- to all channels in the 'PubSubController' will 'addChannelsAndWait' unblock and return.
addChannelsAndWait :: MonadIO m => PubSubController
                                -> [(RedisChannel, MessageCallback)] -- ^ the channels to subscribe to
                                -> [(RedisPChannel, PMessageCallback)] -- ^ the channels to psubscribe to
                                -> m UnregisterCallbacksAction
addChannelsAndWait _ [] [] = return $ return ()
addChannelsAndWait ctrl newChans newPChans = do
  unreg <- addChannels ctrl newChans newPChans
  liftIO $ atomically $ do
    outstanding <- readTVar (pendingCnt ctrl)
    -- retries the transaction until nothing is outstanding
    check (outstanding <= 0)
  return unreg

-- | Remove channels from the 'PubSubController', and if there is an active 'pubSubForever', send the
-- unsubscribe commands to Redis.  Note that as soon as this function returns, no more callbacks will be
-- executed even if more messages arrive during the period when we request to unsubscribe from the channel
-- and Redis actually processes the unsubscribe request.  This function is thread-safe.
--
-- All callbacks registered for the given channels and patterns are removed, regardless of which
-- 'addChannels' call registered them.  Channels that are not currently registered are ignored.
--
-- If you remove all channels, the connection in 'pubSubForever' to redis will stay open and waiting for
-- any new channels from a call to 'addChannels'.  If you really want to close the connection,
-- use 'Control.Concurrent.killThread' or 'Control.Concurrent.Async.cancel' to kill the thread running
-- 'pubSubForever'.
removeChannels :: MonadIO m => PubSubController
                            -> [RedisChannel]
                            -> [RedisPChannel]
                            -> m ()
removeChannels _ [] [] = return ()
removeChannels ctrl remChans remPChans = liftIO $ atomically $ do
    cm <- readTVar $ callbacks ctrl
    pm <- readTVar $ pcallbacks ctrl
    let remChans' = filter (`HM.member` cm) remChans
        remPChans' = filter (`HM.member` pm) remPChans
        -- 'unsubscribe' / 'punsubscribe' with an empty list mean "unsubscribe
        -- from everything", so fall back to 'mempty' when nothing is left
        ps =        (if null remChans' then mempty else unsubscribe remChans')
          `mappend` (if null remPChans' then mempty else punsubscribe remPChans')
    writeTBQueue (sendChanges ctrl) ps
    writeTVar (callbacks ctrl) (foldl' (flip HM.delete) cm remChans')
    writeTVar (pcallbacks ctrl) (foldl' (flip HM.delete) pm remPChans')
    -- strict update: the counter may go unread for a long time, and a lazy
    -- update would build an ever-growing chain of thunks
    modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)

-- | Internal function that removes only those callbacks registered under the given handle.
-- A channel or pattern is unsubscribed from Redis only when no callback remains for it.
unsubChannels :: PubSubController -> [RedisChannel] -> [RedisPChannel] -> UnregisterHandle -> IO ()
unsubChannels ctrl chans pchans h = atomically $ do
    cm <- readTVar $ callbacks ctrl
    pm <- readTVar $ pcallbacks ctrl

    -- only worry about channels that exist
    let remChans = filter (`HM.member` cm) chans
        remPChans = filter (`HM.member` pm) pchans

    -- drop the callbacks registered under handle h; remove the key entirely
    -- once no callback is left for it
    let removeHandles :: HM.HashMap ByteString [(UnregisterHandle,a)]
                      -> ByteString
                      -> HM.HashMap ByteString [(UnregisterHandle,a)]
        removeHandles m k =
            case filter (\x -> fst x /= h) (maybe [] id (HM.lookup k m)) of
                [] -> HM.delete k m
                xs -> HM.insert k xs m

    -- maps after taking out callbacks matching the handle
    let cm' = foldl' removeHandles cm remChans
        pm' = foldl' removeHandles pm remPChans

    -- the channels to unsubscribe are those that no longer exist in cm' and pm'
    let remChans' = filter (\n -> not $ HM.member n cm') remChans
        remPChans' = filter (\n -> not $ HM.member n pm') remPChans
        -- an empty (p)unsubscribe means "unsubscribe from everything", so
        -- fall back to 'mempty' when nothing is left to unsubscribe
        ps =        (if null remChans' then mempty else unsubscribe remChans')
          `mappend` (if null remPChans' then mempty else punsubscribe remPChans')

    -- do the unsubscribe
    writeTBQueue (sendChanges ctrl) ps
    writeTVar (callbacks ctrl) cm'
    writeTVar (pcallbacks ctrl) pm'
    -- strict update: the counter may go unread for a long time, and a lazy
    -- update would build an ever-growing chain of thunks
    modifyTVar' (pendingCnt ctrl) (+ totalPendingChanges ps)

-- | Call 'removeChannels' and then block until all pending subscription change requests have been
-- acknowledged by Redis.  The waiting logic is the same as in 'addChannelsAndWait'.  Since
-- 'removeChannels' makes the 'PubSubController' discard messages for the removed channels
-- immediately, you likely don't need this function and can just use 'removeChannels'.
removeChannelsAndWait :: MonadIO m => PubSubController
                                   -> [RedisChannel]
                                   -> [RedisPChannel]
                                   -> m ()
removeChannelsAndWait _ [] [] = return ()
removeChannelsAndWait ctrl remChans remPChans = do
  removeChannels ctrl remChans remPChans
  liftIO $ atomically $ do
    outstanding <- readTVar (pendingCnt ctrl)
    -- retries the transaction until nothing is outstanding
    check (outstanding <= 0)

-- | Internal thread which listens for messages and executes callbacks.
-- This is the only thread which ever receives data from the underlying
-- connection.
--
-- Messages for channels or patterns without registered callbacks are dropped.
-- Every subscribe or unsubscribe acknowledgement decrements the controller's
-- pending-change counter.
listenThread :: PubSubController -> PP.Connection -> IO ()
listenThread ctrl rawConn = forever $ do
    msg <- PP.recv rawConn
    case decodeMsg msg of
        Msg (Message channel msgCt) -> do
          cm <- readTVarIO (callbacks ctrl)
          case HM.lookup channel cm of
            Nothing -> return ()
            Just c -> mapM_ (\(_,x) -> x msgCt) c
        Msg (PMessage pattern channel msgCt) -> do
          pm <- readTVarIO (pcallbacks ctrl)
          case HM.lookup pattern pm of
            Nothing -> return ()
            Just c -> mapM_ (\(_,x) -> x channel msgCt) c
        Subscribed -> acknowledge
        Unsubscribed _ -> acknowledge
  where
    -- Strict update: the counter is only read by the waiting functions, so a
    -- lazy update would build an ever-growing chain of thunks in a
    -- long-running listener.
    acknowledge :: IO ()
    acknowledge = atomically $ modifyTVar' (pendingCnt ctrl) (subtract 1)

-- | Internal thread which sends subscription change requests.
-- This is the only thread which ever sends data on the underlying
-- connection.  It takes one change set at a time from the controller's
-- queue, sends its four commands and flushes the connection.
sendThread :: PubSubController -> PP.Connection -> IO ()
sendThread ctrl rawConn = forever $ do
    batch <- atomically $ readTBQueue (sendChanges ctrl)
    rawSendCmd rawConn (subs batch)
    rawSendCmd rawConn (unsubs batch)
    rawSendCmd rawConn (psubs batch)
    rawSendCmd rawConn (punsubs batch)
    -- the socket is normally flushed during 'recv', but 'recv' may
    -- currently be blocked waiting for a message, so flush explicitly.
    PP.flush rawConn

-- | Open a connection to the Redis server, register to all channels in the 'PubSubController',
-- and process messages and subscription change requests forever.  The only way this will ever
-- exit is if there is an exception from the network code or an unhandled exception
-- in a 'MessageCallback' or 'PMessageCallback'. For example, if the network connection to Redis
-- dies, 'pubSubForever' will throw a 'ConnectionLost'.  When such an exception is
-- thrown, you can call 'pubSubForever' again with the same 'PubSubController', which will open a
-- new connection and resubscribe to all the channels which are tracked in the 'PubSubController'.
--
-- The general pattern is therefore during program startup create a 'PubSubController' and fork
-- a thread which calls 'pubSubForever' in a loop (using an exponential backoff algorithm
-- such as the <https://hackage.haskell.org/package/retry retry> package to not hammer the Redis
-- server if it does die).  For example,
--
-- @
-- myhandler :: ByteString -> IO ()
-- myhandler msg = putStrLn $ unpack $ decodeUtf8 msg
--
-- onInitialComplete :: IO ()
-- onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed"
--
-- main :: IO ()
-- main = do
--   conn <- connect defaultConnectInfo
--   pubSubCtrl <- newPubSubController [("mychannel", myhandler)] []
--   forkIO $ forever $
--       pubSubForever conn pubSubCtrl onInitialComplete
--         \`catch\` (\\(e :: SomeException) -> do
--           putStrLn $ "Got error: " ++ show e
--           threadDelay $ 50*1000) -- TODO: use exponential backoff
--
--   {- elsewhere in your program, use pubSubCtrl to change subscriptions -}
-- @
--
-- At most one active 'pubSubForever' can be running against a single 'PubSubController' at any time.  If
-- two active calls to 'pubSubForever' share a single 'PubSubController' there will be deadlocks.  If
-- you do want to process messages using multiple connections to Redis, you can create more than one
-- 'PubSubController'.  For example, create one PubSubController for each 'Control.Concurrent.getNumCapabilities'
-- and then create a Haskell thread bound to each capability each calling 'pubSubForever' in a loop.
-- This will create one network connection per controller/capability and allow you to
-- register separate channels and callbacks for each controller, spreading the load across the capabilities.
pubSubForever :: Core.Connection -- ^ The connection pool
              -> PubSubController -- ^ The controller which keeps track of all subscriptions and handlers
              -> IO () -- ^ This action is executed once Redis acknowledges that all the subscriptions in
                       -- the controller are now subscribed.  You can use this after an exception (such as
                       -- 'ConnectionLost') to signal that all subscriptions are now reactivated.
              -> IO ()
pubSubForever (Core.Conn pool) ctrl onInitialLoad = withResource pool $ \rawConn -> do
    -- get initial subscriptions and write them into the queue.
    atomically $ do
      -- discard any change sets still queued: the full subscription set
      -- built below from the callback maps replaces them.
      let loop = tryReadTBQueue (sendChanges ctrl) >>=
                   \x -> if isJust x then loop else return ()
      loop
      cm <- readTVar $ callbacks ctrl
      pm <- readTVar $ pcallbacks ctrl
      let ps = subscribe (HM.keys cm) `mappend` psubscribe (HM.keys pm)
      writeTBQueue (sendChanges ctrl) ps
      -- reset (not increment) the counter: only the subscriptions queued
      -- above are outstanding at this point.
      writeTVar (pendingCnt ctrl) (totalPendingChanges ps)

    -- run the receiving and the sending thread for the lifetime of this call
    withAsync (listenThread ctrl rawConn) $ \listenT ->
      withAsync (sendThread ctrl rawConn) $ \sendT -> do

        -- wait for initial subscription count to go to zero or for threads to fail
        mret <- atomically $
            (Left <$> (waitEitherCatchSTM listenT sendT))
          `orElse`
            (Right <$> (readTVar (pendingCnt ctrl) >>=
                           \x -> if x > 0 then retry else return ()))
        case mret of
          Right () -> onInitialLoad
          _ -> return () -- if there is an error, waitEitherCatch below will also see it

        -- wait for threads to end with error
        merr <- waitEitherCatch listenT sendT
        case merr of
          (Right (Left err)) -> throwIO err
          (Left (Left err)) -> throwIO err
          _ -> return ()  -- should never happen, since threads exit only with an error


------------------------------------------------------------------------------
-- Helpers
--
-- | Decode a reply received in Pub\/Sub mode.
--
-- The reply must be a multi-bulk reply with at least three elements, the
-- first of which names the kind of reply.  A @pmessage@ reply needs a fourth
-- element (the payload).  Anything else, including a reply whose elements
-- fail to decode, is reported through 'errMsg', which raises an error that
-- shows the offending reply.
decodeMsg :: Reply -> PubSubReply
decodeMsg r@(MultiBulk (Just (r0:r1:r2:rs))) = either (const (errMsg r)) id $ do
    kind <- decode r0
    case kind :: ByteString of
        "message"      -> Msg <$> decodeMessage
        "pmessage"     -> Msg <$> decodePMessage
        "subscribe"    -> return Subscribed
        "psubscribe"   -> return Subscribed
        "unsubscribe"  -> Unsubscribed <$> decodeCnt
        "punsubscribe" -> Unsubscribed <$> decodeCnt
        _              -> errMsg r
  where
    decodeMessage  = Message  <$> decode r1 <*> decode r2
    -- a pmessage carries pattern, channel and payload; match on the fourth
    -- element instead of using the partial 'head', so that a truncated reply
    -- is reported with the usual error message
    decodePMessage = case rs of
        (r3:_) -> PMessage <$> decode r1 <*> decode r2 <*> decode r3
        []     -> errMsg r
    decodeCnt      = fromInteger <$> decode r2

decodeMsg r = errMsg r

-- | Abort with an error that shows the reply which could not be interpreted
-- as a Pub\/Sub message.
errMsg :: Reply -> a
errMsg r = error ("Hedis: expected pub/sub-message but got: " ++ show r)


-- $pubsubexpl
-- There are two Pub/Sub implementations.  First, there is a single-threaded implementation 'pubSub'
-- which is simpler to use but has the restriction that subscription changes can only be made in
-- response to a message.  Secondly, there is a more complicated Pub/Sub controller 'pubSubForever'
-- that uses concurrency to support changing subscriptions at any time but requires more setup.
-- You should only use one or the other.  In addition, no types or utility functions (that are part
-- of the public API) are shared, so functions or types in one of the following sections cannot
-- be used for the other.  In particular, be aware that they use different utility functions to subscribe
-- and unsubscribe to channels.