{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}
module Database.Redis.Cluster
( Connection(..)
, NodeRole(..)
, NodeConnection(..)
, Node(..)
, ShardMap(..)
, HashSlot
, Shard(..)
, connect
, disconnect
, requestPipelined
, nodes
) where
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import qualified Data.IORef as IOR
import Data.List(nub, sortBy, find)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..))
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
import Control.Monad(zipWithM, when, replicateM)
import Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot)
import qualified Database.Redis.ConnectionContext as CC
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap.Strict as IntMap
import Data.Typeable
import qualified Scanner
import System.IO.Unsafe(unsafeInterleaveIO)
import Database.Redis.Protocol(Reply(Error), renderRequest, reply)
import qualified Database.Redis.Cluster.Command as CMD
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap
data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID
instance Eq NodeConnection where
(NodeConnection ConnectionContext
_ IORef (Maybe ByteString)
_ ByteString
id1) == :: NodeConnection -> NodeConnection -> Bool
== (NodeConnection ConnectionContext
_ IORef (Maybe ByteString)
_ ByteString
id2) = ByteString
id1 ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
instance Ord NodeConnection where
compare :: NodeConnection -> NodeConnection -> Ordering
compare (NodeConnection ConnectionContext
_ IORef (Maybe ByteString)
_ ByteString
id1) (NodeConnection ConnectionContext
_ IORef (Maybe ByteString)
_ ByteString
id2) = ByteString -> ByteString -> Ordering
forall a. Ord a => a -> a -> Ordering
compare ByteString
id1 ByteString
data PipelineState =
Pending [[B.ByteString]]
| Executed [Reply]
| TransactionPending [[B.ByteString]]
newtype Pipeline = Pipeline (MVar PipelineState)
data NodeRole = Master | Slave deriving (Int -> NodeRole -> ShowS
[NodeRole] -> ShowS
NodeRole -> String
(Int -> NodeRole -> ShowS)
-> (NodeRole -> String) -> ([NodeRole] -> ShowS) -> Show NodeRole
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [NodeRole] -> ShowS
$cshowList :: [NodeRole] -> ShowS
show :: NodeRole -> String
$cshow :: NodeRole -> String
showsPrec :: Int -> NodeRole -> ShowS
$cshowsPrec :: Int -> NodeRole -> ShowS
Show, NodeRole -> NodeRole -> Bool
(NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> Bool) -> Eq NodeRole
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: NodeRole -> NodeRole -> Bool
$c/= :: NodeRole -> NodeRole -> Bool
== :: NodeRole -> NodeRole -> Bool
$c== :: NodeRole -> NodeRole -> Bool
Eq, Eq NodeRole
Eq NodeRole
-> (NodeRole -> NodeRole -> Ordering)
-> (NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> Bool)
-> (NodeRole -> NodeRole -> NodeRole)
-> (NodeRole -> NodeRole -> NodeRole)
-> Ord NodeRole
NodeRole -> NodeRole -> Bool
NodeRole -> NodeRole -> Ordering
NodeRole -> NodeRole -> NodeRole
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: NodeRole -> NodeRole -> NodeRole
$cmin :: NodeRole -> NodeRole -> NodeRole
max :: NodeRole -> NodeRole -> NodeRole
$cmax :: NodeRole -> NodeRole -> NodeRole
>= :: NodeRole -> NodeRole -> Bool
$c>= :: NodeRole -> NodeRole -> Bool
> :: NodeRole -> NodeRole -> Bool
$c> :: NodeRole -> NodeRole -> Bool
<= :: NodeRole -> NodeRole -> Bool
$c<= :: NodeRole -> NodeRole -> Bool
< :: NodeRole -> NodeRole -> Bool
$c< :: NodeRole -> NodeRole -> Bool
compare :: NodeRole -> NodeRole -> Ordering
$ccompare :: NodeRole -> NodeRole -> Ordering
$cp1Ord :: Eq NodeRole
type Host = String
type Port = Int
type NodeID = B.ByteString
data Node = Node NodeID NodeRole Host Port deriving (Int -> Node -> ShowS
[Node] -> ShowS
Node -> String
(Int -> Node -> ShowS)
-> (Node -> String) -> ([Node] -> ShowS) -> Show Node
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Node] -> ShowS
$cshowList :: [Node] -> ShowS
show :: Node -> String
$cshow :: Node -> String
showsPrec :: Int -> Node -> ShowS
$cshowsPrec :: Int -> Node -> ShowS
Show, Node -> Node -> Bool
(Node -> Node -> Bool) -> (Node -> Node -> Bool) -> Eq Node
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Node -> Node -> Bool
$c/= :: Node -> Node -> Bool
== :: Node -> Node -> Bool
$c== :: Node -> Node -> Bool
Eq, Eq Node
Eq Node
-> (Node -> Node -> Ordering)
-> (Node -> Node -> Bool)
-> (Node -> Node -> Bool)
-> (Node -> Node -> Bool)
-> (Node -> Node -> Bool)
-> (Node -> Node -> Node)
-> (Node -> Node -> Node)
-> Ord Node
Node -> Node -> Bool
Node -> Node -> Ordering
Node -> Node -> Node
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Node -> Node -> Node
$cmin :: Node -> Node -> Node
max :: Node -> Node -> Node
$cmax :: Node -> Node -> Node
>= :: Node -> Node -> Bool
$c>= :: Node -> Node -> Bool
> :: Node -> Node -> Bool
$c> :: Node -> Node -> Bool
<= :: Node -> Node -> Bool
$c<= :: Node -> Node -> Bool
< :: Node -> Node -> Bool
$c< :: Node -> Node -> Bool
compare :: Node -> Node -> Ordering
$ccompare :: Node -> Node -> Ordering
$cp1Ord :: Eq Node
type MasterNode = Node
type SlaveNode = Node
data Shard = Shard MasterNode [SlaveNode] deriving (Int -> Shard -> ShowS
[Shard] -> ShowS
Shard -> String
(Int -> Shard -> ShowS)
-> (Shard -> String) -> ([Shard] -> ShowS) -> Show Shard
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Shard] -> ShowS
$cshowList :: [Shard] -> ShowS
show :: Shard -> String
$cshow :: Shard -> String
showsPrec :: Int -> Shard -> ShowS
$cshowsPrec :: Int -> Shard -> ShowS
Show, Shard -> Shard -> Bool
(Shard -> Shard -> Bool) -> (Shard -> Shard -> Bool) -> Eq Shard
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Shard -> Shard -> Bool
$c/= :: Shard -> Shard -> Bool
== :: Shard -> Shard -> Bool
$c== :: Shard -> Shard -> Bool
Eq, Eq Shard
Eq Shard
-> (Shard -> Shard -> Ordering)
-> (Shard -> Shard -> Bool)
-> (Shard -> Shard -> Bool)
-> (Shard -> Shard -> Bool)
-> (Shard -> Shard -> Bool)
-> (Shard -> Shard -> Shard)
-> (Shard -> Shard -> Shard)
-> Ord Shard
Shard -> Shard -> Bool
Shard -> Shard -> Ordering
Shard -> Shard -> Shard
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Shard -> Shard -> Shard
$cmin :: Shard -> Shard -> Shard
max :: Shard -> Shard -> Shard
$cmax :: Shard -> Shard -> Shard
>= :: Shard -> Shard -> Bool
$c>= :: Shard -> Shard -> Bool
> :: Shard -> Shard -> Bool
$c> :: Shard -> Shard -> Bool
<= :: Shard -> Shard -> Bool
$c<= :: Shard -> Shard -> Bool
< :: Shard -> Shard -> Bool
$c< :: Shard -> Shard -> Bool
compare :: Shard -> Shard -> Ordering
$ccompare :: Shard -> Shard -> Ordering
$cp1Ord :: Eq Shard
newtype ShardMap = ShardMap (IntMap.IntMap Shard) deriving (Int -> ShardMap -> ShowS
[ShardMap] -> ShowS
ShardMap -> String
(Int -> ShardMap -> ShowS)
-> (ShardMap -> String) -> ([ShardMap] -> ShowS) -> Show ShardMap
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ShardMap] -> ShowS
$cshowList :: [ShardMap] -> ShowS
show :: ShardMap -> String
$cshow :: ShardMap -> String
showsPrec :: Int -> ShardMap -> ShowS
$cshowsPrec :: Int -> ShardMap -> ShowS
newtype MissingNodeException = MissingNodeException [B.ByteString] deriving (Int -> MissingNodeException -> ShowS
[MissingNodeException] -> ShowS
MissingNodeException -> String
(Int -> MissingNodeException -> ShowS)
-> (MissingNodeException -> String)
-> ([MissingNodeException] -> ShowS)
-> Show MissingNodeException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MissingNodeException] -> ShowS
$cshowList :: [MissingNodeException] -> ShowS
show :: MissingNodeException -> String
$cshow :: MissingNodeException -> String
showsPrec :: Int -> MissingNodeException -> ShowS
$cshowsPrec :: Int -> MissingNodeException -> ShowS
Show, Typeable)
instance Exception MissingNodeException
newtype UnsupportedClusterCommandException = UnsupportedClusterCommandException [B.ByteString] deriving (Int -> UnsupportedClusterCommandException -> ShowS
[UnsupportedClusterCommandException] -> ShowS
UnsupportedClusterCommandException -> String
(Int -> UnsupportedClusterCommandException -> ShowS)
-> (UnsupportedClusterCommandException -> String)
-> ([UnsupportedClusterCommandException] -> ShowS)
-> Show UnsupportedClusterCommandException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [UnsupportedClusterCommandException] -> ShowS
$cshowList :: [UnsupportedClusterCommandException] -> ShowS
show :: UnsupportedClusterCommandException -> String
$cshow :: UnsupportedClusterCommandException -> String
showsPrec :: Int -> UnsupportedClusterCommandException -> ShowS
$cshowsPrec :: Int -> UnsupportedClusterCommandException -> ShowS
Show, Typeable)
instance Exception UnsupportedClusterCommandException
newtype CrossSlotException = CrossSlotException [[B.ByteString]] deriving (Int -> CrossSlotException -> ShowS
[CrossSlotException] -> ShowS
CrossSlotException -> String
(Int -> CrossSlotException -> ShowS)
-> (CrossSlotException -> String)
-> ([CrossSlotException] -> ShowS)
-> Show CrossSlotException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [CrossSlotException] -> ShowS
$cshowList :: [CrossSlotException] -> ShowS
show :: CrossSlotException -> String
$cshow :: CrossSlotException -> String
showsPrec :: Int -> CrossSlotException -> ShowS
$cshowsPrec :: Int -> CrossSlotException -> ShowS
Show, Typeable)
instance Exception CrossSlotException
connect :: [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> IO Connection
connect :: [CommandInfo] -> MVar ShardMap -> Maybe Int -> IO Connection
connect [CommandInfo]
commandInfos MVar ShardMap
shardMapVar Maybe Int
timeoutOpt = do
shardMap <- MVar ShardMap -> IO ShardMap
forall a. MVar a -> IO a
readMVar MVar ShardMap
MVar PipelineState
stateVar <- PipelineState -> IO (MVar PipelineState)
forall a. a -> IO (MVar a)
newMVar (PipelineState -> IO (MVar PipelineState))
-> PipelineState -> IO (MVar PipelineState)
forall a b. (a -> b) -> a -> b
$ [[ByteString]] -> PipelineState
Pending []
MVar Pipeline
pipelineVar <- Pipeline -> IO (MVar Pipeline)
forall a. a -> IO (MVar a)
newMVar (Pipeline -> IO (MVar Pipeline)) -> Pipeline -> IO (MVar Pipeline)
forall a b. (a -> b) -> a -> b
$ MVar PipelineState -> Pipeline
Pipeline MVar PipelineState
HashMap ByteString NodeConnection
nodeConns <- ShardMap -> IO (HashMap ByteString NodeConnection)
nodeConnections ShardMap
Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return (Connection -> IO Connection) -> Connection -> IO Connection
forall a b. (a -> b) -> a -> b
$ HashMap ByteString NodeConnection
-> MVar Pipeline -> MVar ShardMap -> InfoMap -> Connection
Connection HashMap ByteString NodeConnection
nodeConns MVar Pipeline
pipelineVar MVar ShardMap
shardMapVar ([CommandInfo] -> InfoMap
CMD.newInfoMap [CommandInfo]
commandInfos) where
nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection)
nodeConnections :: ShardMap -> IO (HashMap ByteString NodeConnection)
nodeConnections ShardMap
shardMap = [(ByteString, NodeConnection)] -> HashMap ByteString NodeConnection
forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList ([(ByteString, NodeConnection)]
-> HashMap ByteString NodeConnection)
-> IO [(ByteString, NodeConnection)]
-> IO (HashMap ByteString NodeConnection)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Node -> IO (ByteString, NodeConnection))
-> [Node] -> IO [(ByteString, NodeConnection)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Node -> IO (ByteString, NodeConnection)
connectNode ([Node] -> [Node]
forall a. Eq a => [a] -> [a]
nub ([Node] -> [Node]) -> [Node] -> [Node]
forall a b. (a -> b) -> a -> b
$ ShardMap -> [Node]
nodes ShardMap
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode :: Node -> IO (ByteString, NodeConnection)
connectNode (Node ByteString
n NodeRole
_ String
host Int
port) = do
ctx <- String -> PortID -> Maybe Int -> IO ConnectionContext
CC.connect String
host (PortNumber -> PortID
CC.PortNumber (PortNumber -> PortID) -> PortNumber -> PortID
forall a b. (a -> b) -> a -> b
$ Int -> PortNumber
forall a. Enum a => Int -> a
toEnum Int
port) Maybe Int
IORef (Maybe ByteString)
ref <- Maybe ByteString -> IO (IORef (Maybe ByteString))
forall a. a -> IO (IORef a)
IOR.newIORef Maybe ByteString
forall a. Maybe a
(ByteString, NodeConnection) -> IO (ByteString, NodeConnection)
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString
n, ConnectionContext
-> IORef (Maybe ByteString) -> ByteString -> NodeConnection
NodeConnection ConnectionContext
ctx IORef (Maybe ByteString)
ref ByteString
disconnect :: Connection -> IO ()
disconnect :: Connection -> IO ()
disconnect (Connection HashMap ByteString NodeConnection
nodeConnMap MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) = (NodeConnection -> IO ()) -> [NodeConnection] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ NodeConnection -> IO ()
disconnectNode (HashMap ByteString NodeConnection -> [NodeConnection]
forall k v. HashMap k v -> [v]
HM.elems HashMap ByteString NodeConnection
nodeConnMap) where
disconnectNode :: NodeConnection -> IO ()
disconnectNode (NodeConnection ConnectionContext
nodeCtx IORef (Maybe ByteString)
_ ByteString
_) = ConnectionContext -> IO ()
CC.disconnect ConnectionContext
requestPipelined :: IO ShardMap -> Connection -> [B.ByteString] -> IO Reply
requestPipelined :: IO ShardMap -> Connection -> [ByteString] -> IO Reply
requestPipelined IO ShardMap
refreshAction conn :: Connection
conn@(Connection HashMap ByteString NodeConnection
_ MVar Pipeline
pipelineVar MVar ShardMap
shardMapVar InfoMap
_) [ByteString]
nextRequest = MVar Pipeline -> (Pipeline -> IO (Pipeline, Reply)) -> IO Reply
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar Pipeline
pipelineVar ((Pipeline -> IO (Pipeline, Reply)) -> IO Reply)
-> (Pipeline -> IO (Pipeline, Reply)) -> IO Reply
forall a b. (a -> b) -> a -> b
$ \(Pipeline MVar PipelineState
stateVar) -> do
(MVar PipelineState
newStateVar, Int
repliesIndex) <- IO (MVar PipelineState, Int) -> IO (MVar PipelineState, Int)
forall a. IO a -> IO a
hasLocked (IO (MVar PipelineState, Int) -> IO (MVar PipelineState, Int))
-> IO (MVar PipelineState, Int) -> IO (MVar PipelineState, Int)
forall a b. (a -> b) -> a -> b
$ MVar PipelineState
-> (PipelineState -> IO (PipelineState, (MVar PipelineState, Int)))
-> IO (MVar PipelineState, Int)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar PipelineState
stateVar ((PipelineState -> IO (PipelineState, (MVar PipelineState, Int)))
-> IO (MVar PipelineState, Int))
-> (PipelineState -> IO (PipelineState, (MVar PipelineState, Int)))
-> IO (MVar PipelineState, Int)
forall a b. (a -> b) -> a -> b
$ \case
Pending [[ByteString]]
requests | [ByteString] -> Bool
isMulti [ByteString]
nextRequest -> do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[ByteString]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[ByteString]]
MVar PipelineState
s' <- PipelineState -> IO (MVar PipelineState)
forall a. a -> IO (MVar a)
newMVar (PipelineState -> IO (MVar PipelineState))
-> PipelineState -> IO (MVar PipelineState)
forall a b. (a -> b) -> a -> b
$ [[ByteString]] -> PipelineState
TransactionPending [[ByteString]
(PipelineState, (MVar PipelineState, Int))
-> IO (PipelineState, (MVar PipelineState, Int))
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, (MVar PipelineState
s', Int
Pending [[ByteString]]
requests | [[ByteString]] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[ByteString]]
requests Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
1000 -> do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[ByteString]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn ([ByteString]
nextRequest[ByteString] -> [[ByteString]] -> [[ByteString]]
forall a. a -> [a] -> [a]
(PipelineState, (MVar PipelineState, Int))
-> IO (PipelineState, (MVar PipelineState, Int))
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, (MVar PipelineState
stateVar, [[ByteString]] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[ByteString]]
Pending [[ByteString]]
requests ->
(PipelineState, (MVar PipelineState, Int))
-> IO (PipelineState, (MVar PipelineState, Int))
forall (m :: * -> *) a. Monad m => a -> m a
return ([[ByteString]] -> PipelineState
Pending ([ByteString]
nextRequest[ByteString] -> [[ByteString]] -> [[ByteString]]
forall a. a -> [a] -> [a]
requests), (MVar PipelineState
stateVar, [[ByteString]] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[ByteString]]
TransactionPending [[ByteString]]
requests ->
if [ByteString] -> Bool
isExec [ByteString]
nextRequest then do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[ByteString]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn ([ByteString]
nextRequest[ByteString] -> [[ByteString]] -> [[ByteString]]
forall a. a -> [a] -> [a]
(PipelineState, (MVar PipelineState, Int))
-> IO (PipelineState, (MVar PipelineState, Int))
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, (MVar PipelineState
stateVar, [[ByteString]] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[ByteString]]
(PipelineState, (MVar PipelineState, Int))
-> IO (PipelineState, (MVar PipelineState, Int))
forall (m :: * -> *) a. Monad m => a -> m a
return ([[ByteString]] -> PipelineState
TransactionPending ([ByteString]
nextRequest[ByteString] -> [[ByteString]] -> [[ByteString]]
forall a. a -> [a] -> [a]
requests), (MVar PipelineState
stateVar, [[ByteString]] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[ByteString]]
e :: PipelineState
e@(Executed [Reply]
_) -> do
MVar PipelineState
s' <- PipelineState -> IO (MVar PipelineState)
forall a. a -> IO (MVar a)
newMVar (PipelineState -> IO (MVar PipelineState))
-> PipelineState -> IO (MVar PipelineState)
forall a b. (a -> b) -> a -> b
if [ByteString] -> Bool
isMulti [ByteString]
nextRequest then
[[ByteString]] -> PipelineState
TransactionPending [[ByteString]
[[ByteString]] -> PipelineState
Pending [[ByteString]
(PipelineState, (MVar PipelineState, Int))
-> IO (PipelineState, (MVar PipelineState, Int))
forall (m :: * -> *) a. Monad m => a -> m a
return (PipelineState
e, (MVar PipelineState
s', Int
evaluateAction <- IO Reply -> IO Reply
forall a. IO a -> IO a
unsafeInterleaveIO (IO Reply -> IO Reply) -> IO Reply -> IO Reply
forall a b. (a -> b) -> a -> b
$ do
replies <- IO [Reply] -> IO [Reply]
forall a. IO a -> IO a
hasLocked (IO [Reply] -> IO [Reply]) -> IO [Reply] -> IO [Reply]
forall a b. (a -> b) -> a -> b
$ MVar PipelineState
-> (PipelineState -> IO (PipelineState, [Reply])) -> IO [Reply]
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar PipelineState
newStateVar ((PipelineState -> IO (PipelineState, [Reply])) -> IO [Reply])
-> (PipelineState -> IO (PipelineState, [Reply])) -> IO [Reply]
forall a b. (a -> b) -> a -> b
$ \case
Executed [Reply]
replies ->
(PipelineState, [Reply]) -> IO (PipelineState, [Reply])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, [Reply]
Pending [[ByteString]]
requests-> do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[ByteString]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[ByteString]]
(PipelineState, [Reply]) -> IO (PipelineState, [Reply])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, [Reply]
TransactionPending [[ByteString]]
requests-> do
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[ByteString]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[ByteString]]
(PipelineState, [Reply]) -> IO (PipelineState, [Reply])
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, [Reply]
Reply -> IO Reply
forall (m :: * -> *) a. Monad m => a -> m a
return (Reply -> IO Reply) -> Reply -> IO Reply
forall a b. (a -> b) -> a -> b
$ [Reply]
replies [Reply] -> Int -> Reply
forall a. [a] -> Int -> a
!! Int
(Pipeline, Reply) -> IO (Pipeline, Reply)
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar PipelineState -> Pipeline
Pipeline MVar PipelineState
newStateVar, Reply
isMulti :: [B.ByteString] -> Bool
isMulti :: [ByteString] -> Bool
isMulti (ByteString
"MULTI" : [ByteString]
_) = Bool
isMulti [ByteString]
_ = Bool
isExec :: [B.ByteString] -> Bool
isExec :: [ByteString] -> Bool
isExec (ByteString
"EXEC" : [ByteString]
_) = Bool
isExec [ByteString]
_ = Bool
data PendingRequest = PendingRequest Int [B.ByteString]
data CompletedRequest = CompletedRequest Int [B.ByteString] Reply
rawRequest :: PendingRequest -> [B.ByteString]
rawRequest :: PendingRequest -> [ByteString]
rawRequest (PendingRequest Int
_ [ByteString]
r) = [ByteString]
responseIndex :: CompletedRequest -> Int
responseIndex :: CompletedRequest -> Int
responseIndex (CompletedRequest Int
i [ByteString]
_ Reply
_) = Int
rawResponse :: CompletedRequest -> Reply
rawResponse :: CompletedRequest -> Reply
rawResponse (CompletedRequest Int
_ [ByteString]
_ Reply
r) = Reply
evaluatePipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluatePipeline :: MVar ShardMap
-> IO ShardMap -> Connection -> [[ByteString]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn [[ByteString]]
requests = do
shardMap <- IO ShardMap -> IO ShardMap
forall a. IO a -> IO a
hasLocked (IO ShardMap -> IO ShardMap) -> IO ShardMap -> IO ShardMap
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> IO ShardMap
forall a. MVar a -> IO a
readMVar MVar ShardMap
[(NodeConnection, [PendingRequest])]
requestsByNode <- ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode ShardMap
resps <- [[CompletedRequest]] -> [CompletedRequest]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat ([[CompletedRequest]] -> [CompletedRequest])
-> IO [[CompletedRequest]] -> IO [CompletedRequest]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ((NodeConnection, [PendingRequest]) -> IO [CompletedRequest])
-> [(NodeConnection, [PendingRequest])] -> IO [[CompletedRequest]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM ((NodeConnection -> [PendingRequest] -> IO [CompletedRequest])
-> (NodeConnection, [PendingRequest]) -> IO [CompletedRequest]
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests) [(NodeConnection, [PendingRequest])]
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ((CompletedRequest -> Bool) -> [CompletedRequest] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (Reply -> Bool
moved (Reply -> Bool)
-> (CompletedRequest -> Reply) -> CompletedRequest -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CompletedRequest -> Reply
rawResponse) [CompletedRequest]
resps) IO ()
retriedResps <- (CompletedRequest -> IO CompletedRequest)
-> [CompletedRequest] -> IO [CompletedRequest]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (Int -> CompletedRequest -> IO CompletedRequest
retry Int
0) [CompletedRequest]
[Reply] -> IO [Reply]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> IO [Reply]) -> [Reply] -> IO [Reply]
forall a b. (a -> b) -> a -> b
$ (CompletedRequest -> Reply) -> [CompletedRequest] -> [Reply]
forall a b. (a -> b) -> [a] -> [b]
map CompletedRequest -> Reply
rawResponse ([CompletedRequest] -> [Reply]) -> [CompletedRequest] -> [Reply]
forall a b. (a -> b) -> a -> b
$ (CompletedRequest -> CompletedRequest -> Ordering)
-> [CompletedRequest] -> [CompletedRequest]
forall a. (a -> a -> Ordering) -> [a] -> [a]
sortBy ((Int -> Int -> Ordering)
-> (CompletedRequest -> Int)
-> CompletedRequest
-> CompletedRequest
-> Ordering
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
on Int -> Int -> Ordering
forall a. Ord a => a -> a -> Ordering
compare CompletedRequest -> Int
responseIndex) [CompletedRequest]
getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode ShardMap
shardMap = do
[[(NodeConnection, [PendingRequest])]]
commandsWithNodes <- (Int -> [ByteString] -> IO [(NodeConnection, [PendingRequest])])
-> [Int]
-> [[ByteString]]
-> IO [[(NodeConnection, [PendingRequest])]]
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM (ShardMap
-> Int -> [ByteString] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes ShardMap
shardMap) ([Int] -> [Int]
forall a. [a] -> [a]
reverse [Int
0..([[ByteString]] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[ByteString]]
requests Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)]) [[ByteString]]
[(NodeConnection, [PendingRequest])]
-> IO [(NodeConnection, [PendingRequest])]
forall (m :: * -> *) a. Monad m => a -> m a
return ([(NodeConnection, [PendingRequest])]
-> IO [(NodeConnection, [PendingRequest])])
-> [(NodeConnection, [PendingRequest])]
-> IO [(NodeConnection, [PendingRequest])]
forall a b. (a -> b) -> a -> b
$ Map NodeConnection [PendingRequest]
-> [(NodeConnection, [PendingRequest])]
forall k a. Map k a -> [(k, a)]
assocs (Map NodeConnection [PendingRequest]
-> [(NodeConnection, [PendingRequest])])
-> Map NodeConnection [PendingRequest]
-> [(NodeConnection, [PendingRequest])]
forall a b. (a -> b) -> a -> b
$ ([PendingRequest] -> [PendingRequest] -> [PendingRequest])
-> [(NodeConnection, [PendingRequest])]
-> Map NodeConnection [PendingRequest]
forall k a. Ord k => (a -> a -> a) -> [(k, a)] -> Map k a
fromListWith [PendingRequest] -> [PendingRequest] -> [PendingRequest]
forall a. [a] -> [a] -> [a]
(++) ([[(NodeConnection, [PendingRequest])]]
-> [(NodeConnection, [PendingRequest])]
forall a. Monoid a => [a] -> a
mconcat [[(NodeConnection, [PendingRequest])]]
requestWithNodes :: ShardMap -> Int -> [B.ByteString] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes :: ShardMap
-> Int -> [ByteString] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes ShardMap
shardMap Int
index [ByteString]
request = do
nodeConns <- Connection -> ShardMap -> [ByteString] -> IO [NodeConnection]
nodeConnectionForCommand Connection
conn ShardMap
shardMap [ByteString]
[(NodeConnection, [PendingRequest])]
-> IO [(NodeConnection, [PendingRequest])]
forall (m :: * -> *) a. Monad m => a -> m a
return ([(NodeConnection, [PendingRequest])]
-> IO [(NodeConnection, [PendingRequest])])
-> [(NodeConnection, [PendingRequest])]
-> IO [(NodeConnection, [PendingRequest])]
forall a b. (a -> b) -> a -> b
$ (, [Int -> [ByteString] -> PendingRequest
PendingRequest Int
index [ByteString]
request]) (NodeConnection -> (NodeConnection, [PendingRequest]))
-> [NodeConnection] -> [(NodeConnection, [PendingRequest])]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [NodeConnection]
executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests NodeConnection
nodeConn [PendingRequest]
nodeRequests = do
replies <- NodeConnection -> [[ByteString]] -> IO [Reply]
requestNode NodeConnection
nodeConn ([[ByteString]] -> IO [Reply]) -> [[ByteString]] -> IO [Reply]
forall a b. (a -> b) -> a -> b
$ (PendingRequest -> [ByteString])
-> [PendingRequest] -> [[ByteString]]
forall a b. (a -> b) -> [a] -> [b]
map PendingRequest -> [ByteString]
rawRequest [PendingRequest]
[CompletedRequest] -> IO [CompletedRequest]
forall (m :: * -> *) a. Monad m => a -> m a
return ([CompletedRequest] -> IO [CompletedRequest])
-> [CompletedRequest] -> IO [CompletedRequest]
forall a b. (a -> b) -> a -> b
$ (PendingRequest -> Reply -> CompletedRequest)
-> [PendingRequest] -> [Reply] -> [CompletedRequest]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith (((PendingRequest, Reply) -> CompletedRequest)
-> PendingRequest -> Reply -> CompletedRequest
forall a b c. ((a, b) -> c) -> a -> b -> c
curry (\(PendingRequest Int
i [ByteString]
r, Reply
rep) -> Int -> [ByteString] -> Reply -> CompletedRequest
CompletedRequest Int
i [ByteString]
r Reply
rep)) [PendingRequest]
nodeRequests [Reply]
retry :: Int -> CompletedRequest -> IO CompletedRequest
retry :: Int -> CompletedRequest -> IO CompletedRequest
retry Int
retryCount (CompletedRequest Int
index [ByteString]
request Reply
thisReply) = do
retryReply <- [Reply] -> Reply
forall a. [a] -> a
head ([Reply] -> Reply) -> IO [Reply] -> IO Reply
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVar ShardMap
-> IO ShardMap
-> Connection
-> Int
-> [[ByteString]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn Int
retryCount [[ByteString]
request] [Reply
CompletedRequest -> IO CompletedRequest
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> [ByteString] -> Reply -> CompletedRequest
CompletedRequest Int
index [ByteString]
request Reply
refreshShardMapVar :: IO ()
refreshShardMapVar :: IO ()
refreshShardMapVar = IO () -> IO ()
forall a. IO a -> IO a
hasLocked (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> (ShardMap -> IO ShardMap) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar ShardMap
shardMapVar (IO ShardMap -> ShardMap -> IO ShardMap
forall a b. a -> b -> a
const IO ShardMap
retryBatch :: MVar ShardMap -> IO ShardMap -> Connection -> Int -> [[B.ByteString]] -> [Reply] -> IO [Reply]
retryBatch :: MVar ShardMap
-> IO ShardMap
-> Connection
-> Int
-> [[ByteString]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn Int
retryCount [[ByteString]]
requests [Reply]
replies =
case [Reply] -> Reply
forall a. [a] -> a
last [Reply]
replies of
(Error ByteString
errString) | ByteString -> ByteString -> Bool
B.isPrefixOf ByteString
"MOVED" ByteString
errString -> do
let (Connection HashMap ByteString NodeConnection
_ MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) = Connection
keys <- [[ByteString]] -> [ByteString]
forall a. Monoid a => [a] -> a
mconcat ([[ByteString]] -> [ByteString])
-> IO [[ByteString]] -> IO [ByteString]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ([ByteString] -> IO [ByteString])
-> [[ByteString]] -> IO [[ByteString]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (InfoMap -> [ByteString] -> IO [ByteString]
requestKeys InfoMap
infoMap) [[ByteString]]
hashSlot <- CrossSlotException -> [ByteString] -> IO HashSlot
forall e. Exception e => e -> [ByteString] -> IO HashSlot
hashSlotForKeys ([[ByteString]] -> CrossSlotException
CrossSlotException [[ByteString]]
requests) [ByteString]
nodeConn <- MVar ShardMap
-> Connection
-> MissingNodeException
-> HashSlot
-> IO NodeConnection
forall e.
Exception e =>
MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot MVar ShardMap
shardMapVar Connection
conn ([ByteString] -> MissingNodeException
MissingNodeException ([[ByteString]] -> [ByteString]
forall a. [a] -> a
head [[ByteString]]
requests)) HashSlot
NodeConnection -> [[ByteString]] -> IO [Reply]
requestNode NodeConnection
nodeConn [[ByteString]]
(Reply -> Maybe (String, Int)
askingRedirection -> Just (String
host, Int
port)) -> do
shardMap <- IO ShardMap -> IO ShardMap
forall a. IO a -> IO a
hasLocked (IO ShardMap -> IO ShardMap) -> IO ShardMap -> IO ShardMap
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> IO ShardMap
forall a. MVar a -> IO a
readMVar MVar ShardMap
let maybeAskNode :: Maybe NodeConnection
maybeAskNode = ShardMap -> Connection -> String -> Int -> Maybe NodeConnection
nodeConnWithHostAndPort ShardMap
shardMap Connection
conn String
host Int
case Maybe NodeConnection
maybeAskNode of
Just NodeConnection
askNode -> [Reply] -> [Reply]
forall a. [a] -> [a]
tail ([Reply] -> [Reply]) -> IO [Reply] -> IO [Reply]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeConnection -> [[ByteString]] -> IO [Reply]
requestNode NodeConnection
askNode ([ByteString
"ASKING"] [ByteString] -> [[ByteString]] -> [[ByteString]]
forall a. a -> [a] -> [a]
: [[ByteString]]
Maybe NodeConnection
Nothing -> case Int
retryCount of
0 -> do
_ <- IO () -> IO ()
forall a. IO a -> IO a
hasLocked (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> (ShardMap -> IO ShardMap) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar ShardMap
shardMapVar (IO ShardMap -> ShardMap -> IO ShardMap
forall a b. a -> b -> a
const IO ShardMap
MVar ShardMap
-> IO ShardMap
-> Connection
-> Int
-> [[ByteString]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn (Int
retryCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) [[ByteString]]
requests [Reply]
_ -> MissingNodeException -> IO [Reply]
forall e a. Exception e => e -> IO a
throwIO (MissingNodeException -> IO [Reply])
-> MissingNodeException -> IO [Reply]
forall a b. (a -> b) -> a -> b
$ [ByteString] -> MissingNodeException
MissingNodeException ([[ByteString]] -> [ByteString]
forall a. [a] -> a
head [[ByteString]]
_ -> [Reply] -> IO [Reply]
forall (m :: * -> *) a. Monad m => a -> m a
return [Reply]
evaluateTransactionPipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluateTransactionPipeline :: MVar ShardMap
-> IO ShardMap -> Connection -> [[ByteString]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn [[ByteString]]
requests' = do
let requests :: [[ByteString]]
requests = [[ByteString]] -> [[ByteString]]
forall a. [a] -> [a]
reverse [[ByteString]]
let (Connection HashMap ByteString NodeConnection
_ MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) = Connection
keys <- [[ByteString]] -> [ByteString]
forall a. Monoid a => [a] -> a
mconcat ([[ByteString]] -> [ByteString])
-> IO [[ByteString]] -> IO [ByteString]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ([ByteString] -> IO [ByteString])
-> [[ByteString]] -> IO [[ByteString]]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (InfoMap -> [ByteString] -> IO [ByteString]
requestKeys InfoMap
infoMap) [[ByteString]]
hashSlot <- CrossSlotException -> [ByteString] -> IO HashSlot
forall e. Exception e => e -> [ByteString] -> IO HashSlot
hashSlotForKeys ([[ByteString]] -> CrossSlotException
CrossSlotException [[ByteString]]
requests) [ByteString]
nodeConn <- MVar ShardMap
-> Connection
-> MissingNodeException
-> HashSlot
-> IO NodeConnection
forall e.
Exception e =>
MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot MVar ShardMap
shardMapVar Connection
conn ([ByteString] -> MissingNodeException
MissingNodeException ([[ByteString]] -> [ByteString]
forall a. [a] -> a
head [[ByteString]]
requests)) HashSlot
resps <- NodeConnection -> [[ByteString]] -> IO [Reply]
requestNode NodeConnection
nodeConn [[ByteString]]
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ((Reply -> Bool) -> [Reply] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any Reply -> Bool
moved [Reply]
(IO () -> IO ()
forall a. IO a -> IO a
hasLocked (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> (ShardMap -> IO ShardMap) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar ShardMap
shardMapVar (IO ShardMap -> ShardMap -> IO ShardMap
forall a b. a -> b -> a
const IO ShardMap
retriedResps <- MVar ShardMap
-> IO ShardMap
-> Connection
-> Int
-> [[ByteString]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn Int
0 [[ByteString]]
requests [Reply]
[Reply] -> IO [Reply]
forall (m :: * -> *) a. Monad m => a -> m a
return [Reply]
nodeConnForHashSlot :: Exception e => MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot :: MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot MVar ShardMap
shardMapVar Connection
conn e
exception HashSlot
hashSlot = do
let (Connection HashMap ByteString NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) = Connection
(ShardMap IntMap Shard
shardMap) <- IO ShardMap -> IO ShardMap
forall a. IO a -> IO a
hasLocked (IO ShardMap -> IO ShardMap) -> IO ShardMap -> IO ShardMap
forall a b. (a -> b) -> a -> b
$ MVar ShardMap -> IO ShardMap
forall a. MVar a -> IO a
readMVar MVar ShardMap
node <-
case Int -> IntMap Shard -> Maybe Shard
forall a. Int -> IntMap a -> Maybe a
IntMap.lookup (HashSlot -> Int
forall a. Enum a => a -> Int
fromEnum HashSlot
hashSlot) IntMap Shard
shardMap of
Maybe Shard
Nothing -> e -> IO Node
forall e a. Exception e => e -> IO a
throwIO e
Just (Shard Node
master [Node]
_) -> Node -> IO Node
forall (m :: * -> *) a. Monad m => a -> m a
return Node
case ByteString
-> HashMap ByteString NodeConnection -> Maybe NodeConnection
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup (Node -> ByteString
nodeId Node
node) HashMap ByteString NodeConnection
nodeConns of
Maybe NodeConnection
Nothing -> e -> IO NodeConnection
forall e a. Exception e => e -> IO a
throwIO e
Just NodeConnection
nodeConn' -> NodeConnection -> IO NodeConnection
forall (m :: * -> *) a. Monad m => a -> m a
return NodeConnection
hashSlotForKeys :: Exception e => e -> [B.ByteString] -> IO HashSlot
hashSlotForKeys :: e -> [ByteString] -> IO HashSlot
hashSlotForKeys e
exception [ByteString]
keys =
case [HashSlot] -> [HashSlot]
forall a. Eq a => [a] -> [a]
nub (ByteString -> HashSlot
keyToSlot (ByteString -> HashSlot) -> [ByteString] -> [HashSlot]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [ByteString]
keys) of
[] -> HashSlot -> IO HashSlot
forall (m :: * -> *) a. Monad m => a -> m a
return HashSlot
hashSlot] -> HashSlot -> IO HashSlot
forall (m :: * -> *) a. Monad m => a -> m a
return HashSlot
_ -> e -> IO HashSlot
forall e a. Exception e => e -> IO a
throwIO (e -> IO HashSlot) -> e -> IO HashSlot
forall a b. (a -> b) -> a -> b
$ e
requestKeys :: CMD.InfoMap -> [B.ByteString] -> IO [B.ByteString]
requestKeys :: InfoMap -> [ByteString] -> IO [ByteString]
requestKeys InfoMap
infoMap [ByteString]
request =
case InfoMap -> [ByteString] -> Maybe [ByteString]
CMD.keysForRequest InfoMap
infoMap [ByteString]
request of
Maybe [ByteString]
Nothing -> UnsupportedClusterCommandException -> IO [ByteString]
forall e a. Exception e => e -> IO a
throwIO (UnsupportedClusterCommandException -> IO [ByteString])
-> UnsupportedClusterCommandException -> IO [ByteString]
forall a b. (a -> b) -> a -> b
$ [ByteString] -> UnsupportedClusterCommandException
UnsupportedClusterCommandException [ByteString]
Just [ByteString]
k -> [ByteString] -> IO [ByteString]
forall (m :: * -> *) a. Monad m => a -> m a
return [ByteString]
askingRedirection :: Reply -> Maybe (Host, Port)
askingRedirection :: Reply -> Maybe (String, Int)
askingRedirection (Error ByteString
errString) = case ByteString -> [ByteString]
Char8.words ByteString
errString of
"ASK", ByteString
_, ByteString
hostport] -> case Char -> ByteString -> [ByteString]
Char8.split Char
':' ByteString
hostport of
host, ByteString
portString] -> case ByteString -> Maybe (Int, ByteString)
Char8.readInt ByteString
portString of
Just (Int
"") -> (String, Int) -> Maybe (String, Int)
forall a. a -> Maybe a
Just (ByteString -> String
Char8.unpack ByteString
host, Int
Maybe (Int, ByteString)
_ -> Maybe (String, Int)
forall a. Maybe a
_ -> Maybe (String, Int)
forall a. Maybe a
_ -> Maybe (String, Int)
forall a. Maybe a
askingRedirection Reply
_ = Maybe (String, Int)
forall a. Maybe a
moved :: Reply -> Bool
moved :: Reply -> Bool
moved (Error ByteString
errString) = case ByteString -> [ByteString]
Char8.words ByteString
errString of
_ -> Bool
_ -> Bool
moved Reply
_ = Bool
nodeConnWithHostAndPort :: ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort :: ShardMap -> Connection -> String -> Int -> Maybe NodeConnection
nodeConnWithHostAndPort ShardMap
shardMap (Connection HashMap ByteString NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) String
host Int
port = do
node <- ShardMap -> String -> Int -> Maybe Node
nodeWithHostAndPort ShardMap
shardMap String
host Int
-> HashMap ByteString NodeConnection -> Maybe NodeConnection
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup (Node -> ByteString
nodeId Node
node) HashMap ByteString NodeConnection
nodeConnectionForCommand :: Connection -> ShardMap -> [B.ByteString] -> IO [NodeConnection]
nodeConnectionForCommand :: Connection -> ShardMap -> [ByteString] -> IO [NodeConnection]
nodeConnectionForCommand conn :: Connection
conn@(Connection HashMap ByteString NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) (ShardMap IntMap Shard
shardMap) [ByteString]
request =
case [ByteString]
request of
"FLUSHALL" : [ByteString]
_) -> IO [NodeConnection]
"FLUSHDB" : [ByteString]
_) -> IO [NodeConnection]
"QUIT" : [ByteString]
_) -> IO [NodeConnection]
"UNWATCH" : [ByteString]
_) -> IO [NodeConnection]
_ -> do
keys <- InfoMap -> [ByteString] -> IO [ByteString]
requestKeys InfoMap
infoMap [ByteString]
hashSlot <- CrossSlotException -> [ByteString] -> IO HashSlot
forall e. Exception e => e -> [ByteString] -> IO HashSlot
hashSlotForKeys ([[ByteString]] -> CrossSlotException
CrossSlotException [[ByteString]
request]) [ByteString]
node <- case Int -> IntMap Shard -> Maybe Shard
forall a. Int -> IntMap a -> Maybe a
IntMap.lookup (HashSlot -> Int
forall a. Enum a => a -> Int
fromEnum HashSlot
hashSlot) IntMap Shard
shardMap of
Maybe Shard
Nothing -> MissingNodeException -> IO Node
forall e a. Exception e => e -> IO a
throwIO (MissingNodeException -> IO Node)
-> MissingNodeException -> IO Node
forall a b. (a -> b) -> a -> b
$ [ByteString] -> MissingNodeException
MissingNodeException [ByteString]
Just (Shard Node
master [Node]
_) -> Node -> IO Node
forall (m :: * -> *) a. Monad m => a -> m a
return Node
IO [NodeConnection]
-> (NodeConnection -> IO [NodeConnection])
-> Maybe NodeConnection
-> IO [NodeConnection]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (MissingNodeException -> IO [NodeConnection]
forall e a. Exception e => e -> IO a
throwIO (MissingNodeException -> IO [NodeConnection])
-> MissingNodeException -> IO [NodeConnection]
forall a b. (a -> b) -> a -> b
$ [ByteString] -> MissingNodeException
MissingNodeException [ByteString]
request) ([NodeConnection] -> IO [NodeConnection]
forall (m :: * -> *) a. Monad m => a -> m a
return ([NodeConnection] -> IO [NodeConnection])
-> (NodeConnection -> [NodeConnection])
-> NodeConnection
-> IO [NodeConnection]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeConnection -> [NodeConnection]
forall (m :: * -> *) a. Monad m => a -> m a
return) (ByteString
-> HashMap ByteString NodeConnection -> Maybe NodeConnection
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup (Node -> ByteString
nodeId Node
node) HashMap ByteString NodeConnection
allNodes :: IO [NodeConnection]
allNodes =
case Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes Connection
conn (IntMap Shard -> ShardMap
ShardMap IntMap Shard
shardMap) of
Maybe [NodeConnection]
Nothing -> MissingNodeException -> IO [NodeConnection]
forall e a. Exception e => e -> IO a
throwIO (MissingNodeException -> IO [NodeConnection])
-> MissingNodeException -> IO [NodeConnection]
forall a b. (a -> b) -> a -> b
$ [ByteString] -> MissingNodeException
MissingNodeException [ByteString]
Just [NodeConnection]
allNodes' -> [NodeConnection] -> IO [NodeConnection]
forall (m :: * -> *) a. Monad m => a -> m a
return [NodeConnection]
allMasterNodes :: Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes :: Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes (Connection HashMap ByteString NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) (ShardMap IntMap Shard
shardMap) =
(Node -> Maybe NodeConnection) -> [Node] -> Maybe [NodeConnection]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM ((ByteString
-> HashMap ByteString NodeConnection -> Maybe NodeConnection)
-> HashMap ByteString NodeConnection
-> ByteString
-> Maybe NodeConnection
forall a b c. (a -> b -> c) -> b -> a -> c
flip ByteString
-> HashMap ByteString NodeConnection -> Maybe NodeConnection
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup HashMap ByteString NodeConnection
nodeConns (ByteString -> Maybe NodeConnection)
-> (Node -> ByteString) -> Node -> Maybe NodeConnection
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Node -> ByteString
nodeId) [Node]
masterNodes :: [Node]
masterNodes = (\(Shard Node
master [Node]
_) -> Node
master) (Shard -> Node) -> [Shard] -> [Node]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Shard] -> [Shard]
forall a. Eq a => [a] -> [a]
nub (IntMap Shard -> [Shard]
forall a. IntMap a -> [a]
IntMap.elems IntMap Shard
requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode :: NodeConnection -> [[ByteString]] -> IO [Reply]
requestNode (NodeConnection ConnectionContext
ctx IORef (Maybe ByteString)
lastRecvRef ByteString
_) [[ByteString]]
requests = do
([ByteString] -> IO ()) -> [[ByteString]] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ByteString -> IO ()
sendNode (ByteString -> IO ())
-> ([ByteString] -> ByteString) -> [ByteString] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ByteString] -> ByteString
renderRequest) [[ByteString]]
_ <- ConnectionContext -> IO ()
CC.flush ConnectionContext
Int -> IO Reply -> IO [Reply]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM ([[ByteString]] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [[ByteString]]
requests) IO Reply
sendNode :: B.ByteString -> IO ()
sendNode :: ByteString -> IO ()
sendNode = ConnectionContext -> ByteString -> IO ()
CC.send ConnectionContext
recvNode :: IO Reply
recvNode :: IO Reply
recvNode = do
Maybe ByteString
maybeLastRecv <- IORef (Maybe ByteString) -> IO (Maybe ByteString)
forall a. IORef a -> IO a
IOR.readIORef IORef (Maybe ByteString)
Result Reply
scanResult <- case Maybe ByteString
maybeLastRecv of
Just ByteString
lastRecv -> IO ByteString -> Scanner Reply -> ByteString -> IO (Result Reply)
forall (m :: * -> *) a.
Monad m =>
m ByteString -> Scanner a -> ByteString -> m (Result a)
Scanner.scanWith (ConnectionContext -> IO ByteString
CC.recv ConnectionContext
ctx) Scanner Reply
reply ByteString
Maybe ByteString
Nothing -> IO ByteString -> Scanner Reply -> ByteString -> IO (Result Reply)
forall (m :: * -> *) a.
Monad m =>
m ByteString -> Scanner a -> ByteString -> m (Result a)
Scanner.scanWith (ConnectionContext -> IO ByteString
CC.recv ConnectionContext
ctx) Scanner Reply
reply ByteString
case Result Reply
scanResult of
Scanner.Fail{} -> IO Reply
forall a. IO a
Scanner.More{} -> String -> IO Reply
forall a. HasCallStack => String -> a
error String
"Hedis: parseWith returned Partial"
Scanner.Done ByteString
rest' Reply
r -> do
IORef (Maybe ByteString) -> Maybe ByteString -> IO ()
forall a. IORef a -> a -> IO ()
IOR.writeIORef IORef (Maybe ByteString)
lastRecvRef (ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
Reply -> IO Reply
forall (m :: * -> *) a. Monad m => a -> m a
return Reply
nodes :: ShardMap -> [Node]
nodes :: ShardMap -> [Node]
nodes (ShardMap IntMap Shard
shardMap) = ((Int, [Node]) -> [Node]) -> [(Int, [Node])] -> [Node]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (Int, [Node]) -> [Node]
forall a b. (a, b) -> b
snd ([(Int, [Node])] -> [Node]) -> [(Int, [Node])] -> [Node]
forall a b. (a -> b) -> a -> b
$ IntMap [Node] -> [(Int, [Node])]
forall a. IntMap a -> [(Int, a)]
IntMap.toList (IntMap [Node] -> [(Int, [Node])])
-> IntMap [Node] -> [(Int, [Node])]
forall a b. (a -> b) -> a -> b
$ (Shard -> [Node]) -> IntMap Shard -> IntMap [Node]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Shard -> [Node]
shardNodes IntMap Shard
shardMap where
shardNodes :: Shard -> [Node]
shardNodes :: Shard -> [Node]
shardNodes (Shard Node
master [Node]
slaves) = Node
masterNode -> [Node] -> [Node]
forall a. a -> [a] -> [a]
nodeWithHostAndPort :: ShardMap -> Host -> Port -> Maybe Node
nodeWithHostAndPort :: ShardMap -> String -> Int -> Maybe Node
nodeWithHostAndPort ShardMap
shardMap String
host Int
port = (Node -> Bool) -> [Node] -> Maybe Node
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\(Node ByteString
_ NodeRole
_ String
nodeHost Int
nodePort) -> Int
port Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
nodePort Bool -> Bool -> Bool
&& String
host String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
nodeHost) (ShardMap -> [Node]
nodes ShardMap
nodeId :: Node -> NodeID
nodeId :: Node -> ByteString
nodeId (Node ByteString
theId NodeRole
_ String
_ Int
_) = ByteString
hasLocked :: IO a -> IO a
hasLocked :: IO a -> IO a
hasLocked IO a
action =
IO a
action IO a -> [Handler a] -> IO a
forall a. IO a -> [Handler a] -> IO a
[ (BlockedIndefinitelyOnMVar -> IO a) -> Handler a
forall a e. Exception e => (e -> IO a) -> Handler a
Handler ((BlockedIndefinitelyOnMVar -> IO a) -> Handler a)
-> (BlockedIndefinitelyOnMVar -> IO a) -> Handler a
forall a b. (a -> b) -> a -> b
$ \exc :: BlockedIndefinitelyOnMVar
BlockedIndefinitelyOnMVar -> BlockedIndefinitelyOnMVar -> IO a
forall e a. Exception e => e -> IO a
throwIO BlockedIndefinitelyOnMVar