{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE RecordWildCards            #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Discovery
-- Copyright : (C) 2016 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Discovery
    ( Discovery(..)
    , GossipSeed
    , DnsDiscoveryException(..)
    , ClusterSettings(..)
    , DnsServer(..)
    , EndPoint(..)
    , staticEndPointDiscovery
    , clusterDnsEndPointDiscovery
    , gossipSeedClusterSettings
    , simpleDnsEndPointDiscovery
    , dnsClusterSettings
    , gossipSeed
    , gossipSeedWithHeader
    , gossipSeedHeader
    , gossipSeedHost
    , gossipSeedPort
    ) where

--------------------------------------------------------------------------------
import Prelude (String)
import Data.Maybe

--------------------------------------------------------------------------------
import Data.Aeson
import Data.Aeson.Types
import Data.Array.IO
import Data.DotNet.TimeSpan
import Data.List.NonEmpty (NonEmpty)
import Data.UUID
import Network.HTTP.Client
import Network.DNS hiding (decode)
import System.Random

--------------------------------------------------------------------------------
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Prelude

--------------------------------------------------------------------------------
-- | Exceptions that discovery can raise.
data DnsDiscoveryException
    = MaxDiscoveryAttemptReached ByteString
      -- ^ NOTE(review): not raised by any code in this module; the payload
      --   is presumably the cluster DNS name - confirm against callers.
    | DNSDiscoveryError DNSError
      -- ^ Wraps the 'DNSError' returned by a failed DNS lookup.
    deriving (Show, Typeable)

--------------------------------------------------------------------------------
instance Exception DnsDiscoveryException

--------------------------------------------------------------------------------
-- | Builds an HTTP 'Request' for the given path on an 'EndPoint'. The URL is
--   assembled as @http:\/\/\<ip\>:\<port\>\<path\>@ and handed to
--   'parseUrlThrow'.
httpRequest :: EndPoint -> String -> IO Request
httpRequest endpoint path =
    parseUrlThrow $ "http://" <> host <> ":" <> show port <> path
  where
    host = endPointIp endpoint
    port = endPointPort endpoint

--------------------------------------------------------------------------------
-- | Represents a source of cluster gossip.
data GossipSeed =
    GossipSeed
    { gossipEndpoint :: !EndPoint
      -- ^ The endpoint for the external HTTP endpoint of the gossip seed. The
      --   HTTP endpoint is used rather than the TCP endpoint because it is
      --   required for the client to exchange gossip with the server. The
      --   standard port which should be used here is 2113.
    , gossipSeedHeader :: !String
      -- ^ The host header to be sent when requesting gossip.
    } deriving Show

--------------------------------------------------------------------------------
-- | Creates a 'GossipSeed' from a host and a port, with an empty host header.
gossipSeed :: String -> Int -> GossipSeed
gossipSeed host port = gossipSeedWithHeader host port ""

--------------------------------------------------------------------------------
-- | Creates a 'GossipSeed' with a specific HTTP header.
gossipSeedWithHeader :: String -> Int -> String -> GossipSeed
gossipSeedWithHeader host port header =
    GossipSeed
    { gossipEndpoint   = EndPoint host port
    , gossipSeedHeader = header
    }

--------------------------------------------------------------------------------
-- | Returns 'GossipSeed' host IP address.
gossipSeedHost :: GossipSeed -> String
gossipSeedHost seed = endPointIp (gossipEndpoint seed)

--------------------------------------------------------------------------------
-- | Returns 'GossipSeed' port.
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort seed = endPointPort (gossipEndpoint seed)

--------------------------------------------------------------------------------
-- Placeholder seed built from 'emptyEndPoint' and an empty header; used as
-- the initial fill value when a candidate array is allocated.
emptyGossipSeed :: GossipSeed
emptyGossipSeed =
    GossipSeed
    { gossipEndpoint   = emptyEndPoint
    , gossipSeedHeader = ""
    }

--------------------------------------------------------------------------------
-- | Procedure used to discover a network 'EndPoint'.
newtype Discovery =
    Discovery
    { runDiscovery :: Maybe EndPoint -> IO (Maybe EndPoint)
      -- ^ Runs the discovery. Takes an optional 'EndPoint' and yields the
      --   discovered 'EndPoint', or 'Nothing' when none could be found.
    }

--------------------------------------------------------------------------------
-- | A 'Discovery' that always answers with the given host and port, ignoring
--   its argument.
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery host port = Discovery $ \_ -> return (Just endpoint)
  where
    endpoint = EndPoint host port

--------------------------------------------------------------------------------
-- | A 'Discovery' that resolves the A records of a domain on every run and
--   answers with the first address found, paired with the given port. Yields
--   'Nothing' when the lookup succeeds with no address, and throws
--   'DNSDiscoveryError' when the lookup fails.
simpleDnsEndPointDiscovery
    :: ByteString      -- ^ Domain to resolve.
    -> Maybe DnsServer -- ^ Specific DNS server to contact, if any.
    -> Int             -- ^ Port attached to the resolved address.
    -> Discovery
simpleDnsEndPointDiscovery domain srv port = Discovery $ \_ -> do
    dnsSeed <- makeResolvSeed resolvConf
    outcome <- withResolver dnsSeed $ \resolver -> lookupA resolver domain
    case outcome of
        Left err       -> throwIO $ DNSDiscoveryError err
        Right []       -> return Nothing
        Right (ip : _) -> return $ Just $ EndPoint (show ip) port
  where
    -- Default resolver configuration unless a specific server was requested.
    resolvConf =
        case srv of
            Nothing  -> defaultResolvConf
            Just tpe -> defaultResolvConf { resolvInfo = toResolvInfo tpe }

    toResolvInfo (DnsFilePath p)   = RCFilePath p
    toResolvInfo (DnsHostName h)   = RCHostName h
    toResolvInfo (DnsHostPort h p) = RCHostPort h (fromIntegral p)

--------------------------------------------------------------------------------
-- | Tells how the DNS server should be contacted.
data DnsServer
    = DnsFilePath String
    | DnsHostName String
    | DnsHostPort String Int

--------------------------------------------------------------------------------
-- | Contains settings related to a connection to a cluster.
data ClusterSettings =
    ClusterSettings
    { clusterDns :: !ByteString
      -- ^ The DNS name to use for discovering endpoints.
    , clusterMaxDiscoverAttempts :: !Int
      -- ^ The maximum number of attempts for discovering endpoints.
    , clusterExternalGossipPort :: !Int
      -- ^ The well-known endpoint on which cluster managers are running.
    , clusterGossipSeeds :: (Maybe (NonEmpty GossipSeed))
      -- ^ Endpoints for seeding gossip if not using DNS.
    , clusterGossipTimeout :: !TimeSpan
      -- ^ Timeout for cluster gossip.
    , clusterDnsServer :: !(Maybe DnsServer)
      -- ^ Indicates a specific DNS server should be contacted.
    }

--------------------------------------------------------------------------------
-- | Configures a 'ClusterSettings' for connecting to a cluster using gossip
--   seeds.
--
--   * clusterDns                 = \"\"
--   * clusterMaxDiscoverAttempts = 10
--   * clusterExternalGossipPort  = 0
--   * clusterGossipTimeout       = 1s
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings xs =
    -- Same defaults as the DNS flavour (with an empty DNS name); only the
    -- seeds differ.
    (dnsClusterSettings "") { clusterGossipSeeds = Just xs }

--------------------------------------------------------------------------------
-- | Configures a 'ClusterSettings' for connecting to a cluster using DNS
--   discovery.
--
--   * clusterMaxDiscoverAttempts = 10
--   * clusterExternalGossipPort  = 0
--   * clusterGossipSeeds         = Nothing
--   * clusterGossipTimeout       = 1s
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings dns =
    ClusterSettings
    { clusterDns                 = dns
    , clusterMaxDiscoverAttempts = 10
    , clusterExternalGossipPort  = 0
    , clusterGossipSeeds         = Nothing
    , clusterGossipTimeout       = fromSeconds 1
    , clusterDnsServer           = Nothing
    }

--------------------------------------------------------------------------------
-- | Creates a gossip-based 'Discovery' for a cluster. A fresh HTTP manager
--   and a reference holding the last gossip seen (initially none) are
--   allocated once and shared by every run of the returned 'Discovery'.
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery settings = do
    ref     <- newIORef Nothing
    manager <- newManager defaultManagerSettings
    let discover fend = discoverEndPoint manager ref fend settings
    return (Discovery discover)

--------------------------------------------------------------------------------
-- State of a cluster node as reported by gossip. The constructor order
-- matters: the derived 'Ord' instance follows it.
data VNodeState
    = Initializing
    | Unknown
    | PreReplica
    | CatchingUp
    | Clone
    | Slave
    | PreMaster
    | Master
    | Manager
    | ShuttingDown
    | Shutdown
    deriving (Eq, Ord, Generic, Show)

--------------------------------------------------------------------------------
instance FromJSON VNodeState

--------------------------------------------------------------------------------
-- Wrapper around 'UUID' carrying its own JSON decoding.
newtype GUUID = GUUID UUID deriving Show
--------------------------------------------------------------------------------
-- Decodes a 'GUUID' from a JSON string holding a textual UUID.
instance FromJSON GUUID where
    parseJSON (String txt) =
        case fromText txt of
            Just uuid -> return $ GUUID uuid
            _         -> fail $ "Wrong UUID format " <> show txt
    parseJSON invalid = typeMismatch "UUID" invalid

--------------------------------------------------------------------------------
-- One cluster member as described by a gossip response. Each field is decoded
-- from the JSON key of the same name (without the leading underscore).
data MemberInfo =
    MemberInfo
    { _instanceId         :: !GUUID
    , _state              :: !VNodeState
    , _isAlive            :: !Bool
    , _internalTcpIp      :: !String
    , _internalTcpPort    :: !Int
    , _externalTcpIp      :: !String
    , _externalTcpPort    :: !Int
    , _internalHttpIp     :: !String
    , _internalHttpPort   :: !Int
    , _externalHttpIp     :: !String
    , _externalHttpPort   :: !Int
    , _lastCommitPosition :: !Int64
    , _writerCheckpoint   :: !Int64
    , _chaserCheckpoint   :: !Int64
    , _epochPosition      :: !Int64
    , _epochNumber        :: !Int
    , _epochId            :: !GUUID
    , _nodePriority       :: !Int
    } deriving Show

--------------------------------------------------------------------------------
instance FromJSON MemberInfo where
    parseJSON (Object m) =
        MemberInfo
            <$> m .: "instanceId"
            <*> m .: "state"
            <*> m .: "isAlive"
            <*> m .: "internalTcpIp"
            <*> m .: "internalTcpPort"
            <*> m .: "externalTcpIp"
            <*> m .: "externalTcpPort"
            <*> m .: "internalHttpIp"
            <*> m .: "internalHttpPort"
            <*> m .: "externalHttpIp"
            <*> m .: "externalHttpPort"
            <*> m .: "lastCommitPosition"
            <*> m .: "writerCheckpoint"
            <*> m .: "chaserCheckpoint"
            <*> m .: "epochPosition"
            <*> m .: "epochNumber"
            <*> m .: "epochId"
            <*> m .: "nodePriority"
    parseJSON invalid = typeMismatch "MemberInfo" invalid

--------------------------------------------------------------------------------
-- Payload of a gossip response: the list of known cluster members.
data ClusterInfo =
    ClusterInfo { members :: [MemberInfo] }
    deriving (Show, Generic)

--------------------------------------------------------------------------------
instance FromJSON ClusterInfo

--------------------------------------------------------------------------------
-- Runs one discovery round.
--
-- The previously stored gossip (if any) is taken out of the reference, which
-- is reset to 'Nothing'. Candidates come from that old gossip when available,
-- otherwise from the configured seeds / DNS. Candidates are queried in array
-- order until one returns gossip from which a best node can be determined;
-- that gossip is then stored in the reference and the node is returned.
-- Yields 'Nothing' when no candidate produced a usable answer.
discoverEndPoint :: Manager
                 -> IORef (Maybe [MemberInfo])
                 -> Maybe EndPoint
                 -> ClusterSettings
                 -> IO (Maybe EndPoint)
discoverEndPoint mgr ref fend settings = do
    old_m <- readIORef ref
    writeIORef ref Nothing
    candidates <- case old_m of
        Nothing  -> gossipCandidatesFromDns settings
        Just old -> gossipCandidatesFromOldGossip fend old
    forArrayFirst candidates $ \i -> do
        c   <- readArray candidates i
        res <- tryGetGossipFrom settings mgr c
        -- Both steps may fail: no decodable gossip, or no eligible node.
        let fin_end = do
                info <- res
                best <- tryDetermineBestNode $ members info
                return (info, best)
        case fin_end of
            Nothing -> return Nothing
            Just (info, best) -> do
                writeIORef ref (Just $ members info)
                return $ Just best

--------------------------------------------------------------------------------
-- Requests @/gossip?format=json@ from a seed's endpoint, using the configured
-- gossip timeout as response timeout, and decodes the body. Yields 'Nothing'
-- when the body cannot be decoded as a 'ClusterInfo'. No exception handling
-- is performed here: failures of the HTTP call itself propagate to the caller.
tryGetGossipFrom :: ClusterSettings
                 -> Manager
                 -> GossipSeed
                 -> IO (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings{..} mgr seed = do
    init_req <- httpRequest (gossipEndpoint seed) "/gossip?format=json"
    let timeoutMicros = truncate (totalMillis clusterGossipTimeout * 1000)
        req = init_req { responseTimeout = responseTimeoutMicro timeoutMicros }
    resp <- httpLbs req mgr
    return $ decode $ responseBody resp

--------------------------------------------------------------------------------
-- Picks the node to connect to among gossip members: only members that are
-- alive and neither 'Manager', 'ShuttingDown' nor 'Shutdown' are eligible, and
-- the one with the greatest 'VNodeState' wins. The result is that node's
-- external TCP endpoint, or 'Nothing' when no member is eligible.
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode members = node_m
  where
    nodes = [ m | m <- members
                , _isAlive m
                , allowedState $ _state m
                ]

    node_m =
        case sortOn (Down . _state) nodes of
            []  -> Nothing
            n:_ -> Just $ EndPoint (_externalTcpIp n) (_externalTcpPort n)

    allowedState Manager      = False
    allowedState ShuttingDown = False
    allowedState Shutdown     = False
    allowedState _            = True

--------------------------------------------------------------------------------
-- Builds the candidate array from previously received gossip.
--
-- When an endpoint is supplied, the member whose external TCP ip and port
-- both match it is left out of the candidates. NOTE(review): the supplied
-- endpoint is presumably the one the caller just failed against (this mirrors
-- the reference .NET client) - confirm against the callers of 'runDiscovery'.
gossipCandidatesFromOldGossip :: Maybe EndPoint
                              -> [MemberInfo]
                              -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip fend_m oldGossip =
    arrangeGossipCandidates candidates
  where
    candidates =
        case fend_m of
            Nothing   -> oldGossip
            Just fend ->
                [ c | c <- oldGossip
                    , _externalTcpPort c /= endPointPort fend
                      || _externalTcpIp c /= endPointIp fend
                ]

--------------------------------------------------------------------------------
-- Fold state of 'arrangeGossipCandidates': index of the last regular node
-- written from the front, and index of the last manager written from the back.
data AState = AState !Int !Int

--------------------------------------------------------------------------------
-- Lays gossip members out in an array of exactly @length members@ seeds:
-- regular nodes fill it from the front, managers from the back, and each of
-- the two groups is shuffled independently. Seeds point at the members'
-- external HTTP endpoints.
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates members = do
    -- Bounds are inclusive: (0, len - 1) holds one slot per member, so no
    -- placeholder seed is left behind for callers to probe.
    arr <- newArray (0, len - 1) emptyGossipSeed
    AState i j <- foldM (go arr) (AState (-1) len) members

    shuffle arr 0 i         -- shuffle nodes
    shuffle arr j (len - 1) -- shuffle managers

    return arr
  where
    len = length members

    go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
    go arr (AState i j) m =
        case _state m of
            Manager -> do
                let new_j = j - 1
                writeArray arr new_j seed
                return (AState i new_j)
            _ -> do
                let new_i = i + 1
                writeArray arr new_i seed
                return (AState new_i j)
      where
        end  = EndPoint (_externalHttpIp m) (_externalHttpPort m)
        seed = GossipSeed end ""

--------------------------------------------------------------------------------
-- Builds the initial candidate array: the configured gossip seeds when
-- present, otherwise the addresses resolved through DNS. The whole array is
-- shuffled before being returned.
gossipCandidatesFromDns :: ClusterSettings -> IO (IOArray Int GossipSeed)
gossipCandidatesFromDns settings@ClusterSettings{..} = do
    arr <- endpoints
    shuffleAll arr
    return arr
  where
    endpoints =
        case clusterGossipSeeds of
            Nothing -> resolveDns settings
            Just ss ->
                let ls  = toList ss
                    len = length ls
                in newListArray (0, len - 1) ls

--------------------------------------------------------------------------------
-- Resolves the A records of the cluster DNS name and turns every address into
-- a 'GossipSeed' on the configured external gossip port. The resolver uses
-- the gossip timeout (converted to microseconds) as its timeout and the
-- maximum number of discovery attempts as its retry count. Throws
-- 'DNSDiscoveryError' when the lookup fails.
resolveDns :: ClusterSettings -> IO (IOArray Int GossipSeed)
resolveDns ClusterSettings{..} = do
    dnsSeed <- makeResolvSeed resolvConf
    withResolver dnsSeed $ \resolver -> do
        outcome <- lookupA resolver clusterDns
        case outcome of
            Left err  -> throwIO $ DNSDiscoveryError err
            Right ips -> do
                let seeds = [ GossipSeed (toEndPoint ip) "" | ip <- ips ]
                newListArray (0, length seeds - 1) seeds
  where
    timeoutMicros = totalMillis clusterGossipTimeout * 1000

    -- Default resolver configuration unless a specific server was requested.
    baseConf =
        case clusterDnsServer of
            Nothing  -> defaultResolvConf
            Just tpe -> defaultResolvConf { resolvInfo = toResolvInfo tpe }

    resolvConf =
        baseConf
        { resolvTimeout = truncate timeoutMicros
        , resolvRetry   = clusterMaxDiscoverAttempts
        }

    toResolvInfo (DnsFilePath p)   = RCFilePath p
    toResolvInfo (DnsHostName h)   = RCHostName h
    toResolvInfo (DnsHostPort h p) = RCHostPort h (fromIntegral p)

    toEndPoint ip = EndPoint (show ip) clusterExternalGossipPort

--------------------------------------------------------------------------------
-- Shuffles every element of the array, over its full bounds.
shuffleAll :: IOArray Int a -> IO ()
shuffleAll arr = getBounds arr >>= \(lo, hi) -> shuffle arr lo hi

--------------------------------------------------------------------------------
-- Shuffles the inclusive index range @[from, to]@ in place: each position, in
-- increasing order, is swapped with a position drawn at random between itself
-- and @to@. Does nothing when the range is empty.
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle arr from to = forRange_ from to swapWithRandom
  where
    swapWithRandom cur = do
        pick    <- randomRIO (cur, to)
        picked  <- readArray arr pick
        current <- readArray arr cur
        writeArray arr pick current
        writeArray arr cur picked

--------------------------------------------------------------------------------
-- Runs the action for every index of the inclusive range @[from, to]@, in
-- increasing order. Does nothing when @from > to@.
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ from to k = forM_ [from..to] k

--------------------------------------------------------------------------------
-- Runs the action on each index of the array, in increasing order over its
-- bounds, and stops at the first 'Just' result.
forArrayFirst :: IOArray Int a -> (Int -> IO (Maybe b)) -> IO (Maybe b)
forArrayFirst arr k = getBounds arr >>= \(lo, hi) -> forRangeFirst lo hi k

--------------------------------------------------------------------------------
-- Runs the action on each index of the inclusive range @[from, to]@, in
-- increasing order, and returns the first 'Just' result. Yields 'Nothing'
-- when the range is empty or when every index answered 'Nothing'.
forRangeFirst :: Int -> Int -> (Int -> IO (Maybe b)) -> IO (Maybe b)
forRangeFirst from to k
    | from > to = return Nothing
    | otherwise = go from
  where
    stop = to + 1

    go cur
        | cur == stop = return Nothing
        | otherwise   = do
            outcome <- k cur
            if isJust outcome
                then return outcome
                else go (cur + 1)