module Database.MongoDB.Connection (
Pipe,
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
ReplicaSet(..), Name,
MasterOrSlaveOk(..),
Service(..),
connHost, replicaSet
) where
import Prelude hiding (lookup)
import Database.MongoDB.Internal.Protocol as X
import qualified Network.Abstract as C
import Network.Abstract (IOE, NetworkIO, ANetwork)
import Data.Bson ((=:), at, lookup, UString)
import Control.Pipeline as P
import Control.Applicative ((<$>))
import Control.Exception (assert)
import Control.Monad.Error
import Control.Monad.MVar
import Network (HostName, PortID(..))
import Data.Bson (Document, look)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Control.Monad.Identity
import Control.Monad.Util (MonadIO', untilSuccess)
import Database.MongoDB.Internal.Util ()
import Var.Pool
import System.Random (newStdGen, randomRs)
import Data.List (delete, find, nub)
import System.IO.Unsafe (unsafePerformIO)
type Name = UString
adminCommand :: Document -> Request
adminCommand cmd = Query{..} where
qOptions = [SlaveOK]
qFullCollection = "admin.$cmd"
qSkip = 0
qBatchSize = 1
qSelector = cmd
qProjector = []
commandReply :: String -> Reply -> Document
commandReply title Reply{..} = if elem QueryError rResponseFlags
then error $ title ++ ": " ++ at "$err" (head rDocuments)
else if null rDocuments
then error ("empty reply to: " ++ title)
else head rDocuments
data Host = Host HostName PortID deriving (Show, Eq, Ord)
defaultPort :: PortID
defaultPort = PortNumber 27017
host :: HostName -> Host
host hostname = Host hostname defaultPort
showHostPort :: Host -> String
showHostPort (Host hostname port) = hostname ++ ":" ++ portname where
portname = case port of
Service s -> s
PortNumber p -> show p
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
UnixSocket s -> s
#endif
readHostPortM :: (Monad m) => String -> m Host
readHostPortM = either (fail . show) return . parse parser "readHostPort" where
hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
parser = do
spaces
h <- hostname
T.try (spaces >> eof >> return (host h)) <|> do
_ <- char ':'
port :: Int <- read <$> many1 digit
spaces >> eof
return $ Host h (PortNumber $ fromIntegral port)
readHostPort :: String -> Host
readHostPort = runIdentity . readHostPortM
data ReplicaSet = ReplicaSet {setName :: Name, seedHosts :: [Host]} deriving (Show)
instance Eq ReplicaSet where ReplicaSet x _ == ReplicaSet y _ = x == y
getReplicaInfo :: ConnPool Host -> IOE ReplicaInfo
getReplicaInfo conn = do
pipe <- getHostPipe conn
promise <- X.call pipe [] (adminCommand ["ismaster" =: (1 :: Int)])
info <- commandReply "ismaster" <$> promise
_ <- look "hosts" info
_ <- look "ismaster" info
return $ ReplicaInfo (connHost conn) info
data ReplicaInfo = ReplicaInfo {_infoHost :: Host, infoDoc :: Document} deriving (Show)
primary :: ReplicaInfo -> Maybe Host
primary (ReplicaInfo host' info) = if at "ismaster" info then Just host' else readHostPort <$> lookup "primary" info
replicas :: ReplicaInfo -> [Host]
replicas info = maybe members (\m -> m : delete m members) master where
members = map readHostPort $ at "hosts" (infoDoc info)
master = primary info
data MasterOrSlaveOk =
Master
| SlaveOk
deriving (Show, Eq)
type Pool' = Pool IOError
class Service t where
data ConnPool t
newConnPool :: (NetworkIO m) => Int -> t -> m (ConnPool t)
getPipe :: MasterOrSlaveOk -> ConnPool t -> IOE Pipe
killPipes :: ConnPool t -> IO ()
instance Service Host where
data ConnPool Host = HostConnPool {connHost :: Host, connPool :: Pool' Pipe}
newConnPool poolSize' host' = liftIO . newHostConnPool poolSize' host' =<< C.network
getPipe _ = getHostPipe
killPipes (HostConnPool _ pool) = killAll pool
instance Show (ConnPool Host) where
show HostConnPool{connHost} = "ConnPool " ++ show connHost
newHostConnPool :: Int -> Host -> ANetwork -> IO (ConnPool Host)
newHostConnPool poolSize' host' net = HostConnPool host' <$> newPool Factory{..} poolSize' where
newResource = tcpConnect net host'
killResource = P.close
isExpired = P.isClosed
getHostPipe :: ConnPool Host -> IOE Pipe
getHostPipe (HostConnPool _ pool) = aResource pool
tcpConnect :: ANetwork -> Host -> IOE Pipe
tcpConnect net (Host hostname port) = newPipeline =<< C.connect net (C.Server hostname port)
instance Service ReplicaSet where
data ConnPool ReplicaSet = ReplicaSetConnPool {
network :: ANetwork,
repsetName :: Name,
currentMembers :: MVar [ConnPool Host] }
newConnPool poolSize' repset = liftIO . newSetConnPool poolSize' repset =<< C.network
getPipe = getSetPipe
killPipes ReplicaSetConnPool{..} = withMVar currentMembers (mapM_ killPipes)
instance Show (ConnPool ReplicaSet) where
show r = "ConnPool " ++ show (unsafePerformIO $ replicaSet r)
replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet
replicaSet ReplicaSetConnPool{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers
newSetConnPool :: Int -> ReplicaSet -> ANetwork -> IO (ConnPool ReplicaSet)
newSetConnPool poolSize' repset net = assert (not . null $ seedHosts repset) $ do
currentMembers <- newMVar =<< mapM (\h -> newHostConnPool poolSize' h net) (seedHosts repset)
return $ ReplicaSetConnPool net (setName repset) currentMembers
getMembers :: Name -> [ConnPool Host] -> IOE [Host]
getMembers _repsetName connections = replicas <$> untilSuccess getReplicaInfo connections
refreshMembers :: ANetwork -> Name -> [ConnPool Host] -> IOE [ConnPool Host]
refreshMembers net repsetName connections = do
n <- liftIO . poolSize . connPool $ head connections
mapM (liftIO . connection n) =<< getMembers repsetName connections
where
connection n host' = maybe (newHostConnPool n host' net) return mc where
mc = find ((host' ==) . connHost) connections
getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> IOE Pipe
getSetPipe mos ReplicaSetConnPool{..} = modifyMVar currentMembers $ \conns -> do
connections <- refreshMembers network repsetName conns
pipe <- case mos of
Master -> getHostPipe (head connections)
SlaveOk -> do
let n = length connections 1
is <- take (max 1 n) . nub . randomRs (min 1 n, n) <$> liftIO newStdGen
untilSuccess (getHostPipe . (connections !!)) is
return (connections, pipe)