{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}
module Database.Redis.Cluster
( Connection(..)
, NodeRole(..)
, NodeConnection(..)
, Node(..)
, ShardMap(..)
, HashSlot
, Shard(..)
, connect
, disconnect
, requestPipelined
, nodes
) where
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import qualified Data.IORef as IOR
import Data.List(nub, sortBy, find)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..))
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
import Control.Monad(zipWithM, when, replicateM)
import Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot)
import qualified Database.Redis.ConnectionContext as CC
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap.Strict as IntMap
import Data.Typeable
import qualified Scanner
import System.IO.Unsafe(unsafeInterleaveIO)
import Database.Redis.Protocol(Reply(Error), renderRequest, reply)
import qualified Database.Redis.Cluster.Command as CMD
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap
data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID
instance Eq NodeConnection where
(NodeConnection ConnectionContext
_ IORef (Maybe NodeID)
_ NodeID
id1) == :: NodeConnection -> NodeConnection -> Bool
== (NodeConnection ConnectionContext
_ IORef (Maybe NodeID)
_ NodeID
id2) = NodeID
id1 forall a. Eq a => a -> a -> Bool
== NodeID
id2
instance Ord NodeConnection where
compare :: NodeConnection -> NodeConnection -> Ordering
compare (NodeConnection ConnectionContext
_ IORef (Maybe NodeID)
_ NodeID
id1) (NodeConnection ConnectionContext
_ IORef (Maybe NodeID)
_ NodeID
id2) = forall a. Ord a => a -> a -> Ordering
compare NodeID
id1 NodeID
id2
data PipelineState =
Pending [[B.ByteString]]
| Executed [Reply]
| TransactionPending [[B.ByteString]]
newtype Pipeline = Pipeline (MVar PipelineState)
data NodeRole = Master | Slave deriving (Port -> NodeRole -> ShowS
[NodeRole] -> ShowS
NodeRole -> Host
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
showList :: [NodeRole] -> ShowS
$cshowList :: [NodeRole] -> ShowS
show :: NodeRole -> Host
$cshow :: NodeRole -> Host
showsPrec :: Port -> NodeRole -> ShowS
$cshowsPrec :: Port -> NodeRole -> ShowS
Show, NodeRole -> NodeRole -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: NodeRole -> NodeRole -> Bool
$c/= :: NodeRole -> NodeRole -> Bool
== :: NodeRole -> NodeRole -> Bool
$c== :: NodeRole -> NodeRole -> Bool
Eq, Eq NodeRole
NodeRole -> NodeRole -> Bool
NodeRole -> NodeRole -> Ordering
NodeRole -> NodeRole -> NodeRole
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: NodeRole -> NodeRole -> NodeRole
$cmin :: NodeRole -> NodeRole -> NodeRole
max :: NodeRole -> NodeRole -> NodeRole
$cmax :: NodeRole -> NodeRole -> NodeRole
>= :: NodeRole -> NodeRole -> Bool
$c>= :: NodeRole -> NodeRole -> Bool
> :: NodeRole -> NodeRole -> Bool
$c> :: NodeRole -> NodeRole -> Bool
<= :: NodeRole -> NodeRole -> Bool
$c<= :: NodeRole -> NodeRole -> Bool
< :: NodeRole -> NodeRole -> Bool
$c< :: NodeRole -> NodeRole -> Bool
compare :: NodeRole -> NodeRole -> Ordering
$ccompare :: NodeRole -> NodeRole -> Ordering
Ord)
type Host = String
type Port = Int
type NodeID = B.ByteString
data Node = Node NodeID NodeRole Host Port deriving (Port -> Node -> ShowS
[Node] -> ShowS
Node -> Host
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
showList :: [Node] -> ShowS
$cshowList :: [Node] -> ShowS
show :: Node -> Host
$cshow :: Node -> Host
showsPrec :: Port -> Node -> ShowS
$cshowsPrec :: Port -> Node -> ShowS
Show, Node -> Node -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Node -> Node -> Bool
$c/= :: Node -> Node -> Bool
== :: Node -> Node -> Bool
$c== :: Node -> Node -> Bool
Eq, Eq Node
Node -> Node -> Bool
Node -> Node -> Ordering
Node -> Node -> Node
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Node -> Node -> Node
$cmin :: Node -> Node -> Node
max :: Node -> Node -> Node
$cmax :: Node -> Node -> Node
>= :: Node -> Node -> Bool
$c>= :: Node -> Node -> Bool
> :: Node -> Node -> Bool
$c> :: Node -> Node -> Bool
<= :: Node -> Node -> Bool
$c<= :: Node -> Node -> Bool
< :: Node -> Node -> Bool
$c< :: Node -> Node -> Bool
compare :: Node -> Node -> Ordering
$ccompare :: Node -> Node -> Ordering
Ord)
type MasterNode = Node
type SlaveNode = Node
data Shard = Shard MasterNode [SlaveNode] deriving (Port -> Shard -> ShowS
[Shard] -> ShowS
Shard -> Host
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
showList :: [Shard] -> ShowS
$cshowList :: [Shard] -> ShowS
show :: Shard -> Host
$cshow :: Shard -> Host
showsPrec :: Port -> Shard -> ShowS
$cshowsPrec :: Port -> Shard -> ShowS
Show, Shard -> Shard -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Shard -> Shard -> Bool
$c/= :: Shard -> Shard -> Bool
== :: Shard -> Shard -> Bool
$c== :: Shard -> Shard -> Bool
Eq, Eq Shard
Shard -> Shard -> Bool
Shard -> Shard -> Ordering
Shard -> Shard -> Shard
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: Shard -> Shard -> Shard
$cmin :: Shard -> Shard -> Shard
max :: Shard -> Shard -> Shard
$cmax :: Shard -> Shard -> Shard
>= :: Shard -> Shard -> Bool
$c>= :: Shard -> Shard -> Bool
> :: Shard -> Shard -> Bool
$c> :: Shard -> Shard -> Bool
<= :: Shard -> Shard -> Bool
$c<= :: Shard -> Shard -> Bool
< :: Shard -> Shard -> Bool
$c< :: Shard -> Shard -> Bool
compare :: Shard -> Shard -> Ordering
$ccompare :: Shard -> Shard -> Ordering
Ord)
newtype ShardMap = ShardMap (IntMap.IntMap Shard) deriving (Port -> ShardMap -> ShowS
[ShardMap] -> ShowS
ShardMap -> Host
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
showList :: [ShardMap] -> ShowS
$cshowList :: [ShardMap] -> ShowS
show :: ShardMap -> Host
$cshow :: ShardMap -> Host
showsPrec :: Port -> ShardMap -> ShowS
$cshowsPrec :: Port -> ShardMap -> ShowS
Show)
newtype MissingNodeException = MissingNodeException [B.ByteString] deriving (Port -> MissingNodeException -> ShowS
[MissingNodeException] -> ShowS
MissingNodeException -> Host
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
showList :: [MissingNodeException] -> ShowS
$cshowList :: [MissingNodeException] -> ShowS
show :: MissingNodeException -> Host
$cshow :: MissingNodeException -> Host
showsPrec :: Port -> MissingNodeException -> ShowS
$cshowsPrec :: Port -> MissingNodeException -> ShowS
Show, Typeable)
instance Exception MissingNodeException
newtype UnsupportedClusterCommandException = UnsupportedClusterCommandException [B.ByteString] deriving (Port -> UnsupportedClusterCommandException -> ShowS
[UnsupportedClusterCommandException] -> ShowS
UnsupportedClusterCommandException -> Host
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
showList :: [UnsupportedClusterCommandException] -> ShowS
$cshowList :: [UnsupportedClusterCommandException] -> ShowS
show :: UnsupportedClusterCommandException -> Host
$cshow :: UnsupportedClusterCommandException -> Host
showsPrec :: Port -> UnsupportedClusterCommandException -> ShowS
$cshowsPrec :: Port -> UnsupportedClusterCommandException -> ShowS
Show, Typeable)
instance Exception UnsupportedClusterCommandException
newtype CrossSlotException = CrossSlotException [[B.ByteString]] deriving (Port -> CrossSlotException -> ShowS
[CrossSlotException] -> ShowS
CrossSlotException -> Host
forall a.
(Port -> a -> ShowS) -> (a -> Host) -> ([a] -> ShowS) -> Show a
showList :: [CrossSlotException] -> ShowS
$cshowList :: [CrossSlotException] -> ShowS
show :: CrossSlotException -> Host
$cshow :: CrossSlotException -> Host
showsPrec :: Port -> CrossSlotException -> ShowS
$cshowsPrec :: Port -> CrossSlotException -> ShowS
Show, Typeable)
instance Exception CrossSlotException
connect :: [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> IO Connection
connect :: [CommandInfo] -> MVar ShardMap -> Maybe Port -> IO Connection
connect [CommandInfo]
commandInfos MVar ShardMap
shardMapVar Maybe Port
timeoutOpt = do
ShardMap
shardMap <- forall a. MVar a -> IO a
readMVar MVar ShardMap
shardMapVar
MVar PipelineState
stateVar <- forall a. a -> IO (MVar a)
newMVar forall a b. (a -> b) -> a -> b
$ [[NodeID]] -> PipelineState
Pending []
MVar Pipeline
pipelineVar <- forall a. a -> IO (MVar a)
newMVar forall a b. (a -> b) -> a -> b
$ MVar PipelineState -> Pipeline
Pipeline MVar PipelineState
stateVar
HashMap NodeID NodeConnection
nodeConns <- ShardMap -> IO (HashMap NodeID NodeConnection)
nodeConnections ShardMap
shardMap
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ HashMap NodeID NodeConnection
-> MVar Pipeline -> MVar ShardMap -> InfoMap -> Connection
Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
pipelineVar MVar ShardMap
shardMapVar ([CommandInfo] -> InfoMap
CMD.newInfoMap [CommandInfo]
commandInfos) where
nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection)
nodeConnections :: ShardMap -> IO (HashMap NodeID NodeConnection)
nodeConnections ShardMap
shardMap = forall k v. (Eq k, Hashable k) => [(k, v)] -> HashMap k v
HM.fromList forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Node -> IO (NodeID, NodeConnection)
connectNode (forall a. Eq a => [a] -> [a]
nub forall a b. (a -> b) -> a -> b
$ ShardMap -> [Node]
nodes ShardMap
shardMap)
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode (Node NodeID
n NodeRole
_ Host
host Port
port) = do
ConnectionContext
ctx <- Host -> PortID -> Maybe Port -> IO ConnectionContext
CC.connect Host
host (PortNumber -> PortID
CC.PortNumber forall a b. (a -> b) -> a -> b
$ forall a. Enum a => Port -> a
toEnum Port
port) Maybe Port
timeoutOpt
IORef (Maybe NodeID)
ref <- forall a. a -> IO (IORef a)
IOR.newIORef forall a. Maybe a
Nothing
forall (m :: * -> *) a. Monad m => a -> m a
return (NodeID
n, ConnectionContext
-> IORef (Maybe NodeID) -> NodeID -> NodeConnection
NodeConnection ConnectionContext
ctx IORef (Maybe NodeID)
ref NodeID
n)
disconnect :: Connection -> IO ()
disconnect :: Connection -> IO ()
disconnect (Connection HashMap NodeID NodeConnection
nodeConnMap MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) = forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ NodeConnection -> IO ()
disconnectNode (forall k v. HashMap k v -> [v]
HM.elems HashMap NodeID NodeConnection
nodeConnMap) where
disconnectNode :: NodeConnection -> IO ()
disconnectNode (NodeConnection ConnectionContext
nodeCtx IORef (Maybe NodeID)
_ NodeID
_) = ConnectionContext -> IO ()
CC.disconnect ConnectionContext
nodeCtx
requestPipelined :: IO ShardMap -> Connection -> [B.ByteString] -> IO Reply
requestPipelined :: IO ShardMap -> Connection -> [NodeID] -> IO Reply
requestPipelined IO ShardMap
refreshAction conn :: Connection
conn@(Connection HashMap NodeID NodeConnection
_ MVar Pipeline
pipelineVar MVar ShardMap
shardMapVar InfoMap
_) [NodeID]
nextRequest = forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar Pipeline
pipelineVar forall a b. (a -> b) -> a -> b
$ \(Pipeline MVar PipelineState
stateVar) -> do
(MVar PipelineState
newStateVar, Port
repliesIndex) <- forall a. IO a -> IO a
hasLocked forall a b. (a -> b) -> a -> b
$ forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar PipelineState
stateVar forall a b. (a -> b) -> a -> b
$ \case
Pending [[NodeID]]
requests | [NodeID] -> Bool
isMulti [NodeID]
nextRequest -> do
[Reply]
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[NodeID]]
requests
MVar PipelineState
s' <- forall a. a -> IO (MVar a)
newMVar forall a b. (a -> b) -> a -> b
$ [[NodeID]] -> PipelineState
TransactionPending [[NodeID]
nextRequest]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, (MVar PipelineState
s', Port
0))
Pending [[NodeID]]
requests | forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests forall a. Ord a => a -> a -> Bool
> Port
1000 -> do
[Reply]
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn ([NodeID]
nextRequestforall a. a -> [a] -> [a]
:[[NodeID]]
requests)
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, (MVar PipelineState
stateVar, forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests))
Pending [[NodeID]]
requests ->
forall (m :: * -> *) a. Monad m => a -> m a
return ([[NodeID]] -> PipelineState
Pending ([NodeID]
nextRequestforall a. a -> [a] -> [a]
:[[NodeID]]
requests), (MVar PipelineState
stateVar, forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests))
TransactionPending [[NodeID]]
requests ->
if [NodeID] -> Bool
isExec [NodeID]
nextRequest then do
[Reply]
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn ([NodeID]
nextRequestforall a. a -> [a] -> [a]
:[[NodeID]]
requests)
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, (MVar PipelineState
stateVar, forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests))
else
forall (m :: * -> *) a. Monad m => a -> m a
return ([[NodeID]] -> PipelineState
TransactionPending ([NodeID]
nextRequestforall a. a -> [a] -> [a]
:[[NodeID]]
requests), (MVar PipelineState
stateVar, forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests))
e :: PipelineState
e@(Executed [Reply]
_) -> do
MVar PipelineState
s' <- forall a. a -> IO (MVar a)
newMVar forall a b. (a -> b) -> a -> b
$
if [NodeID] -> Bool
isMulti [NodeID]
nextRequest then
[[NodeID]] -> PipelineState
TransactionPending [[NodeID]
nextRequest]
else
[[NodeID]] -> PipelineState
Pending [[NodeID]
nextRequest]
forall (m :: * -> *) a. Monad m => a -> m a
return (PipelineState
e, (MVar PipelineState
s', Port
0))
Reply
evaluateAction <- forall a. IO a -> IO a
unsafeInterleaveIO forall a b. (a -> b) -> a -> b
$ do
[Reply]
replies <- forall a. IO a -> IO a
hasLocked forall a b. (a -> b) -> a -> b
$ forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar PipelineState
newStateVar forall a b. (a -> b) -> a -> b
$ \case
Executed [Reply]
replies ->
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, [Reply]
replies)
Pending [[NodeID]]
requests-> do
[Reply]
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[NodeID]]
requests
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, [Reply]
replies)
TransactionPending [[NodeID]]
requests-> do
[Reply]
replies <- MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshAction Connection
conn [[NodeID]]
requests
forall (m :: * -> *) a. Monad m => a -> m a
return ([Reply] -> PipelineState
Executed [Reply]
replies, [Reply]
replies)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ [Reply]
replies forall a. [a] -> Port -> a
!! Port
repliesIndex
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar PipelineState -> Pipeline
Pipeline MVar PipelineState
newStateVar, Reply
evaluateAction)
isMulti :: [B.ByteString] -> Bool
isMulti :: [NodeID] -> Bool
isMulti (NodeID
"MULTI" : [NodeID]
_) = Bool
True
isMulti [NodeID]
_ = Bool
False
isExec :: [B.ByteString] -> Bool
isExec :: [NodeID] -> Bool
isExec (NodeID
"EXEC" : [NodeID]
_) = Bool
True
isExec [NodeID]
_ = Bool
False
data PendingRequest = PendingRequest Int [B.ByteString]
data CompletedRequest = CompletedRequest Int [B.ByteString] Reply
rawRequest :: PendingRequest -> [B.ByteString]
rawRequest :: PendingRequest -> [NodeID]
rawRequest (PendingRequest Port
_ [NodeID]
r) = [NodeID]
r
responseIndex :: CompletedRequest -> Int
responseIndex :: CompletedRequest -> Port
responseIndex (CompletedRequest Port
i [NodeID]
_ Reply
_) = Port
i
rawResponse :: CompletedRequest -> Reply
rawResponse :: CompletedRequest -> Reply
rawResponse (CompletedRequest Port
_ [NodeID]
_ Reply
r) = Reply
r
evaluatePipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluatePipeline :: MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluatePipeline MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn [[NodeID]]
requests = do
ShardMap
shardMap <- forall a. IO a -> IO a
hasLocked forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
readMVar MVar ShardMap
shardMapVar
[(NodeConnection, [PendingRequest])]
requestsByNode <- ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode ShardMap
shardMap
[CompletedRequest]
resps <- forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests) [(NodeConnection, [PendingRequest])]
requestsByNode
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any (Reply -> Bool
moved forall b c a. (b -> c) -> (a -> b) -> a -> c
. CompletedRequest -> Reply
rawResponse) [CompletedRequest]
resps) IO ()
refreshShardMapVar
[CompletedRequest]
retriedResps <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (Port -> CompletedRequest -> IO CompletedRequest
retry Port
0) [CompletedRequest]
resps
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map CompletedRequest -> Reply
rawResponse forall a b. (a -> b) -> a -> b
$ forall a. (a -> a -> Ordering) -> [a] -> [a]
sortBy (forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
on forall a. Ord a => a -> a -> Ordering
compare CompletedRequest -> Port
responseIndex) [CompletedRequest]
retriedResps
where
getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode ShardMap
shardMap = do
[[(NodeConnection, [PendingRequest])]]
commandsWithNodes <- forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM (ShardMap
-> Port -> [NodeID] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes ShardMap
shardMap) (forall a. [a] -> [a]
reverse [Port
0..(forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests forall a. Num a => a -> a -> a
- Port
1)]) [[NodeID]]
requests
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall k a. Map k a -> [(k, a)]
assocs forall a b. (a -> b) -> a -> b
$ forall k a. Ord k => (a -> a -> a) -> [(k, a)] -> Map k a
fromListWith forall a. [a] -> [a] -> [a]
(++) (forall a. Monoid a => [a] -> a
mconcat [[(NodeConnection, [PendingRequest])]]
commandsWithNodes)
requestWithNodes :: ShardMap -> Int -> [B.ByteString] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes :: ShardMap
-> Port -> [NodeID] -> IO [(NodeConnection, [PendingRequest])]
requestWithNodes ShardMap
shardMap Port
index [NodeID]
request = do
[NodeConnection]
nodeConns <- Connection -> ShardMap -> [NodeID] -> IO [NodeConnection]
nodeConnectionForCommand Connection
conn ShardMap
shardMap [NodeID]
request
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (, [Port -> [NodeID] -> PendingRequest
PendingRequest Port
index [NodeID]
request]) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [NodeConnection]
nodeConns
executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests NodeConnection
nodeConn [PendingRequest]
nodeRequests = do
[Reply]
replies <- NodeConnection -> [[NodeID]] -> IO [Reply]
requestNode NodeConnection
nodeConn forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map PendingRequest -> [NodeID]
rawRequest [PendingRequest]
nodeRequests
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith (forall a b c. ((a, b) -> c) -> a -> b -> c
curry (\(PendingRequest Port
i [NodeID]
r, Reply
rep) -> Port -> [NodeID] -> Reply -> CompletedRequest
CompletedRequest Port
i [NodeID]
r Reply
rep)) [PendingRequest]
nodeRequests [Reply]
replies
retry :: Int -> CompletedRequest -> IO CompletedRequest
retry :: Port -> CompletedRequest -> IO CompletedRequest
retry Port
retryCount (CompletedRequest Port
index [NodeID]
request Reply
thisReply) = do
Reply
retryReply <- forall a. [a] -> a
head forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVar ShardMap
-> IO ShardMap
-> Connection
-> Port
-> [[NodeID]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn Port
retryCount [[NodeID]
request] [Reply
thisReply]
forall (m :: * -> *) a. Monad m => a -> m a
return (Port -> [NodeID] -> Reply -> CompletedRequest
CompletedRequest Port
index [NodeID]
request Reply
retryReply)
refreshShardMapVar :: IO ()
refreshShardMapVar :: IO ()
refreshShardMapVar = forall a. IO a -> IO a
hasLocked forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar ShardMap
shardMapVar (forall a b. a -> b -> a
const IO ShardMap
refreshShardmapAction)
retryBatch :: MVar ShardMap -> IO ShardMap -> Connection -> Int -> [[B.ByteString]] -> [Reply] -> IO [Reply]
retryBatch :: MVar ShardMap
-> IO ShardMap
-> Connection
-> Port
-> [[NodeID]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn Port
retryCount [[NodeID]]
requests [Reply]
replies =
case forall a. [a] -> a
last [Reply]
replies of
(Error NodeID
errString) | NodeID -> NodeID -> Bool
B.isPrefixOf NodeID
"MOVED" NodeID
errString -> do
let (Connection HashMap NodeID NodeConnection
_ MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) = Connection
conn
[NodeID]
keys <- forall a. Monoid a => [a] -> a
mconcat forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (InfoMap -> [NodeID] -> IO [NodeID]
requestKeys InfoMap
infoMap) [[NodeID]]
requests
HashSlot
hashSlot <- forall e. Exception e => e -> [NodeID] -> IO HashSlot
hashSlotForKeys ([[NodeID]] -> CrossSlotException
CrossSlotException [[NodeID]]
requests) [NodeID]
keys
NodeConnection
nodeConn <- forall e.
Exception e =>
MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot MVar ShardMap
shardMapVar Connection
conn ([NodeID] -> MissingNodeException
MissingNodeException (forall a. [a] -> a
head [[NodeID]]
requests)) HashSlot
hashSlot
NodeConnection -> [[NodeID]] -> IO [Reply]
requestNode NodeConnection
nodeConn [[NodeID]]
requests
(Reply -> Maybe (Host, Port)
askingRedirection -> Just (Host
host, Port
port)) -> do
ShardMap
shardMap <- forall a. IO a -> IO a
hasLocked forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
readMVar MVar ShardMap
shardMapVar
let maybeAskNode :: Maybe NodeConnection
maybeAskNode = ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort ShardMap
shardMap Connection
conn Host
host Port
port
case Maybe NodeConnection
maybeAskNode of
Just NodeConnection
askNode -> forall a. [a] -> [a]
tail forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeConnection -> [[NodeID]] -> IO [Reply]
requestNode NodeConnection
askNode ([NodeID
"ASKING"] forall a. a -> [a] -> [a]
: [[NodeID]]
requests)
Maybe NodeConnection
Nothing -> case Port
retryCount of
Port
0 -> do
()
_ <- forall a. IO a -> IO a
hasLocked forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar ShardMap
shardMapVar (forall a b. a -> b -> a
const IO ShardMap
refreshShardmapAction)
MVar ShardMap
-> IO ShardMap
-> Connection
-> Port
-> [[NodeID]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn (Port
retryCount forall a. Num a => a -> a -> a
+ Port
1) [[NodeID]]
requests [Reply]
replies
Port
_ -> forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ [NodeID] -> MissingNodeException
MissingNodeException (forall a. [a] -> a
head [[NodeID]]
requests)
Reply
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return [Reply]
replies
evaluateTransactionPipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluateTransactionPipeline :: MVar ShardMap
-> IO ShardMap -> Connection -> [[NodeID]] -> IO [Reply]
evaluateTransactionPipeline MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn [[NodeID]]
requests' = do
let requests :: [[NodeID]]
requests = forall a. [a] -> [a]
reverse [[NodeID]]
requests'
let (Connection HashMap NodeID NodeConnection
_ MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) = Connection
conn
[NodeID]
keys <- forall a. Monoid a => [a] -> a
mconcat forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (InfoMap -> [NodeID] -> IO [NodeID]
requestKeys InfoMap
infoMap) [[NodeID]]
requests
HashSlot
hashSlot <- forall e. Exception e => e -> [NodeID] -> IO HashSlot
hashSlotForKeys ([[NodeID]] -> CrossSlotException
CrossSlotException [[NodeID]]
requests) [NodeID]
keys
NodeConnection
nodeConn <- forall e.
Exception e =>
MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot MVar ShardMap
shardMapVar Connection
conn ([NodeID] -> MissingNodeException
MissingNodeException (forall a. [a] -> a
head [[NodeID]]
requests)) HashSlot
hashSlot
[Reply]
resps <- NodeConnection -> [[NodeID]] -> IO [Reply]
requestNode NodeConnection
nodeConn [[NodeID]]
requests
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any Reply -> Bool
moved [Reply]
resps)
(forall a. IO a -> IO a
hasLocked forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar ShardMap
shardMapVar (forall a b. a -> b -> a
const IO ShardMap
refreshShardmapAction))
[Reply]
retriedResps <- MVar ShardMap
-> IO ShardMap
-> Connection
-> Port
-> [[NodeID]]
-> [Reply]
-> IO [Reply]
retryBatch MVar ShardMap
shardMapVar IO ShardMap
refreshShardmapAction Connection
conn Port
0 [[NodeID]]
requests [Reply]
resps
forall (m :: * -> *) a. Monad m => a -> m a
return [Reply]
retriedResps
nodeConnForHashSlot :: Exception e => MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot :: forall e.
Exception e =>
MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection
nodeConnForHashSlot MVar ShardMap
shardMapVar Connection
conn e
exception HashSlot
hashSlot = do
let (Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) = Connection
conn
(ShardMap IntMap Shard
shardMap) <- forall a. IO a -> IO a
hasLocked forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
readMVar MVar ShardMap
shardMapVar
Node
node <-
case forall a. Port -> IntMap a -> Maybe a
IntMap.lookup (forall a. Enum a => a -> Port
fromEnum HashSlot
hashSlot) IntMap Shard
shardMap of
Maybe Shard
Nothing -> forall e a. Exception e => e -> IO a
throwIO e
exception
Just (Shard Node
master [Node]
_) -> forall (m :: * -> *) a. Monad m => a -> m a
return Node
master
case forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup (Node -> NodeID
nodeId Node
node) HashMap NodeID NodeConnection
nodeConns of
Maybe NodeConnection
Nothing -> forall e a. Exception e => e -> IO a
throwIO e
exception
Just NodeConnection
nodeConn' -> forall (m :: * -> *) a. Monad m => a -> m a
return NodeConnection
nodeConn'
hashSlotForKeys :: Exception e => e -> [B.ByteString] -> IO HashSlot
hashSlotForKeys :: forall e. Exception e => e -> [NodeID] -> IO HashSlot
hashSlotForKeys e
exception [NodeID]
keys =
case forall a. Eq a => [a] -> [a]
nub (NodeID -> HashSlot
keyToSlot forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [NodeID]
keys) of
[] -> forall (m :: * -> *) a. Monad m => a -> m a
return HashSlot
0
[HashSlot
hashSlot] -> forall (m :: * -> *) a. Monad m => a -> m a
return HashSlot
hashSlot
[HashSlot]
_ -> forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ e
exception
requestKeys :: CMD.InfoMap -> [B.ByteString] -> IO [B.ByteString]
requestKeys :: InfoMap -> [NodeID] -> IO [NodeID]
requestKeys InfoMap
infoMap [NodeID]
request =
case InfoMap -> [NodeID] -> Maybe [NodeID]
CMD.keysForRequest InfoMap
infoMap [NodeID]
request of
Maybe [NodeID]
Nothing -> forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ [NodeID] -> UnsupportedClusterCommandException
UnsupportedClusterCommandException [NodeID]
request
Just [NodeID]
k -> forall (m :: * -> *) a. Monad m => a -> m a
return [NodeID]
k
askingRedirection :: Reply -> Maybe (Host, Port)
askingRedirection :: Reply -> Maybe (Host, Port)
askingRedirection (Error NodeID
errString) = case NodeID -> [NodeID]
Char8.words NodeID
errString of
[NodeID
"ASK", NodeID
_, NodeID
hostport] -> case Char -> NodeID -> [NodeID]
Char8.split Char
':' NodeID
hostport of
[NodeID
host, NodeID
portString] -> case NodeID -> Maybe (Port, NodeID)
Char8.readInt NodeID
portString of
Just (Port
port,NodeID
"") -> forall a. a -> Maybe a
Just (NodeID -> Host
Char8.unpack NodeID
host, Port
port)
Maybe (Port, NodeID)
_ -> forall a. Maybe a
Nothing
[NodeID]
_ -> forall a. Maybe a
Nothing
[NodeID]
_ -> forall a. Maybe a
Nothing
askingRedirection Reply
_ = forall a. Maybe a
Nothing
moved :: Reply -> Bool
moved :: Reply -> Bool
moved (Error NodeID
errString) = case NodeID -> [NodeID]
Char8.words NodeID
errString of
NodeID
"MOVED":[NodeID]
_ -> Bool
True
[NodeID]
_ -> Bool
False
moved Reply
_ = Bool
False
nodeConnWithHostAndPort :: ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort :: ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort ShardMap
shardMap (Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) Host
host Port
port = do
Node
node <- ShardMap -> Host -> Port -> Maybe Node
nodeWithHostAndPort ShardMap
shardMap Host
host Port
port
forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup (Node -> NodeID
nodeId Node
node) HashMap NodeID NodeConnection
nodeConns
nodeConnectionForCommand :: Connection -> ShardMap -> [B.ByteString] -> IO [NodeConnection]
nodeConnectionForCommand :: Connection -> ShardMap -> [NodeID] -> IO [NodeConnection]
nodeConnectionForCommand conn :: Connection
conn@(Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
infoMap) (ShardMap IntMap Shard
shardMap) [NodeID]
request =
case [NodeID]
request of
(NodeID
"FLUSHALL" : [NodeID]
_) -> IO [NodeConnection]
allNodes
(NodeID
"FLUSHDB" : [NodeID]
_) -> IO [NodeConnection]
allNodes
(NodeID
"QUIT" : [NodeID]
_) -> IO [NodeConnection]
allNodes
(NodeID
"UNWATCH" : [NodeID]
_) -> IO [NodeConnection]
allNodes
[NodeID]
_ -> do
[NodeID]
keys <- InfoMap -> [NodeID] -> IO [NodeID]
requestKeys InfoMap
infoMap [NodeID]
request
HashSlot
hashSlot <- forall e. Exception e => e -> [NodeID] -> IO HashSlot
hashSlotForKeys ([[NodeID]] -> CrossSlotException
CrossSlotException [[NodeID]
request]) [NodeID]
keys
Node
node <- case forall a. Port -> IntMap a -> Maybe a
IntMap.lookup (forall a. Enum a => a -> Port
fromEnum HashSlot
hashSlot) IntMap Shard
shardMap of
Maybe Shard
Nothing -> forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ [NodeID] -> MissingNodeException
MissingNodeException [NodeID]
request
Just (Shard Node
master [Node]
_) -> forall (m :: * -> *) a. Monad m => a -> m a
return Node
master
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ [NodeID] -> MissingNodeException
MissingNodeException [NodeID]
request) (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => a -> m a
return) (forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup (Node -> NodeID
nodeId Node
node) HashMap NodeID NodeConnection
nodeConns)
where
allNodes :: IO [NodeConnection]
allNodes =
case Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes Connection
conn (IntMap Shard -> ShardMap
ShardMap IntMap Shard
shardMap) of
Maybe [NodeConnection]
Nothing -> forall e a. Exception e => e -> IO a
throwIO forall a b. (a -> b) -> a -> b
$ [NodeID] -> MissingNodeException
MissingNodeException [NodeID]
request
Just [NodeConnection]
allNodes' -> forall (m :: * -> *) a. Monad m => a -> m a
return [NodeConnection]
allNodes'
allMasterNodes :: Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes :: Connection -> ShardMap -> Maybe [NodeConnection]
allMasterNodes (Connection HashMap NodeID NodeConnection
nodeConns MVar Pipeline
_ MVar ShardMap
_ InfoMap
_) (ShardMap IntMap Shard
shardMap) =
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (forall a b c. (a -> b -> c) -> b -> a -> c
flip forall k v. (Eq k, Hashable k) => k -> HashMap k v -> Maybe v
HM.lookup HashMap NodeID NodeConnection
nodeConns forall b c a. (b -> c) -> (a -> b) -> a -> c
. Node -> NodeID
nodeId) [Node]
masterNodes
where
masterNodes :: [Node]
masterNodes = (\(Shard Node
master [Node]
_) -> Node
master) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. Eq a => [a] -> [a]
nub (forall a. IntMap a -> [a]
IntMap.elems IntMap Shard
shardMap)
requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode :: NodeConnection -> [[NodeID]] -> IO [Reply]
requestNode (NodeConnection ConnectionContext
ctx IORef (Maybe NodeID)
lastRecvRef NodeID
_) [[NodeID]]
requests = do
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (NodeID -> IO ()
sendNode forall b c a. (b -> c) -> (a -> b) -> a -> c
. [NodeID] -> NodeID
renderRequest) [[NodeID]]
requests
()
_ <- ConnectionContext -> IO ()
CC.flush ConnectionContext
ctx
forall (m :: * -> *) a. Applicative m => Port -> m a -> m [a]
replicateM (forall (t :: * -> *) a. Foldable t => t a -> Port
length [[NodeID]]
requests) IO Reply
recvNode
where
sendNode :: B.ByteString -> IO ()
sendNode :: NodeID -> IO ()
sendNode = ConnectionContext -> NodeID -> IO ()
CC.send ConnectionContext
ctx
recvNode :: IO Reply
recvNode :: IO Reply
recvNode = do
Maybe NodeID
maybeLastRecv <- forall a. IORef a -> IO a
IOR.readIORef IORef (Maybe NodeID)
lastRecvRef
Result Reply
scanResult <- case Maybe NodeID
maybeLastRecv of
Just NodeID
lastRecv -> forall (m :: * -> *) a.
Monad m =>
m NodeID -> Scanner a -> NodeID -> m (Result a)
Scanner.scanWith (ConnectionContext -> IO NodeID
CC.recv ConnectionContext
ctx) Scanner Reply
reply NodeID
lastRecv
Maybe NodeID
Nothing -> forall (m :: * -> *) a.
Monad m =>
m NodeID -> Scanner a -> NodeID -> m (Result a)
Scanner.scanWith (ConnectionContext -> IO NodeID
CC.recv ConnectionContext
ctx) Scanner Reply
reply NodeID
B.empty
case Result Reply
scanResult of
Scanner.Fail{} -> forall a. IO a
CC.errConnClosed
Scanner.More{} -> forall a. HasCallStack => Host -> a
error Host
"Hedis: parseWith returned Partial"
Scanner.Done NodeID
rest' Reply
r -> do
forall a. IORef a -> a -> IO ()
IOR.writeIORef IORef (Maybe NodeID)
lastRecvRef (forall a. a -> Maybe a
Just NodeID
rest')
forall (m :: * -> *) a. Monad m => a -> m a
return Reply
r
nodes :: ShardMap -> [Node]
nodes :: ShardMap -> [Node]
nodes (ShardMap IntMap Shard
shardMap) = forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap forall a b. (a, b) -> b
snd forall a b. (a -> b) -> a -> b
$ forall a. IntMap a -> [(Port, a)]
IntMap.toList forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Shard -> [Node]
shardNodes IntMap Shard
shardMap where
shardNodes :: Shard -> [Node]
shardNodes :: Shard -> [Node]
shardNodes (Shard Node
master [Node]
slaves) = Node
masterforall a. a -> [a] -> [a]
:[Node]
slaves
nodeWithHostAndPort :: ShardMap -> Host -> Port -> Maybe Node
nodeWithHostAndPort :: ShardMap -> Host -> Port -> Maybe Node
nodeWithHostAndPort ShardMap
shardMap Host
host Port
port = forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\(Node NodeID
_ NodeRole
_ Host
nodeHost Port
nodePort) -> Port
port forall a. Eq a => a -> a -> Bool
== Port
nodePort Bool -> Bool -> Bool
&& Host
host forall a. Eq a => a -> a -> Bool
== Host
nodeHost) (ShardMap -> [Node]
nodes ShardMap
shardMap)
nodeId :: Node -> NodeID
nodeId :: Node -> NodeID
nodeId (Node NodeID
theId NodeRole
_ Host
_ Port
_) = NodeID
theId
hasLocked :: IO a -> IO a
hasLocked :: forall a. IO a -> IO a
hasLocked IO a
action =
IO a
action forall a. IO a -> [Handler a] -> IO a
`catches`
[ forall a e. Exception e => (e -> IO a) -> Handler a
Handler forall a b. (a -> b) -> a -> b
$ \exc :: BlockedIndefinitelyOnMVar
exc@BlockedIndefinitelyOnMVar
BlockedIndefinitelyOnMVar -> forall e a. Exception e => e -> IO a
throwIO BlockedIndefinitelyOnMVar
exc
]