{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
module Database.Redis.Connection where
import Control.Exception
import qualified Control.Monad.Catch as Catch
import Control.Monad.IO.Class(liftIO, MonadIO)
import Control.Monad(when)
import Control.Concurrent.MVar(MVar, newMVar)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import Data.Functor(void)
import qualified Data.IntMap.Strict as IntMap
import Data.Pool(Pool, withResource, createPool, destroyAllResources)
import Data.Typeable
import qualified Data.Time as Time
import Network.TLS (ClientParams)
import qualified Network.Socket as NS
import qualified Data.HashMap.Strict as HM
import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Core(Redis, runRedisInternal, runRedisClusteredInternal)
import Database.Redis.Protocol(Reply(..))
import Database.Redis.Cluster(ShardMap(..), Node, Shard(..))
import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.ConnectionContext as CC
import Database.Redis.Commands
( ping
, select
, auth
, clusterSlots
, command
, ClusterSlotsResponse(..)
, ClusterSlotsResponseEntry(..)
, ClusterSlotsNode(..))
-- | Handle used by 'runRedis' to talk to Redis.
--
-- * 'NonClusteredConnection' wraps a pool of pipelined connections to a
--   single server (see 'connect').
-- * 'ClusteredConnection' carries an 'MVar' holding the current 'ShardMap'
--   together with a pool of cluster connections (see 'connectCluster').
data Connection
    = NonClusteredConnection (Pool PP.Connection)
    | ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection)
-- | Settings consumed by 'createConnection', 'connect' and 'connectCluster'.
--
-- Start from 'defaultConnectInfo' and override individual fields with record
-- update syntax.
data ConnectInfo = ConnInfo
    { connectHost           :: NS.HostName
      -- ^ Host passed to @PP.connect@.
    , connectPort           :: CC.PortID
      -- ^ Port passed to @PP.connect@.
    , connectAuth           :: Maybe B.ByteString
      -- ^ When 'Just', 'createConnection' sends @AUTH@ with this value right
      --   after connecting; 'Nothing' skips authentication.
    , connectDatabase       :: Integer
      -- ^ Database index. 'createConnection' sends @SELECT@ only when this is
      --   non-zero.
    , connectMaxConnections :: Int
      -- ^ Handed to 'createPool' as its last argument (the per-stripe
      --   resource limit).
    , connectMaxIdleTime    :: Time.NominalDiffTime
      -- ^ Handed to 'createPool' as the idle time after which an unused
      --   connection may be closed.
    , connectTimeout        :: Maybe Time.NominalDiffTime
      -- ^ Optional connect timeout. 'createConnection' multiplies it by
      --   1000000 and rounds it to an 'Int' before passing it to
      --   @PP.connect@.
    , connectTLSParams      :: Maybe ClientParams
      -- ^ When 'Just', 'createConnection' upgrades the fresh connection with
      --   @PP.enableTLS@ before issuing any command.
    } deriving Show
-- | Exceptions thrown by 'createConnection' while preparing a new connection.
-- Each constructor carries the error reply returned by the server.
data ConnectError
    = ConnectAuthError Reply
      -- ^ The @AUTH@ command was answered with an error reply.
    | ConnectSelectError Reply
      -- ^ The @SELECT@ command was answered with an error reply.
    deriving (Eq, Show, Typeable)

instance Exception ConnectError
-- | Baseline connection settings:
--
-- @
-- connectHost           = \"localhost\"
-- connectPort           = PortNumber 6379
-- connectAuth           = Nothing   -- no AUTH is sent
-- connectDatabase       = 0         -- no SELECT is sent
-- connectMaxConnections = 50
-- connectMaxIdleTime    = 30
-- connectTimeout        = Nothing   -- no connect timeout
-- connectTLSParams      = Nothing   -- no TLS upgrade
-- @
defaultConnectInfo :: ConnectInfo
defaultConnectInfo = ConnInfo
    { connectHost           = "localhost"
    , connectPort           = CC.PortNumber 6379
    , connectAuth           = Nothing
    , connectDatabase       = 0
    , connectMaxConnections = 50
    , connectMaxIdleTime    = 30
    , connectTimeout        = Nothing
    , connectTLSParams      = Nothing
    }
