{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Database.EventStore.Internal.Discovery
( Discovery(..)
, GossipSeed
, DnsDiscoveryException(..)
, ClusterSettings(..)
, DnsServer(..)
, EndPoint(..)
, staticEndPointDiscovery
, clusterDnsEndPointDiscovery
, gossipSeedClusterSettings
, simpleDnsEndPointDiscovery
, dnsClusterSettings
, gossipSeed
, gossipSeedWithHeader
, gossipSeedHeader
, gossipSeedHost
, gossipSeedPort
) where
import Prelude (String)
import Data.Maybe
import Control.Exception.Safe (tryAny)
import Data.Aeson
import Data.Aeson.Types
import Data.Array.IO
import Data.DotNet.TimeSpan
import Data.List.NonEmpty (NonEmpty)
import Data.UUID
import Network.HTTP.Client
import Network.DNS hiding (decode)
import System.Random
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
-- | Errors reported by the discovery strategies defined in this module.
data DnsDiscoveryException
    = MaxDiscoveryAttemptReached ByteString
      -- ^ Carries a 'ByteString' payload. NOTE(review): this constructor is
      --   not raised anywhere in this module; presumably the payload is the
      --   cluster DNS name — confirm against the call sites.
    | DNSDiscoveryError DNSError
      -- ^ Wraps the 'DNSError' returned by a failed A-record lookup.
    deriving (Show, Typeable)

instance Exception DnsDiscoveryException
-- | Builds a plain-HTTP 'Request' for @path@ on the given endpoint.
--
-- The URL has the shape @http:\/\/\<ip\>:\<port\>\<path\>@, so @path@ is
-- expected to carry its own leading slash. Parsing is delegated to
-- 'parseUrlThrow'.
httpRequest :: EndPoint -> String -> IO Request
httpRequest endpoint path = parseUrlThrow (authority <> path)
  where
    authority =
        "http://" <> endPointIp endpoint <> ":" <> show (endPointPort endpoint)
-- | A node that can be queried for cluster gossip.
data GossipSeed =
    GossipSeed
    { gossipEndpoint :: !EndPoint
      -- ^ Endpoint the gossip HTTP request is sent to.
    , gossipSeedHeader :: !String
      -- ^ Header string attached to the seed. NOTE(review): it is not used
      --   when building the gossip request in this module — confirm the
      --   intended use (presumably an HTTP Host header).
    } deriving Show
-- | Creates a 'GossipSeed' from a host and a port, with an empty header.
gossipSeed :: String -> Int -> GossipSeed
gossipSeed host port =
    GossipSeed
    { gossipEndpoint = EndPoint host port
    , gossipSeedHeader = ""
    }
-- | Creates a 'GossipSeed' from a host, a port and an explicit header value.
gossipSeedWithHeader :: String -> Int -> String -> GossipSeed
gossipSeedWithHeader host port header =
    GossipSeed
    { gossipEndpoint = EndPoint host port
    , gossipSeedHeader = header
    }
-- | Host part of the seed's endpoint.
gossipSeedHost :: GossipSeed -> String
gossipSeedHost seed = endPointIp (gossipEndpoint seed)
-- | Port part of the seed's endpoint.
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort seed = endPointPort (gossipEndpoint seed)
-- | Placeholder seed built from 'emptyEndPoint' and an empty header; used as
--   the initial filler value when allocating candidate arrays.
emptyGossipSeed :: GossipSeed
emptyGossipSeed =
    GossipSeed
    { gossipEndpoint = emptyEndPoint
    , gossipSeedHeader = ""
    }
-- | A strategy for locating a node to connect to.
--
-- 'runDiscovery' takes an optional endpoint and yields the endpoint to use,
-- or 'Nothing' when no suitable node could be found. The cluster strategy
-- uses the argument to exclude that endpoint from the gossip candidates;
-- presumably it is the endpoint of the previously failed connection —
-- TODO confirm against the caller.
newtype Discovery =
    Discovery { runDiscovery :: Maybe EndPoint -> EventStore (Maybe EndPoint) }
-- | Discovery that always answers with the same, fixed endpoint and ignores
--   the endpoint passed to it.
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery host port = Discovery (\_ -> return (Just fixed))
  where
    fixed = EndPoint host port
-- | Discovery that resolves @domain@ through an A-record lookup and answers
--   with the first returned address, paired with @port@.
--
-- When @srv@ is 'Nothing' the default resolver configuration is used;
-- otherwise the given 'DnsServer' selects where the resolver gets its name
-- server from. A failed lookup is rethrown as 'DNSDiscoveryError'; a lookup
-- that succeeds with no addresses yields 'Nothing'.
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery domain srv port = Discovery $ \_ -> do
    dnsSeed <- liftIO $ makeResolvSeed dnsConf
    answer <- liftIO $ withResolver dnsSeed $ \resolver ->
        lookupA resolver domain
    case answer of
        Left e -> throwIO $ DNSDiscoveryError e
        Right ips ->
            return $ listToMaybe [ EndPoint (show ip) port | ip <- ips ]
  where
    dnsConf =
        case srv of
            Nothing -> defaultResolvConf
            Just server -> defaultResolvConf { resolvInfo = toResolvInfo server }

    toResolvInfo server =
        case server of
            DnsFilePath p -> RCFilePath p
            DnsHostName h -> RCHostName h
            DnsHostPort h p -> RCHostPort h (fromIntegral p)
-- | Tells the DNS resolver where to find its name server. Each constructor
--   is translated to the matching resolver setting ('RCFilePath',
--   'RCHostName' or 'RCHostPort') when the resolver configuration is built.
data DnsServer
    = DnsFilePath String
      -- ^ Path handed to 'RCFilePath'.
    | DnsHostName String
      -- ^ Host name handed to 'RCHostName'.
    | DnsHostPort String Int
      -- ^ Host name and port handed to 'RCHostPort'.
-- | Settings driving gossip-based cluster discovery.
data ClusterSettings =
    ClusterSettings
    { clusterDns :: !ByteString
      -- ^ DNS name resolved (A records) to find gossip candidates when no
      --   explicit seeds are given.
    , clusterMaxDiscoverAttempts :: !Int
      -- ^ Used as the retry count of the DNS resolver.
    , clusterExternalGossipPort :: !Int
      -- ^ Port paired with every address obtained from the DNS lookup.
    , clusterGossipSeeds :: (Maybe (NonEmpty GossipSeed))
      -- ^ Explicit gossip seeds. When present they are used instead of the
      --   DNS lookup.
    , clusterGossipTimeout :: !TimeSpan
      -- ^ Timeout applied to the gossip HTTP response and to the DNS
      --   resolver.
    , clusterDnsServer :: !(Maybe DnsServer)
      -- ^ Optional DNS server override; 'Nothing' keeps the default resolver
      --   configuration.
    }
-- | Default 'ClusterSettings' for discovery driven by explicit gossip seeds:
--   empty DNS name, 10 discovery attempts, gossip port 0, a one second
--   gossip timeout and no DNS server override.
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings seeds =
    ClusterSettings
    { clusterDns                 = ""
    , clusterMaxDiscoverAttempts = 10
    , clusterExternalGossipPort  = 0
    , clusterGossipSeeds         = Just seeds
    , clusterGossipTimeout       = fromSeconds 1
    , clusterDnsServer           = Nothing
    }
-- | Default 'ClusterSettings' for discovery driven by a cluster DNS name:
--   10 discovery attempts, gossip port 0, no explicit seeds, a one second
--   gossip timeout and no DNS server override.
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings domain =
    ClusterSettings
    { clusterDns                 = domain
    , clusterMaxDiscoverAttempts = 10
    , clusterExternalGossipPort  = 0
    , clusterGossipSeeds         = Nothing
    , clusterGossipTimeout       = fromSeconds 1
    , clusterDnsServer           = Nothing
    }
-- | Creates a gossip-based 'Discovery'.
--
-- The HTTP manager and the cache holding the member list of the last
-- successful gossip are allocated once, here, and shared by every run of
-- the returned discovery.
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery settings = do
    lastGossip <- newIORef Nothing
    mgr <- newManager defaultManagerSettings
    let discover fend = discoverEndPoint mgr lastGossip fend settings
    return (Discovery discover)
-- | State of a cluster node as reported by gossip.
--
-- The constructor order is significant: the derived 'Ord' instance follows
-- it, and node selection sorts candidates by this ordering (descending), so
-- later constructors are preferred over earlier ones.
data VNodeState
    = Initializing
    | Unknown
    | PreReplica
    | CatchingUp
    | Clone
    | Slave
    | PreMaster
    | Master
    | Manager
    | ShuttingDown
    | Shutdown
    deriving (Eq, Ord, Generic, Show)

-- Decoder derived through 'Generic'.
instance FromJSON VNodeState
-- | 'UUID' wrapper with a JSON decoder that reads the textual form.
newtype GUUID = GUUID UUID deriving Show

-- Accepts only JSON strings that 'fromText' can parse; any other JSON value
-- is reported as a type mismatch.
instance FromJSON GUUID where
    parseJSON value =
        case value of
            String txt ->
                maybe (fail $ "Wrong UUID format " <> show txt)
                      (return . GUUID)
                      (fromText txt)
            _ -> typeMismatch "UUID" value
-- | One cluster member as described by the gossip JSON document. Every
--   field is decoded from the JSON key of the same name without the leading
--   underscore.
data MemberInfo =
    MemberInfo
    { _instanceId :: !GUUID
    , _state :: !VNodeState
      -- ^ Node state; used to filter and rank candidates.
    , _isAlive :: !Bool
      -- ^ Only members with this flag set are eligible as best node.
    , _internalTcpIp :: !String
    , _internalTcpPort :: !Int
    , _externalTcpIp :: !String
      -- ^ Together with '_externalTcpPort', the endpoint returned to the
      --   caller when this member is selected.
    , _externalTcpPort :: !Int
    , _internalHttpIp :: !String
    , _internalHttpPort :: !Int
    , _externalHttpIp :: !String
      -- ^ Together with '_externalHttpPort', the endpoint used when this
      --   member is queried for gossip.
    , _externalHttpPort :: !Int
    , _lastCommitPosition :: !Int64
    , _writerCheckpoint :: !Int64
    , _chaserCheckpoint :: !Int64
    , _epochPosition :: !Int64
    , _epochNumber :: !Int
    , _epochId :: !GUUID
    , _nodePriority :: !Int
    } deriving Show
-- Decodes a member from a JSON object; all keys are mandatory and are read
-- in declaration order. Non-object values are reported as a type mismatch.
instance FromJSON MemberInfo where
    parseJSON (Object m) = do
        _instanceId         <- m .: "instanceId"
        _state              <- m .: "state"
        _isAlive            <- m .: "isAlive"
        _internalTcpIp      <- m .: "internalTcpIp"
        _internalTcpPort    <- m .: "internalTcpPort"
        _externalTcpIp      <- m .: "externalTcpIp"
        _externalTcpPort    <- m .: "externalTcpPort"
        _internalHttpIp     <- m .: "internalHttpIp"
        _internalHttpPort   <- m .: "internalHttpPort"
        _externalHttpIp     <- m .: "externalHttpIp"
        _externalHttpPort   <- m .: "externalHttpPort"
        _lastCommitPosition <- m .: "lastCommitPosition"
        _writerCheckpoint   <- m .: "writerCheckpoint"
        _chaserCheckpoint   <- m .: "chaserCheckpoint"
        _epochPosition      <- m .: "epochPosition"
        _epochNumber        <- m .: "epochNumber"
        _epochId            <- m .: "epochId"
        _nodePriority       <- m .: "nodePriority"
        -- RecordWildCards picks up the local bindings above.
        return MemberInfo{..}
    parseJSON invalid = typeMismatch "MemberInfo" invalid
-- | Top-level gossip document: the list of known cluster members.
data ClusterInfo =
    ClusterInfo { members :: [MemberInfo] }
    deriving (Show, Generic)

-- Decoder derived through 'Generic'.
instance FromJSON ClusterInfo
-- | Runs one round of gossip-based discovery.
--
-- The cached member list is read and then cleared. If a list was cached,
-- gossip candidates come from it (minus @fend@); otherwise they come from
-- the settings (explicit seeds or DNS). Candidates are probed in array
-- order. The first one whose gossip yields an eligible node wins: its
-- member list is cached for the next round and the chosen endpoint is
-- returned. 'Nothing' means no candidate produced a usable answer.
discoverEndPoint :: Manager
                 -> IORef (Maybe [MemberInfo])
                 -> Maybe EndPoint
                 -> ClusterSettings
                 -> EventStore (Maybe EndPoint)
discoverEndPoint mgr ref fend settings = do
    cached <- readIORef ref
    writeIORef ref Nothing
    candidates <-
        maybe (gossipCandidatesFromDns settings)
              (liftIO . gossipCandidatesFromOldGossip fend)
              cached
    forArrayFirst candidates $ \idx -> do
        seed <- liftIO $ readArray candidates idx
        gossip <- tryGetGossipFrom settings mgr seed
        case gossip of
            Just info
                | Just best <- tryDetermineBestNode (members info) -> do
                    writeIORef ref (Just $ members info)
                    return $ Just best
            _ -> return Nothing
-- | Asks a single gossip seed for the current cluster information.
--
-- Issues @GET \/gossip?format=json@ against the seed's endpoint, using the
-- configured gossip timeout as the response timeout. Returns 'Nothing' when
-- the request fails or when the response body cannot be decoded as
-- 'ClusterInfo'.
--
-- Request construction happens inside the guarded region on purpose:
-- 'httpRequest' relies on 'parseUrlThrow', so a seed whose host\/port do not
-- form a valid URL would otherwise escape as an exception and abort the
-- whole discovery round instead of being logged and skipped like any other
-- unreachable seed.
tryGetGossipFrom :: ClusterSettings
                 -> Manager
                 -> GossipSeed
                 -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings{..} mgr seed = do
    eithResp <- tryAny $ liftIO $ do
        init_req <- httpRequest (gossipEndpoint seed) "/gossip?format=json"
        let req = init_req { responseTimeout = responseTimeoutMicro timeout }
        httpLbs req mgr
    case eithResp of
        Right resp -> return $ decode $ responseBody resp
        Left err -> do
            $logInfo [i|Failed to get cluster info from [#{seed}], error: #{err}.|]
            pure Nothing
  where
    -- 'responseTimeoutMicro' expects microseconds.
    timeout = truncate (totalMillis clusterGossipTimeout * 1000)
-- | Picks the node to connect to from a gossip member list.
--
-- Only members that are alive and not in the 'Manager', 'ShuttingDown' or
-- 'Shutdown' state are considered. Among those, the one with the greatest
-- 'VNodeState' wins; the sort is stable, so ties go to the member listed
-- first. The result is the winner's external TCP endpoint, or 'Nothing'
-- when no member qualifies.
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode members = toEndPoint <$> listToMaybe ranked
  where
    ranked =
        sortOn (Down . _state)
            [ m | m <- members, _isAlive m, eligible (_state m) ]

    toEndPoint n = EndPoint (_externalTcpIp n) (_externalTcpPort n)

    eligible st =
        case st of
            Manager      -> False
            ShuttingDown -> False
            Shutdown     -> False
            _            -> True
-- | Builds the gossip candidate array from a previously received member
--   list. When an endpoint is supplied, members whose external TCP endpoint
--   equals it are left out.
gossipCandidatesFromOldGossip :: Maybe EndPoint
                              -> [MemberInfo]
                              -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip fend_m oldGossip =
    arrangeGossipCandidates [ m | m <- oldGossip, keep m ]
  where
    keep m =
        case fend_m of
            Nothing -> True
            Just fend ->
                EndPoint (_externalTcpIp m) (_externalTcpPort m) /= fend
-- | Fold accumulator used while arranging gossip candidates. The first
--   field is the highest index written so far in the non-manager region
--   (growing up from the front); the second is the lowest index written so
--   far in the manager region (growing down from the back).
data AState = AState !Int !Int
-- | Lays the members out as gossip seeds in an array: non-manager nodes at
--   the front, manager nodes at the back, each group shuffled independently.
--
-- Each seed points at the member's external HTTP endpoint and carries an
-- empty header.
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates members = do
    -- Array bounds are inclusive, so @len@ members need indices
    -- @0 .. len - 1@. Allocating @(0, len)@ would leave one extra slot that
    -- is never written and keeps 'emptyGossipSeed'; consumers that walk the
    -- full bounds would then probe that placeholder as a real candidate.
    arr <- newArray (0, len - 1) emptyGossipSeed
    AState idx j <- foldM (go arr) (AState (-1) len) members
    shuffle arr 0 idx        -- non-manager nodes occupy 0 .. idx
    shuffle arr j (len - 1)  -- manager nodes occupy j .. len - 1
    return arr
  where
    len = length members

    -- Writes one member into the region matching its state and advances the
    -- corresponding cursor.
    go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
    go arr (AState idx j) m =
        case _state m of
            Manager -> do
                let new_j = j - 1
                writeArray arr new_j seed
                return (AState idx new_j)
            _ -> do
                let new_i = idx + 1
                writeArray arr new_i seed
                return (AState new_i j)
      where
        end = EndPoint (_externalHttpIp m) (_externalHttpPort m)
        seed = GossipSeed end ""
-- | Builds the initial gossip candidate array from the settings: the
--   explicit gossip seeds when configured, otherwise the addresses obtained
--   by resolving the cluster DNS name. The resulting array is shuffled
--   before being returned.
gossipCandidatesFromDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns settings@ClusterSettings{..} = do
    arr <-
        case clusterGossipSeeds of
            Nothing -> resolveDns settings
            Just seeds -> do
                let ls = toList seeds
                liftIO $ newListArray (0, length ls - 1) ls
    liftIO $ shuffleAll arr
    return arr
-- | Resolves the cluster DNS name (A records) into an array of gossip
--   seeds, one per returned address, each paired with the configured
--   external gossip port and an empty header.
--
-- The resolver uses the gossip timeout (converted to microseconds) and the
-- maximum number of discovery attempts as its retry count. A failed lookup
-- is rethrown as 'DNSDiscoveryError'.
resolveDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns ClusterSettings{..} = do
    dnsSeed <- liftIO $ makeResolvSeed tunedConf
    liftIO $ withResolver dnsSeed $ \resolver -> do
        answer <- lookupA resolver clusterDns
        case answer of
            Left e -> throwIO $ DNSDiscoveryError e
            Right ips ->
                newListArray (0, length ips - 1)
                    [ GossipSeed (EndPoint (show ip) clusterExternalGossipPort) ""
                    | ip <- ips
                    ]
  where
    timeoutMicros = totalMillis clusterGossipTimeout * 1000

    baseConf =
        case clusterDnsServer of
            Nothing -> defaultResolvConf
            Just server -> defaultResolvConf { resolvInfo = toResolvInfo server }

    tunedConf =
        baseConf
        { resolvTimeout = truncate timeoutMicros
        , resolvRetry = clusterMaxDiscoverAttempts
        }

    toResolvInfo server =
        case server of
            DnsFilePath p -> RCFilePath p
            DnsHostName h -> RCHostName h
            DnsHostPort h p -> RCHostPort h (fromIntegral p)
-- | Shuffles every element of the array in place, across its full bounds.
shuffleAll :: IOArray Int a -> IO ()
shuffleAll arr = do
    (firstIx, lastIx) <- getBounds arr
    shuffle arr firstIx lastIx
-- | Shuffles, in place, the slice of the array between indices @from@ and
--   @to@ (both inclusive). Each position is swapped with a randomly chosen
--   position between itself and @to@. An empty range (@from > to@) is a
--   no-op.
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle arr from to = forRange_ from to swapWithRandom
  where
    swapWithRandom cur = do
        pick <- randomRIO (cur, to)
        picked <- readArray arr pick
        current <- readArray arr cur
        writeArray arr pick current
        writeArray arr cur picked
-- | Runs the action for every index from @from@ to @to@ (both inclusive),
--   in ascending order. Does nothing when @from > to@.
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ from to k = forM_ [from .. to] k
-- | Applies the action to each index of the array, in ascending order
--   across its bounds, and stops at the first 'Just' result. Yields
--   'Nothing' when every index produced 'Nothing' or the array is empty.
forArrayFirst :: IOArray Int a
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forArrayFirst arr k = do
    (firstIx, lastIx) <- liftIO (getBounds arr)
    forRangeFirst firstIx lastIx k
-- | Applies the action to every index from @from@ to @to@ (both inclusive),
--   in ascending order, and stops at the first 'Just' result. Yields
--   'Nothing' when the range is empty or no index produced a 'Just'.
forRangeFirst :: Int
              -> Int
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forRangeFirst from to k
    | from > to = return Nothing
    | otherwise = go from
  where
    -- One past the last index to visit.
    stop = to + 1

    go cur
        | cur == stop = return Nothing
        | otherwise = do
            found <- k cur
            case found of
                Just _ -> return found
                Nothing -> go (cur + 1)