module Database.MongoDB.Connection (
Secs,
Pipe, close, isClosed,
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
readHostPortM, globalConnectTimeout, connect, connect',
ReplicaSetName, openReplicaSet, openReplicaSet',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where
import Prelude hiding (lookup)
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersect, partition, (\\), delete)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad (forM_)
import Network (HostName, PortID(..), connectTo)
import System.IO.Unsafe (unsafePerformIO)
import System.Timeout (timeout)
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, eof,
spaces, try, (<|>))
import qualified Data.List as List
import Control.Monad.Identity (runIdentity)
import Control.Monad.Error (throwError)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
readMVar)
import Data.Bson (Document, at, (=:))
import Data.Text (Text)
import qualified Data.Bson as B
import qualified Data.Text as T
import Database.MongoDB.Internal.Protocol (Pipe, newPipe)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
updateAssocs, shuffle, mergesortM)
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
slaveOk, runCommand)
import System.IO.Pipeline (close, isClosed)
-- | Run a command against the @admin@ database over the given pipe, using
-- the 'slaveOk' access mode.
--
-- The action is wrapped with 'liftIOE', which is handed a converter from
-- 'Failure' to 'IOError': a 'ConnectionFailure' yields the 'IOError' it
-- carries, and any other failure becomes a 'userError' holding its 'show'
-- text.
adminCommand :: Command -> Pipe -> IO Document
adminCommand cmd pipe =
    liftIOE toIOError (access pipe slaveOk "admin" (runCommand cmd))
  where
    toIOError failure = case failure of
        ConnectionFailure ioErr -> ioErr
        other -> userError (show other)
-- | A server address: a host name paired with a port.
data Host
    = Host
        HostName -- ^ name of the machine to connect to
        PortID   -- ^ port (or service / socket) to connect to
    deriving (Show, Eq, Ord)
-- | Port used when a host is given without one (see 'host'):
-- port number 27017.
defaultPort :: PortID
defaultPort = PortNumber 27017
-- | Build a 'Host' from a host name, using 'defaultPort' as its port.
host :: HostName -> Host
host = flip Host defaultPort
-- | Render a host as @hostname:port@.
--
-- A service name is shown verbatim and a numeric port via 'show'. On
-- builds where none of the Windows/Cygwin macros below are defined, a
-- Unix socket path is shown verbatim as well.
showHostPort :: Host -> String
showHostPort (Host name port) = name ++ ":" ++ portText port
  where
    portText (Service svc) = svc
    portText (PortNumber num) = show num
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
    portText (UnixSocket path) = path
#endif
-- | Parse a host from text of the form @hostname@ or @hostname:port@.
--
-- Leading and trailing spaces are accepted. The host name is one or more
-- letters, digits, @-@ or @.@. When no @:port@ suffix is present the
-- result uses 'defaultPort' (via 'host'). The port must be a decimal
-- number no larger than 65535.
--
-- Any parse error, including an out-of-range port, is reported by calling
-- 'fail' in the result monad with the shown parse error.
readHostPortM :: (Monad m) => String -> m Host
readHostPortM = either (fail . show) return . parse parser "readHostPort"
  where
    hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
    parser = do
        spaces
        h <- hostname
        -- 'try' lets the parser back out of consumed spaces when the
        -- input continues with a ":port" suffix instead of ending.
        try (spaces >> eof >> return (host h)) <|> do
            _ <- char ':'
            portDigits <- many1 digit
            spaces >> eof
            -- 'read' cannot fail here: 'many1 digit' yields only decimal
            -- digits. Integer is used so an over-long digit string cannot
            -- wrap around before the range check.
            let port = read portDigits :: Integer
            -- Reject ports that do not fit in 16 bits; 'fromIntegral'
            -- would otherwise silently wrap them to a different port.
            if port > 65535
                then fail ("port number out of range: " ++ portDigits)
                else return $ Host h (PortNumber $ fromIntegral port)