-- | Open one pipelined connection and prepare it for use.
--
-- Steps, in order:
--
-- 1. @PP.connect@ to 'connectHost' / 'connectPort', with 'connectTimeout'
--    scaled by 1000000 and rounded to an 'Int'.
-- 2. If 'connectTLSParams' is 'Just', upgrade the connection with
--    @PP.enableTLS@.
-- 3. @PP.beginReceiving@ on the resulting connection.
-- 4. Send @AUTH@ when 'connectAuth' is 'Just', then @SELECT@ when
--    'connectDatabase' is non-zero.
--
-- Throws 'ConnectAuthError' / 'ConnectSelectError' when the server answers
-- the respective command with an error reply. If any step after the initial
-- connect throws, the connection is disconnected before the exception is
-- re-raised, so a failed setup does not leave the socket open.
createConnection :: ConnectInfo -> IO PP.Connection
createConnection ConnInfo{..} = do
    conn <- PP.connect connectHost connectPort timeoutOptUs
    conn' <- case connectTLSParams of
        Nothing        -> return conn
        -- The TLS upgrade failed: only the plain connection exists, close it.
        Just tlsParams ->
            PP.enableTLS tlsParams conn `onException` PP.disconnect conn
    -- From here on conn' is the connection that owns the transport.
    handshake conn' `onException` PP.disconnect conn'
    return conn'
  where
    timeoutOptUs :: Maybe Int
    timeoutOptUs = round . (1000000 *) <$> connectTimeout

    -- Start the receiver and run the optional AUTH / SELECT commands.
    handshake :: PP.Connection -> IO ()
    handshake c = do
        PP.beginReceiving c
        runRedisInternal c $ do
            -- AUTH
            case connectAuth of
                Nothing   -> return ()
                Just pass -> do
                    resp <- auth pass
                    case resp of
                        Left r -> liftIO $ throwIO $ ConnectAuthError r
                        _      -> return ()
            -- SELECT
            when (connectDatabase /= 0) $ do
                resp <- select connectDatabase
                case resp of
                    Left r -> liftIO $ throwIO $ ConnectSelectError r
                    _      -> return ()
-- | Build a 'NonClusteredConnection' backed by a connection pool.
--
-- The pool is created with 'createPool' using 'createConnection' as the
-- create action, @PP.disconnect@ as the destroy action, a single stripe,
-- 'connectMaxIdleTime' as the idle time and 'connectMaxConnections' as the
-- per-stripe limit.
connect :: ConnectInfo -> IO Connection
connect cInfo@ConnInfo{..} =
    NonClusteredConnection <$>
        createPool
            (createConnection cInfo)
            PP.disconnect
            1
            connectMaxIdleTime
            connectMaxConnections
-- | Like 'connect', but additionally runs @PING@ through 'runRedis' before
-- returning the connection.
--
-- Any exception raised while running the command propagates to the caller.
-- The reply itself is discarded with 'void', so an error /reply/ to @PING@
-- is not turned into an exception here.
checkedConnect :: ConnectInfo -> IO Connection
checkedConnect connInfo = do
    conn <- connect connInfo
    runRedis conn $ void ping
    return conn
-- | Destroy every resource held by the connection's pool via
-- 'destroyAllResources'. For a 'ClusteredConnection' the shard-map 'MVar' is
-- left untouched.
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection pool) = destroyAllResources pool
disconnect (ClusteredConnection _ pool)  = destroyAllResources pool
-- | Run an action with a connection obtained from 'connect', calling
-- 'disconnect' afterwards. Acquisition and release are wired through
-- 'Catch.bracket', so the release also runs when the action throws.
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c
withConnect connInfo =
    Catch.bracket (liftIO $ connect connInfo) (liftIO . disconnect)
-- | Run an action with a connection obtained from 'checkedConnect', calling
-- 'disconnect' afterwards. Uses 'bracket', so the release also runs when the
-- action throws.
withCheckedConnect :: ConnectInfo -> (Connection -> IO c) -> IO c
withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect
-- | Run a 'Redis' action using one connection taken from the pool with
-- 'withResource' for the duration of the action.
--
-- * 'NonClusteredConnection': the action is run with 'runRedisInternal'.
-- * 'ClusteredConnection': the action is run with
--   'runRedisClusteredInternal', which is given @refreshShardMap conn@ as the
--   action for obtaining a fresh shard map. The 'MVar' stored in the
--   constructor is not consulted here.
runRedis :: Connection -> Redis a -> IO a
runRedis (NonClusteredConnection pool) redis =
    withResource pool $ \conn -> runRedisInternal conn redis
runRedis (ClusteredConnection _ pool) redis =
    withResource pool $ \conn ->
        runRedisClusteredInternal conn (refreshShardMap conn) redis
-- | Thrown by 'connectCluster' and 'refreshShardMap' when the server answers
-- a cluster bootstrap command with an error reply; the reply is carried in
-- the constructor.
newtype ClusterConnectError = ClusterConnectError Reply
    deriving (Eq, Show, Typeable)

