{-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, TupleSections #-}
#if (__GLASGOW_HASKELL__ >= 706)
{-# LANGUAGE RecursiveDo #-}
#else
{-# LANGUAGE DoRec #-}
#endif
module Database.MongoDB.Connection (
Secs,
Pipe, close, isClosed,
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
readHostPortM, globalConnectTimeout, connect, connect',
ReplicaSetName, openReplicaSet, openReplicaSet', openReplicaSetTLS, openReplicaSetTLS',
openReplicaSetSRV, openReplicaSetSRV', openReplicaSetSRV'', openReplicaSetSRV''',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where
import Prelude hiding (lookup)
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersect, partition, (\\), delete)
import Data.Maybe (fromJust)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad (forM_, guard)
import Control.Monad.Fail(MonadFail)
import System.IO.Unsafe (unsafePerformIO)
import System.Timeout (timeout)
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar, eof,
spaces, try, (<|>))
import qualified Data.List as List
import Control.Monad.Identity (runIdentity)
import Control.Monad.Except (throwError)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
readMVar)
import Data.Bson (Document, at, (=:))
import Data.Text (Text)
import qualified Data.Bson as B
import qualified Data.Text as T
import Database.MongoDB.Internal.Network (Host(..), HostName, PortID(..), connectTo, lookupSeedList, lookupReplicaSetName)
import Database.MongoDB.Internal.Protocol (Pipe, newPipe, close, isClosed)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
updateAssocs, shuffle, mergesortM)
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
slaveOk, runCommand, retrieveServerData)
import qualified Database.MongoDB.Transport.Tls as TLS (connect)
adminCommand :: Command -> Pipe -> IO Document
adminCommand cmd pipe =
liftIOE failureToIOError $ access pipe slaveOk "admin" $ runCommand cmd
where
failureToIOError (ConnectionFailure e) = e
failureToIOError e = userError $ show e
defaultPort :: PortID
defaultPort = PortNumber 27017
host :: HostName -> Host
host hostname = Host hostname defaultPort
showHostPort :: Host -> String
showHostPort (Host hostname (PortNumber port)) = hostname ++ ":" ++ show port
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
showHostPort (Host _ (UnixSocket path)) = "unix:" ++ path
#endif
readHostPortM :: (MonadFail m) => String -> m Host
readHostPortM = either (fail . show) return . parse parser "readHostPort" where
hostname = many1 (letter <|> digit <|> char '-' <|> char '.' <|> char '_')
parser = do
spaces
h <- hostname
try (spaces >> eof >> return (host h)) <|> do
_ <- char ':'
try ( do port :: Int <- read <$> many1 digit
spaces >> eof
return $ Host h (PortNumber $ fromIntegral port))
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
<|> do guard (h == "unix")
p <- many1 anyChar
eof
return $ Host "" (UnixSocket p)
#endif
readHostPort :: String -> Host
readHostPort = fromJust . readHostPortM
type Secs = Double
globalConnectTimeout :: IORef Secs
globalConnectTimeout = unsafePerformIO (newIORef 6)
{-# NOINLINE globalConnectTimeout #-}
connect :: Host -> IO Pipe
connect h = readIORef globalConnectTimeout >>= flip connect' h
connect' :: Secs -> Host -> IO Pipe
connect' timeoutSecs (Host hostname port) = do
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
handle <- maybe (ioError $ userError "connect timed out") return mh
rec
p <- newPipe sd handle
sd <- access p slaveOk "admin" retrieveServerData
return p
type ReplicaSetName = Text
data TransportSecurity = Secure | Unsecure
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs TransportSecurity
replSetName :: ReplicaSet -> Text
replSetName (ReplicaSet rsName _ _ _) = rsName
openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSet rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSet' rsSeed
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSet' timeoutSecs (rs, hosts) = _openReplicaSet timeoutSecs (rs, hosts, Unsecure)
openReplicaSetTLS :: (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSetTLS rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSetTLS' rsSeed
openReplicaSetTLS' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSetTLS' timeoutSecs (rs, hosts) = _openReplicaSet timeoutSecs (rs, hosts, Secure)
_openReplicaSet :: Secs -> (ReplicaSetName, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet timeoutSecs (rsName, seedList, transportSecurity) = do
vMembers <- newMVar (map (, Nothing) seedList)
let rs = ReplicaSet rsName vMembers timeoutSecs transportSecurity
_ <- updateMembers rs
return rs
openReplicaSetSRV :: HostName -> IO ReplicaSet
openReplicaSetSRV hostname = do
timeoutSecs <- readIORef globalConnectTimeout
_openReplicaSetSRV timeoutSecs Unsecure hostname
openReplicaSetSRV' :: HostName -> IO ReplicaSet
openReplicaSetSRV' hostname = do
timeoutSecs <- readIORef globalConnectTimeout
_openReplicaSetSRV timeoutSecs Secure hostname
openReplicaSetSRV'' :: Secs -> HostName -> IO ReplicaSet
openReplicaSetSRV'' timeoutSecs = _openReplicaSetSRV timeoutSecs Unsecure
openReplicaSetSRV''' :: Secs -> HostName -> IO ReplicaSet
openReplicaSetSRV''' timeoutSecs = _openReplicaSetSRV timeoutSecs Secure
_openReplicaSetSRV :: Secs -> TransportSecurity -> HostName -> IO ReplicaSet
_openReplicaSetSRV timeoutSecs transportSecurity hostname = do
replicaSetName <- lookupReplicaSetName hostname
hosts <- lookupSeedList hostname
case (replicaSetName, hosts) of
(Nothing, _) -> throwError $ userError "Failed to lookup replica set name"
(_, []) -> throwError $ userError "Failed to lookup replica set seedlist"
(Just rsName, _) ->
case transportSecurity of
Secure -> openReplicaSetTLS' timeoutSecs (rsName, hosts)
Unsecure -> openReplicaSet' timeoutSecs (rsName, hosts)
closeReplicaSet :: ReplicaSet -> IO ()
closeReplicaSet (ReplicaSet _ vMembers _ _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
primary :: ReplicaSet -> IO Pipe
primary rs@(ReplicaSet rsName _ _ _) = do
mHost <- statedPrimary <$> updateMembers rs
case mHost of
Just host' -> connection rs Nothing host'
Nothing -> throwError $ userError $ "replica set " ++ T.unpack rsName ++ " has no primary"
secondaryOk :: ReplicaSet -> IO Pipe
secondaryOk rs = do
info <- updateMembers rs
hosts <- shuffle (possibleHosts info)
let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info)
untilSuccess (connection rs Nothing) hosts'
routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO Pipe
routedHost f rs = do
info <- updateMembers rs
hosts <- shuffle (possibleHosts info)
let addIsPrimary h = (h, if Just h == statedPrimary info then True else False)
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
untilSuccess (connection rs Nothing) hosts'
type ReplicaInfo = (Host, Document)
statedPrimary :: ReplicaInfo -> Maybe Host
statedPrimary (host', info) = if (at "ismaster" info) then Just host' else readHostPort <$> B.lookup "primary" info
possibleHosts :: ReplicaInfo -> [Host]
possibleHosts (_, info) = map readHostPort $ at "hosts" info
updateMembers :: ReplicaSet -> IO ReplicaInfo
updateMembers rs@(ReplicaSet _ vMembers _ _) = do
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
modifyMVar vMembers $ \members -> do
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
forM_ old $ \(_, mPipe) -> maybe (return ()) close mPipe
return (members' ++ map (, Nothing) new, (host', info))
where
intersection :: (Eq k) => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection keys assocs = (partition (flip elem inKeys . fst) assocs, keys \\ inKeys) where
assocKeys = map fst assocs
inKeys = intersect keys assocKeys
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
fetchReplicaInfo rs@(ReplicaSet rsName _ _ _) (host', mPipe) = do
pipe <- connection rs mPipe host'
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
case B.lookup "setName" info of
Nothing -> throwError $ userError $ show host' ++ " not a member of any replica set, including " ++ T.unpack rsName ++ ": " ++ show info
Just setName | setName /= rsName -> throwError $ userError $ show host' ++ " not a member of replica set " ++ T.unpack rsName ++ ": " ++ show info
Just _ -> return (host', info)
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection (ReplicaSet _ vMembers timeoutSecs transportSecurity) mPipe host' =
maybe conn (\p -> isClosed p >>= \bad -> if bad then conn else return p) mPipe
where
conn = modifyMVar vMembers $ \members -> do
let (Host h p) = host'
let conn' = case transportSecurity of
Secure -> TLS.connect h p
Unsecure -> connect' timeoutSecs host'
let new = conn' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
case List.lookup host' members of
Just (Just pipe) -> isClosed pipe >>= \bad -> if bad then new else return (members, pipe)
_ -> new