-- This Source Code Form is subject to the terms of the Mozilla Public -- License, v. 2.0. If a copy of the MPL was not distributed with this -- file, You can obtain one at http://mozilla.org/MPL/2.0/. {-# LANGUAGE BangPatterns #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE ViewPatterns #-} module Database.CQL.IO.Client ( Client , MonadClient (..) , ClientState , DebugInfo (..) , ControlState (..) , runClient , init , shutdown , request , requestN , request1 , execute , executeWithPrepare , prepare , retry , once , debugInfo , preparedQueries , withPrepareStrategy , getResult , unexpected , C.defQueryParams ) where import Control.Applicative import Control.Concurrent (threadDelay, forkIO) import Control.Concurrent.Async (async, wait) import Control.Concurrent.STM (STM, atomically) import Control.Concurrent.STM.TVar import Control.Exception (IOException, SomeAsyncException (..)) import Control.Lens (makeLenses, (^.), set, over, view) import Control.Monad (when, unless) import Control.Monad.Catch import Control.Monad.IO.Class import Control.Monad.IO.Unlift import Control.Monad.Reader (ReaderT (..), runReaderT, MonadReader, ask) import Control.Monad.Trans.Class import Control.Monad.Trans.Except import Control.Retry (capDelay, exponentialBackoff, rsIterNumber) import Control.Retry (recovering) import Data.Foldable (for_, foldrM) import Data.List (find) import Data.Map.Strict (Map) import Data.Maybe (fromMaybe, listToMaybe) import Data.Semigroup import Data.Text.Encoding (encodeUtf8) import Data.Word import Database.CQL.IO.Cluster.Host import Database.CQL.IO.Cluster.Policies import Database.CQL.IO.Connection (Connection, host, Raw) import Database.CQL.IO.Connection.Settings import Database.CQL.IO.Exception import Database.CQL.IO.Jobs import Database.CQL.IO.Log import Database.CQL.IO.Pool (Pool) import Database.CQL.IO.PrepQuery (PrepQuery, PreparedQueries) import Database.CQL.IO.Settings import Database.CQL.IO.Signal import Database.CQL.IO.Timeouts (TimeoutManager) import Database.CQL.Protocol hiding (Map) import OpenSSL.Session (SomeSSLException) import Prelude hiding (init) import qualified Control.Monad.Reader as Reader import qualified Control.Monad.State.Strict as S import qualified Control.Monad.State.Lazy as LS import qualified Data.List.NonEmpty as NE import qualified Data.Map.Strict as Map import qualified Database.CQL.IO.Cluster.Discovery as Disco import qualified Database.CQL.IO.Connection as C import qualified Database.CQL.IO.Pool as Pool import qualified Database.CQL.IO.PrepQuery as PQ import qualified Database.CQL.IO.Timeouts as TM import qualified Database.CQL.Protocol as Cql data ControlState = Connected | Reconnecting | Disconnected deriving (Eq, Ord, Show) data Control = Control { _state :: !ControlState , _connection :: !Connection } data Context = Context { _settings :: !Settings , _timeouts :: !TimeoutManager , _sigMonit :: !(Signal HostEvent) } -- | Opaque client state/environment. data ClientState = ClientState { _context :: !Context , _policy :: !Policy , _prepQueries :: !PreparedQueries , _control :: !(TVar Control) , _hostmap :: !(TVar (Map Host Pool)) , _jobs :: !(Jobs InetAddr) } makeLenses ''Control makeLenses ''Context makeLenses ''ClientState -- | The Client monad. -- -- A simple reader monad on `IO` around some internal state. Prior to executing -- this monad via 'runClient', its state must be initialised through -- 'Database.CQL.IO.Client.init' and after finishing operation it should be -- terminated with 'shutdown'. -- -- To lift 'Client' actions into another monad, see 'MonadClient'. newtype Client a = Client { client :: ReaderT ClientState IO a } deriving ( Functor , Applicative , Monad , MonadIO , MonadUnliftIO , MonadThrow , MonadCatch , MonadMask , MonadReader ClientState ) -- | Monads in which 'Client' actions may be embedded. class (MonadIO m, MonadThrow m) => MonadClient m where -- | Lift a computation from the 'Client' monad. liftClient :: Client a -> m a -- | Execute an action with a modified 'ClientState'. localState :: (ClientState -> ClientState) -> m a -> m a instance MonadClient Client where liftClient = id localState = Reader.local instance MonadClient m => MonadClient (ReaderT r m) where liftClient = lift . liftClient localState f m = ReaderT (localState f . runReaderT m) instance MonadClient m => MonadClient (S.StateT s m) where liftClient = lift . liftClient localState f m = S.StateT (localState f . S.runStateT m) instance MonadClient m => MonadClient (LS.StateT s m) where liftClient = lift . liftClient localState f m = LS.StateT (localState f . LS.runStateT m) instance MonadClient m => MonadClient (ExceptT e m) where liftClient = lift . liftClient localState f m = ExceptT $ localState f (runExceptT m) ----------------------------------------------------------------------------- -- API -- | Execute the client monad. runClient :: MonadIO m => ClientState -> Client a -> m a runClient p a = liftIO $ runReaderT (client a) p -- | Use given 'RetrySettings' during execution of some client action. retry :: MonadClient m => RetrySettings -> m a -> m a retry r = localState (set (context.settings.retrySettings) r) -- | Execute a client action once, without retries, i.e. -- -- @once action = retry noRetry action@. -- -- Primarily for use in applications where global 'RetrySettings' -- are configured and need to be selectively disabled for individual -- queries. once :: MonadClient m => m a -> m a once = retry noRetry -- | Change the default 'PrepareStrategy' for the given client action. withPrepareStrategy :: MonadClient m => PrepareStrategy -> m a -> m a withPrepareStrategy s = localState (set (context.settings.prepStrategy) s) -- | Send a 'Request' to the server and return a 'Response'. -- -- This function will first ask the clients load-balancing 'Policy' for -- some host and use its connection pool to acquire a connection for -- request transmission. -- -- If all available hosts are busy (i.e. their connection pools are fully -- utilised), the function will block until a connection becomes available -- or the maximum wait-queue length has been reached. -- -- The request is retried according to the configured 'RetrySettings'. request :: (MonadClient m, Tuple a, Tuple b) => Request k a b -> m (HostResponse k a b) request a = liftClient $ do n <- liftIO . hostCount =<< view policy withRetries (requestN n) a -- | Send a request to a host chosen by the configured host policy. -- -- Tries up to @max(1,n)@ hosts. If no host can execute the request, -- a 'HostError' is thrown. Specifically: -- -- * If no host is available from the 'Policy', 'NoHostAvailable' is thrown. -- * If no host can execute the request, e.g. because all streams -- on all connections are occupied, 'HostsBusy' is thrown. requestN :: (Tuple b, Tuple a) => Word -> Request k a b -> ClientState -> Client (HostResponse k a b) requestN !n a s = liftIO (select (s^.policy)) >>= \case Nothing -> replaceControl >> throwM NoHostAvailable Just h -> tryRequest1 h a s >>= \case Just hr -> return hr Nothing -> if n > 1 then requestN (n - 1) a s else throwM HostsBusy -- | Send a 'Request' to a specific 'Host'. -- -- If the request cannot be executed on the given host, e.g. -- because all connections are occupied, 'HostsBusy' is thrown. request1 :: (Tuple a, Tuple b) => Host -> Request k a b -> ClientState -> Client (HostResponse k a b) request1 h r s = do rs <- tryRequest1 h r s maybe (throwM HostsBusy) return rs -- | Try to send a 'Request' to a specific 'Host'. -- -- If the request cannot be executed on the given host, e.g. -- because all connections are occupied, 'Nothing' is returned. tryRequest1 :: (Tuple a, Tuple b) => Host -> Request k a b -> ClientState -> Client (Maybe (HostResponse k a b)) tryRequest1 h a s = do pool <- Map.lookup h <$> readTVarIO' (s^.hostmap) case pool of Just p -> do result <- Pool.with p exec `catches` handlers for_ result $ \(HostResponse _ r) -> for_ (Cql.warnings r) $ \w -> logWarn' $ "Server warning: " <> byteString (encodeUtf8 w) return result Nothing -> do logError' $ "No pool for host: " <> string8 (show h) p' <- mkPool (s^.context) h atomically' $ modifyTVar' (s^.hostmap) (Map.alter (maybe (Just p') Just) h) tryRequest1 h a s where exec c = do r <- C.request c a return $ HostResponse h r handlers = [ Handler $ \(e :: ConnectionError) -> onConnectionError e , Handler $ \(e :: IOException) -> onConnectionError e , Handler $ \(e :: SomeSSLException) -> onConnectionError e ] onConnectionError exc = do e <- ask logWarn' (string8 (show exc)) -- Tell the policy that the host is down until monitoring confirms -- it is still up, which will be signalled by a subsequent 'HostUp' -- event. liftIO $ ignore $ onEvent (e^.policy) (HostDown (h^.hostAddr)) runJob_ (e^.jobs) (h^.hostAddr) $ runClient e $ monitor (Ms 0) (Ms 30000) h -- Any connection error may indicate a problem with the -- control connection, if it uses the same host. ch <- fmap (view (connection.host)) . readTVarIO' =<< view control when (h == ch) $ do ok <- checkControl unless ok replaceControl throwM exc ------------------------------------------------------------------------------ -- Prepared queries -- | Execute the given request. If an 'Unprepared' error is returned, this -- function will automatically try to re-prepare the query and re-execute -- the original request using the same host which was used for re-preparation. executeWithPrepare :: (Tuple b, Tuple a) => Maybe Host -> Request k a b -> Client (HostResponse k a b) executeWithPrepare mh rq | Just h <- mh = exec (request1 h) | otherwise = do p <- view policy n <- liftIO $ hostCount p exec (requestN n) where exec action = do r <- withRetries action rq case hrResponse r of RsError _ _ (Unprepared _ i) -> do pq <- preparedQueries qs <- atomically' (PQ.lookupQueryString (QueryId i) pq) case qs of Nothing -> throwM $ UnexpectedQueryId (QueryId i) Just s -> do (h, _) <- prepare (Just LazyPrepare) (s :: Raw QueryString) executeWithPrepare (Just h) rq _ -> return r -- | Prepare the given query according to the given 'PrepareStrategy', -- returning the resulting 'QueryId' and 'Host' which was used for -- preparation. prepare :: (Tuple b, Tuple a) => Maybe PrepareStrategy -> QueryString k a b -> Client (Host, QueryId k a b) prepare (Just LazyPrepare) qs = do s <- ask n <- liftIO $ hostCount (s^.policy) r <- withRetries (requestN n) (RqPrepare (Prepare qs)) getPreparedQueryId r prepare (Just EagerPrepare) qs = view policy >>= liftIO . current >>= mapM (action (RqPrepare (Prepare qs))) >>= first where action rq h = withRetries (request1 h) rq >>= getPreparedQueryId first (x:_) = return x first [] = replaceControl >> throwM NoHostAvailable prepare Nothing qs = do ps <- view (context.settings.prepStrategy) prepare (Just ps) qs -- | Execute a prepared query (transparently re-preparing if necessary). execute :: (Tuple b, Tuple a) => PrepQuery k a b -> QueryParams a -> Client (HostResponse k a b) execute q p = do pq <- view prepQueries maybe (new pq) (exec Nothing) =<< atomically' (PQ.lookupQueryId q pq) where exec h i = executeWithPrepare h (RqExecute (Execute i p)) new pq = do (h, i) <- prepare (Just LazyPrepare) (PQ.queryString q) atomically' (PQ.insert q i pq) exec (Just h) i prepareAllQueries :: Host -> Client () prepareAllQueries h = do pq <- view prepQueries qs <- atomically' $ PQ.queryStrings pq for_ qs $ \q -> let qry = QueryString q :: Raw QueryString in withRetries (request1 h) (RqPrepare (Prepare qry)) ------------------------------------------------------------------------------ -- Debug info data DebugInfo = DebugInfo { policyInfo :: String -- ^ Host 'Policy' string representation. , jobInfo :: [InetAddr] -- ^ Hosts with running background jobs (e.g. monitoring of hosts -- currently considered down). , hostInfo :: [Host] -- ^ All known hosts. , controlInfo :: (Host, ControlState) -- ^ Control connection information. } instance Show DebugInfo where show dbg = showString "running jobs: " . shows (jobInfo dbg) . showString "\nknown hosts: " . shows (hostInfo dbg) . showString "\npolicy info: " . shows (policyInfo dbg) . showString "\ncontrol host: " . shows (controlInfo dbg) $ "" debugInfo :: MonadClient m => m DebugInfo debugInfo = liftClient $ do hosts <- Map.keys <$> (readTVarIO' =<< view hostmap) pols <- liftIO . display =<< view policy jbs <- listJobKeys =<< view jobs ctrl <- (\(Control s c) -> (c^.host, s)) <$> (readTVarIO' =<< view control) return $ DebugInfo pols jbs hosts ctrl preparedQueries :: Client PreparedQueries preparedQueries = view prepQueries ----------------------------------------------------------------------------- -- Initialisation -- | Initialise client state with the given 'Settings' using the provided -- 'Logger' for all it's logging output. init :: MonadIO m => Settings -> m ClientState init s = liftIO $ do tom <- TM.create (Ms 250) ctx <- Context s tom <$> signal bracketOnError (mkContact ctx) C.close $ \con -> do pol <- s^.policyMaker cst <- ClientState ctx <$> pure pol <*> PQ.new <*> newTVarIO (Control Connected con) <*> newTVarIO Map.empty <*> newJobs ctx^.sigMonit |-> onEvent pol runClient cst (setupControl con) return cst -- | Try to establish a connection to one of the initial contacts. mkContact :: Context -> IO Connection mkContact (Context s t _) = tryAll (s^.contacts) mkConnection where mkConnection h = do as <- C.resolve h (s^.portnumber) NE.fromList as `tryAll` doConnect doConnect a = do logDebug (s^.logger) $ "Connecting to " <> string8 (show a) c <- C.connect (s^.connSettings) t (s^.protoVersion) (s^.logger) (Host a "" "") return c discoverPeers :: MonadIO m => Context -> Connection -> m [Host] discoverPeers ctx c = liftIO $ do let p = ctx^.settings.portnumber map (peer2Host p . asRecord) <$> C.query c One Disco.peers () mkPool :: MonadIO m => Context -> Host -> m Pool mkPool ctx h = liftIO $ do let s = ctx^.settings let m = s^.connSettings.maxStreams Pool.create (connOpen s) connClose (ctx^.settings.logger) (s^.poolSettings) m where lgr = ctx^.settings.logger connOpen s = do c <- C.connect (s^.connSettings) (ctx^.timeouts) (s^.protoVersion) lgr h logDebug lgr $ "Connection established: " <> string8 (show c) return c connClose c = do C.close c logDebug lgr $ "Connection closed: " <> string8 (show c) ----------------------------------------------------------------------------- -- Termination -- | Terminate client state, i.e. end all running background checks and -- shutdown all connection pools. Once this is entered, the client -- will eventually be shut down, though an asynchronous exception can -- interrupt the wait for that to occur. shutdown :: MonadIO m => ClientState -> m () shutdown s = liftIO $ asyncShutdown >>= wait where asyncShutdown = async $ do TM.destroy (s^.context.timeouts) True cancelJobs (s^.jobs) ignore $ C.close . view connection =<< readTVarIO (s^.control) mapM_ Pool.destroy . Map.elems =<< readTVarIO (s^.hostmap) ----------------------------------------------------------------------------- -- Monitoring -- | @monitor initialDelay maxDelay host@ tries to establish a connection -- to @host@ after @initialDelay@. If the connection attempt fails, it is -- retried with exponentially increasing delays, up to a maximum delay of -- @maxDelay@. When a connection attempt suceeds, a 'HostUp' event is -- signalled. -- -- The function returns when one of the following conditions is met: -- -- 1. The connection attempt suceeds. -- 2. The host is no longer found to be in the client's known host map. -- -- I.e. as long as the host is still known to the client and is unreachable, the -- connection attempts continue. Both @initialDelay@ and @maxDelay@ are bounded -- by a limit of 5 minutes. monitor :: Milliseconds -> Milliseconds -> Host -> Client () monitor initial maxDelay h = do liftIO $ threadDelay (toMicros initial) logInfo' $ "Monitoring: " <> string8 (show h) hostCheck 0 where hostCheck :: Int -> Client () hostCheck !n = do hosts <- liftIO . readTVarIO =<< view hostmap when (Map.member h hosts) $ do isUp <- C.canConnect h if isUp then do sig <- view (context.sigMonit) liftIO $ sig $$ (HostUp (h^.hostAddr)) logInfo' $ "Reachable: " <> string8 (show h) else do logInfo' $ "Unreachable: " <> string8 (show h) liftIO $ threadDelay (2^n * minDelay) hostCheck (min (n + 1) maxExp) -- Bounded to 5min toMicros :: Milliseconds -> Int toMicros (Ms s) = min (s * 1000) (5 * 60 * 1000000) minDelay :: Int minDelay = 50000 -- 50ms maxExp :: Int maxExp = let steps = fromIntegral (toMicros maxDelay `div` minDelay) :: Double in floor (logBase 2 steps) ----------------------------------------------------------------------------- -- Exception handling -- [Note: Error responses] -- Cassandra error responses are locally thrown as 'ResponseError's to achieve -- a unified handling of retries in the context of a single retry policy, -- together with other recoverable (i.e. retryable) exceptions. However, this -- is just an internal technicality for handling retries - generally error -- responses must not escape this function as exceptions. Deciding if and when -- to actually throw a 'ResponseError' upon inspection of the 'HostResponse' -- must be left to the caller. withRetries :: (Tuple a, Tuple b) => (Request k a b -> ClientState -> Client (HostResponse k a b)) -> Request k a b -> Client (HostResponse k a b) withRetries fn a = do s <- ask let how = s^.context.settings.retrySettings.retryPolicy let what = s^.context.settings.retrySettings.retryHandlers r <- try $ recovering how what $ \i -> do hr <- if rsIterNumber i == 0 then fn a s else fn (newRequest s) (adjust s) -- [Note: Error responses] maybe (return hr) throwM (toResponseError hr) return $ either fromResponseError id r where adjust s = let Ms x = s^.context.settings.retrySettings.sendTimeoutChange Ms y = s^.context.settings.retrySettings.recvTimeoutChange in over (context.settings.connSettings.sendTimeout) (Ms . (+ x) . ms) . over (context.settings.connSettings.responseTimeout) (Ms . (+ y) . ms) $ s newRequest s = case s^.context.settings.retrySettings.reducedConsistency of Nothing -> a Just c -> case a of RqQuery (Query q p) -> RqQuery (Query q p { consistency = c }) RqExecute (Execute q p) -> RqExecute (Execute q p { consistency = c }) RqBatch b -> RqBatch b { batchConsistency = c } _ -> a ------------------------------------------------------------------------------ -- Control connection handling -- -- The control connection is dedicated to maintaining the client's -- view of the cluster topology. There is a single control connection in a -- client's 'ClientState' at any particular time. -- | Setup and install the given connection as the new control -- connection, replacing the current one. setupControl :: Connection -> Client () setupControl c = do env <- ask pol <- view policy ctx <- view context l <- updateHost (c^.host) . listToMaybe <$> C.query c One Disco.local () r <- discoverPeers ctx c (up, down) <- mkHostMap ctx pol (l:r) m <- view hostmap let h = Map.union up down atomically' $ writeTVar m h liftIO $ setup pol (Map.keys up) (Map.keys down) C.register c C.allEventTypes (runClient env . onCqlEvent) logInfo' $ "Known hosts: " <> string8 (show (Map.keys h)) j <- view jobs for_ (Map.keys down) $ \d -> runJob j (d^.hostAddr) $ runClient env $ monitor (Ms 1000) (Ms 60000) d ctl <- view control let c' = set C.host l c atomically' $ writeTVar ctl (Control Connected c') logInfo' $ "New control connection: " <> string8 (show c') -- | Initialise connection pools for the given hosts, checking for -- acceptability with the host policy and separating them by reachability. mkHostMap :: Context -> Policy -> [Host] -> Client (Map Host Pool, Map Host Pool) mkHostMap c p = liftIO . foldrM checkHost (Map.empty, Map.empty) where checkHost h (up, down) = do okay <- acceptable p h if okay then do isUp <- C.canConnect h if isUp then do up' <- Map.insert h <$> mkPool c h <*> pure up return (up', down) else do down' <- Map.insert h <$> mkPool c h <*> pure down return (up, down') else return (up, down) -- | Check if the control connection is healthy. checkControl :: Client Bool checkControl = do cc <- view connection <$> (readTVarIO' =<< view control) rs <- liftIO $ C.requestRaw cc (RqOptions Options) return $ case rs of RsSupported {} -> True _ -> False `recover` False -- | Asynchronously replace the control connection. -- -- Invariants: -- -- 1) When the control connection is in state 'Reconnecting' there -- is a thread running that attempts to establish a new control -- connection. -- -- 2) There is only one thread performing a reconnect at a time. -- -- To that end, the 'ControlState' acts as a mutex that is acquired -- in state 'Reconnecting' and must eventually be released by either -- a successful reconnect with state 'Connected' or a (fatal) failure -- with state 'Disconnected'. In the latter case, further failing -- requests may trigger another recovery attempt of the control -- connection. replaceControl :: Client () replaceControl = do e <- ask let l = e^.context.settings.logger liftIO $ mask $ \restore -> do cc <- setReconnecting e for_ cc $ \c -> forkIO $ restore $ do ignore (C.close c) reconnect e l `catchAll` \ex -> do logError l $ "Control connection reconnect aborted: " <> string8 (show ex) atomically $ modifyTVar' (e^.control) (set state Disconnected) where setReconnecting e = atomically $ do ctrl <- readTVar (e^.control) if ctrl^.state /= Reconnecting then do writeTVar (e^.control) (set state Reconnecting ctrl) return $ Just (ctrl^.connection) else return Nothing reconnect e l = recovering adInf (onExc l) $ \_ -> do hosts <- NE.nonEmpty . Map.keys <$> readTVarIO (e^.hostmap) case hosts of Just hs -> hs `tryAll` (runClient e . renewControl) `catch` \x -> case fromException x of Just (SomeAsyncException _) -> throwM x Nothing -> do logError l "All known hosts unreachable." runClient e rebootControl Nothing -> do logError l "No known hosts." runClient e rebootControl adInf = capDelay 5000000 (exponentialBackoff 5000) onExc l = [ const $ Handler $ \(_ :: SomeAsyncException) -> return False , const $ Handler $ \(e :: SomeException) -> do logError l $ "Replacement of control connection failed with: " <> string8 (show e) <> ". Retrying ..." return True ] -- | Create a new connection to a known host and set it up -- as the new control connection. renewControl :: Host -> Client () renewControl h = do ctx <- view context logInfo' "Renewing control connection with known host ..." let s = ctx^.settings bracketOnError (C.connect (s^.connSettings) (ctx^.timeouts) (s^.protoVersion) (s^.logger) h) (liftIO . C.close) setupControl -- | Create a new connection to one of the initial contacts -- and set it up as the new control connection. rebootControl :: Client () rebootControl = do e <- ask logInfo' "Renewing control connection with initial contacts ..." bracketOnError (liftIO (mkContact (e^.context))) (liftIO . C.close) setupControl ----------------------------------------------------------------------------- -- Event handling onCqlEvent :: Event -> Client () onCqlEvent x = do logInfo' $ "Event: " <> string8 (show x) pol <- view policy prt <- view (context.settings.portnumber) case x of StatusEvent Down (sock2inet prt -> a) -> liftIO $ onEvent pol (HostDown a) TopologyEvent RemovedNode (sock2inet prt -> a) -> do hmap <- view hostmap atomically' $ modifyTVar' hmap (Map.filterWithKey (\h _ -> h^.hostAddr /= a)) liftIO $ onEvent pol (HostGone a) StatusEvent Up (sock2inet prt -> a) -> do s <- ask startMonitor s a TopologyEvent NewNode (sock2inet prt -> a) -> do s <- ask let ctx = s^.context let hmap = s^.hostmap ctrl <- readTVarIO' (s^.control) let c = ctrl^.connection peers <- liftIO $ discoverPeers ctx c `recover` [] let h = fromMaybe (Host a "" "") $ find ((a == ) . view hostAddr) peers okay <- liftIO $ acceptable pol h when okay $ do p <- mkPool ctx h atomically' $ modifyTVar' hmap (Map.alter (maybe (Just p) Just) h) liftIO $ onEvent pol (HostNew h) tryRunJob_ (s^.jobs) a $ runClient s (prepareAllQueries h) SchemaEvent _ -> return () where startMonitor s a = do hmp <- readTVarIO' (s^.hostmap) case find ((a ==) . view hostAddr) (Map.keys hmp) of Just h -> tryRunJob_ (s^.jobs) a $ runClient s $ do monitor (Ms 3000) (Ms 60000) h prepareAllQueries h Nothing -> return () ----------------------------------------------------------------------------- -- Utilities -- | Get the 'Result' out of a 'HostResponse'. If the response is an 'RsError', -- a 'ResponseError' is thrown. If the response is neither -- 'RsResult' nor 'RsError', an 'UnexpectedResponse' is thrown. getResult :: MonadThrow m => HostResponse k a b -> m (Result k a b) getResult (HostResponse _ (RsResult _ _ r)) = return r getResult (HostResponse h (RsError t w e)) = throwM (ResponseError h t w e) getResult hr = unexpected hr {-# INLINE getResult #-} getPreparedQueryId :: MonadThrow m => HostResponse k a b -> m (Host, QueryId k a b) getPreparedQueryId hr = getResult hr >>= \case PreparedResult i _ _ -> return (hrHost hr, i) _ -> unexpected hr {-# INLINE getPreparedQueryId #-} unexpected :: MonadThrow m => HostResponse k a b -> m c unexpected (HostResponse h r) = throwM $ UnexpectedResponse h r atomically' :: STM a -> Client a atomically' = liftIO . atomically readTVarIO' :: TVar a -> Client a readTVarIO' = liftIO . readTVarIO logInfo' :: Builder -> Client () logInfo' m = do l <- view (context.settings.logger) liftIO $ logInfo l m {-# INLINE logInfo' #-} logWarn' :: Builder -> Client () logWarn' m = do l <- view (context.settings.logger) liftIO $ logWarn l m {-# INLINE logWarn' #-} logError' :: Builder -> Client () logError' m = do l <- view (context.settings.logger) liftIO $ logError l m {-# INLINE logError' #-}