instance Exception ClusterConnectError
-- | Build a 'ClusteredConnection' by bootstrapping from a single node.
--
-- A temporary connection is opened with 'createConnection' and used to run
-- 'clusterSlots' (to build the initial 'ShardMap', stored in a fresh 'MVar')
-- and 'command' (to obtain the command infos given to @Cluster.connect@).
-- The pool of cluster connections is then created with 'createPool', using
-- one stripe plus 'connectMaxIdleTime' and 'connectMaxConnections' from the
-- bootstrap 'ConnectInfo'.
--
-- Throws 'ClusterConnectError' when either bootstrap command is answered
-- with an error reply.
--
-- The temporary bootstrap connection is only needed for those two commands,
-- so it is released with 'bracket' on both the success and the failure path
-- instead of being left open.
connectCluster :: ConnectInfo -> IO Connection
connectCluster bootstrapConnInfo =
    bracket (createConnection bootstrapConnInfo) PP.disconnect $ \conn -> do
        slotsResponse <- runRedisInternal conn clusterSlots
        shardMapVar <- case slotsResponse of
            Left e      -> throwIO $ ClusterConnectError e
            Right slots -> shardMapFromClusterSlotsResponse slots >>= newMVar
        commandInfos <- runRedisInternal conn command
        -- Both replies have been pattern-matched by the time the bracket
        -- releases the bootstrap connection.
        case commandInfos of
            Left e      -> throwIO $ ClusterConnectError e
            Right infos -> do
                pool <- createPool
                    -- NOTE(review): the timeout argument is hard-coded to
                    -- Nothing; 'connectTimeout' is not forwarded — confirm
                    -- this is intended.
                    (Cluster.connect infos shardMapVar Nothing)
                    Cluster.disconnect
                    1
                    (connectMaxIdleTime bootstrapConnInfo)
                    (connectMaxConnections bootstrapConnInfo)
                return $ ClusteredConnection shardMapVar pool
-- | Convert a @CLUSTER SLOTS@ response into a 'ShardMap'.
--
-- Every entry contributes one 'Shard' (its master plus its replicas) for each
-- slot in the inclusive range from its start slot to its end slot. When two
-- entries cover the same slot, the entry that appears earlier in the response
-- wins, because 'IntMap.union' is left-biased and the fold runs from the
-- right.
--
-- The computation is pure; the result is wrapped in 'IO' only to keep the
-- existing signature.
shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} =
    pure . ShardMap $ foldr addEntry IntMap.empty clusterSlotsResponseEntries
  where
    -- Merge the slots of one response entry into the map built so far.
    addEntry :: ClusterSlotsResponseEntry -> IntMap.IntMap Shard -> IntMap.IntMap Shard
    addEntry ClusterSlotsResponseEntry{..} accumulated =
        let master   = nodeFromClusterSlotNode True clusterSlotsResponseEntryMaster
            replicas = map (nodeFromClusterSlotNode False) clusterSlotsResponseEntryReplicas
            shard    = Shard master replicas
            slotMap  = IntMap.fromList $
                map (, shard)
                    [clusterSlotsResponseEntryStartSlot .. clusterSlotsResponseEntryEndSlot]
        in IntMap.union slotMap accumulated

    -- Build a cluster 'Node'; the flag selects the master or slave role.
    nodeFromClusterSlotNode :: Bool -> ClusterSlotsNode -> Node
    nodeFromClusterSlotNode isMaster ClusterSlotsNode{..} =
        let hostname = Char8.unpack clusterSlotsNodeIP
            role     = if isMaster then Cluster.Master else Cluster.Slave
        in Cluster.Node clusterSlotsNodeID role hostname (toEnum clusterSlotsNodePort)
-- | Fetch a fresh 'ShardMap' for a cluster connection.
--
-- Takes the first node connection returned by 'HM.elems', wraps its
-- connection context in a pipelined connection with @PP.fromCtx@, calls
-- @PP.beginReceiving@ on it, runs 'clusterSlots' and converts the answer with
-- 'shardMapFromClusterSlotsResponse'.
--
-- Throws 'ClusterConnectError' when the server answers with an error reply,
-- and an 'IOError' (built with 'userError') when the cluster connection holds
-- no node connections at all, instead of failing inside a partial 'head'.
refreshShardMap :: Cluster.Connection -> IO ShardMap
refreshShardMap (Cluster.Connection nodeConns _ _ _) =
    case HM.elems nodeConns of
        [] ->
            throwIO $ userError
                "Database.Redis.Connection.refreshShardMap: cluster connection has no node connections"
        Cluster.NodeConnection ctx _ _ : _ -> do
            pipelineConn <- PP.fromCtx ctx
            PP.beginReceiving pipelineConn
            slotsResponse <- runRedisInternal pipelineConn clusterSlots
            case slotsResponse of
                Left e      -> throwIO $ ClusterConnectError e
                Right slots -> shardMapFromClusterSlotsResponse slots