-- | Pure variant of 'readHostPortM', run in the 'Identity' monad.
--
-- NOTE(review): a parse failure goes through 'fail' for Identity, which
-- presumably raises an error when the result is forced -- confirm.
readHostPort :: String -> Host
readHostPort str = runIdentity (readHostPortM str)
-- | A duration in seconds; 'connect'' scales it by 1000000 before passing
-- it to 'timeout'.
type Secs = Double
-- | Process-wide mutable connect timeout, in seconds, read by 'connect'
-- and 'openReplicaSet'. Its initial value is 6.
globalConnectTimeout :: IORef Secs
globalConnectTimeout = unsafePerformIO (newIORef 6)
-- NOINLINE is required for a top-level 'unsafePerformIO' ref: if GHC
-- inlined this definition, each use site could allocate its own IORef and
-- writes made through one would not be seen through another.
{-# NOINLINE globalConnectTimeout #-}
-- | Connect to a host, using the timeout currently stored in
-- 'globalConnectTimeout'.
connect :: Host -> IO Pipe
connect h = do
    secs <- readIORef globalConnectTimeout
    connect' secs h
-- | Connect to a host, giving up after the given number of seconds.
--
-- The seconds are converted to microseconds for 'timeout'. If the
-- connection attempt does not finish in time, an 'IOError' built with
-- 'userError' and the message @connect timed out@ is raised. Otherwise the
-- resulting handle is wrapped with 'newPipe'.
connect' :: Secs -> Host -> IO Pipe
connect' timeoutSecs (Host hostname port) = do
    attempt <- timeout (round (timeoutSecs * 1000000)) (connectTo hostname port)
    case attempt of
        Nothing -> ioError (userError "connect timed out")
        Just handle -> newPipe handle
-- | Name of a replica set; 'fetchReplicaInfo' compares it with the
-- @setName@ field of a member's @isMaster@ reply.
type ReplicaSetName = Text
-- | Handle on a replica set: its name, the current member list, and the
-- timeout used when opening connections to members.
data ReplicaSet
    = ReplicaSet
        ReplicaSetName               -- ^ name of the replica set
        (MVar [(Host, Maybe Pipe)])  -- ^ known members, each with its open pipe if any
        Secs                         -- ^ connect timeout, in seconds
-- | The name stored in a 'ReplicaSet'.
replSetName :: ReplicaSet -> Text
replSetName rs = case rs of
    ReplicaSet name _ _ -> name
-- | Open a replica set from its name and seed hosts, using the timeout
-- currently stored in 'globalConnectTimeout'.
openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSet rsSeed = do
    secs <- readIORef globalConnectTimeout
    openReplicaSet' secs rsSeed
-- | Open a replica set from its name and seed hosts with an explicit
-- connect timeout.
--
-- Every seed starts out with no pipe. 'updateMembers' is run once before
-- the handle is returned; its result is discarded, but any exception it
-- raises propagates to the caller.
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSet' timeoutSecs (rsName, seedList) = do
    vMembers <- newMVar [(seed, Nothing) | seed <- seedList]
    let rs = ReplicaSet rsName vMembers timeoutSecs
    updateMembers rs >> return rs
-- | Close every pipe currently held in the member list.
--
-- The list is read under 'withMVar', so the stored value is not changed:
-- members keep their (now closed) pipes in the list.
closeReplicaSet :: ReplicaSet -> IO ()
closeReplicaSet (ReplicaSet _ vMembers _) =
    withMVar vMembers $ \members ->
        forM_ members $ \member ->
            case snd member of
                Nothing -> return ()
                Just pipe -> close pipe
-- | Return a connection to the current primary of the replica set.
--
-- The member list is refreshed first with 'updateMembers'. If the fetched
-- info names no primary (see 'statedPrimary'), a 'userError' saying that
-- the replica set has no primary is thrown.
primary :: ReplicaSet -> IO Pipe
primary rs@(ReplicaSet rsName _ _) = do
    info <- updateMembers rs
    maybe noPrimary (connection rs Nothing) (statedPrimary info)
  where
    noPrimary :: IO Pipe
    noPrimary = throwError $ userError $ "replica set " ++ T.unpack rsName ++ " has no primary"
-- | Return a connection to some member of the replica set, trying the
-- stated primary last.
--
-- The member list is refreshed, the possible hosts are passed through
-- 'shuffle', and the stated primary (if any) is moved to the end of the
-- list. The hosts are then handed to 'untilSuccess' together with the
-- connect action.
secondaryOk :: ReplicaSet -> IO Pipe
secondaryOk rs = do
    info <- updateMembers rs
    shuffled <- shuffle (possibleHosts info)
    let candidates = case statedPrimary info of
            Nothing -> shuffled
            Just p -> delete p shuffled ++ [p]
    untilSuccess (connection rs Nothing) candidates
-- | Return a connection to a member chosen by a caller-supplied ordering.
--
-- The member list is refreshed and the possible hosts are passed through
-- 'shuffle'. They are then sorted with 'mergesortM' using @f@, which
-- compares hosts paired with a flag that is 'True' when the host is the
-- stated primary. The sorted hosts are handed to 'untilSuccess' together
-- with the connect action.
routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO Pipe
routedHost f rs = do
    info <- updateMembers rs
    hosts <- shuffle (possibleHosts info)
    -- Computed once and shared by every comparison instead of being
    -- re-derived from the info document on each call.
    let mPrimary = statedPrimary info
        addIsPrimary h = (h, Just h == mPrimary)
    hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
    untilSuccess (connection rs Nothing) hosts'
-- | A member host paired with the document its @isMaster@ command
-- returned (as produced by 'fetchReplicaInfo').
type ReplicaInfo = (Host, Document)
-- | The primary according to a member's info document.
--
-- When the @ismaster@ field is true the responding host itself is the
-- primary. Otherwise the result is the host parsed from the @primary@
-- field, or 'Nothing' if that field cannot be looked up.
statedPrimary :: ReplicaInfo -> Maybe Host
statedPrimary (host', info)
    | at "ismaster" info = Just host'
    | otherwise = fmap readHostPort (B.lookup "primary" info)
-- | Every host listed in the @hosts@ field of a member's info document,
-- each parsed with 'readHostPort'.
possibleHosts :: ReplicaInfo -> [Host]
possibleHosts (_, info) = [readHostPort entry | entry <- at "hosts" info]
-- | Refresh the member list from the replica set and return the info that
-- was fetched.
--
-- The current members are handed to 'untilSuccess' together with
-- 'fetchReplicaInfo'. The stored list is then reconciled with the hosts
-- in the @hosts@ field of the reply:
--
-- * members still listed are kept along with their pipes;
-- * members no longer listed are dropped, and any pipe they hold is closed;
-- * newly listed hosts are appended with no pipe.
updateMembers :: ReplicaSet -> IO ReplicaInfo
updateMembers rs@(ReplicaSet _ vMembers _) = do
    known <- readMVar vMembers
    replicaInfo@(_, info) <- untilSuccess (fetchReplicaInfo rs) known
    modifyMVar vMembers $ \members -> do
        let reported = map readHostPort (at "hosts" info)
            (kept, dropped, added) = reconcile reported members
        forM_ dropped $ \(_, mPipe) -> maybe (return ()) close mPipe
        return (kept ++ [(h, Nothing) | h <- added], replicaInfo)
  where
    -- Split an association list against a list of keys: the entries whose
    -- key is listed, the entries whose key is not, and the listed keys
    -- that have no entry yet.
    reconcile :: (Eq k) => [k] -> [(k, v)] -> ([(k, v)], [(k, v)], [k])
    reconcile keys assocs = (present, absent, keys \\ common)
      where
        common = keys `intersect` map fst assocs
        (present, absent) = partition ((`elem` common) . fst) assocs
-- | Ask one member for its replica set info via the @isMaster@ admin
-- command.
--
-- A connection is obtained with 'connection', reusing the given pipe when
-- possible. A 'userError' is thrown if the reply has no @setName@ field,
-- or if that field differs from this replica set's name; the message
-- includes the host and the reply document.
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
    pipe <- connection rs mPipe host'
    info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
    -- Both failure messages share the same shape and differ only in the
    -- middle phrase.
    let reject reason = throwError $ userError $
            show host' ++ reason ++ T.unpack rsName ++ ": " ++ show info
    case B.lookup "setName" info of
        Just setName
            | setName == rsName -> return (host', info)
            | otherwise -> reject " not a member of replica set "
        Nothing -> reject " not a member of any replica set, including "
-- | Return an open pipe to the given member.
--
-- If a pipe is supplied and it is not closed, it is returned as is.
-- Otherwise the member list is consulted under 'modifyMVar': an open pipe
-- already stored for the host is reused, and if there is none (or it is
-- closed) a new connection is made with 'connect'' and recorded in the
-- list via 'updateAssocs'.
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' =
    case mPipe of
        Nothing -> viaMembers
        Just given -> do
            closed <- isClosed given
            if closed then viaMembers else return given
  where
    -- Look the host up in the shared member list, reconnecting if needed.
    viaMembers = modifyMVar vMembers $ \members ->
        case List.lookup host' members of
            Just (Just cached) -> do
                closed <- isClosed cached
                if closed then reconnect members else return (members, cached)
            _ -> reconnect members
    -- Open a fresh pipe and store it against the host in the member list.
    reconnect members = do
        fresh <- connect' timeoutSecs host'
        return (updateAssocs host' (Just fresh) members, fresh)