module Database.MongoDB.Connection (
Network', ANetwork', Internet(..),
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
ReplicaSet(..), Name,
MasterOrSlaveOk(..),
Server(..), newConnPool',
connHost, replicaSet
) where
import Database.MongoDB.Internal.Protocol as X
import Network.Abstract (IOE, connect, ANetwork(..))
import Data.Bson ((=:), at, UString)
import Control.Pipeline as P
import Control.Applicative ((<$>))
import Control.Exception (assert)
import Control.Monad.Error
import Control.Monad.MVar
import Control.Monad.Context
import Network (HostName, PortID(..))
import Data.Bson (Document, look)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Control.Monad.Identity
import Control.Monad.Util (MonadIO', untilSuccess)
import Database.MongoDB.Internal.Util ()
import Var.Pool
import System.Random (newStdGen, randomRs)
import Data.List (delete, find, nub)
import System.IO.Unsafe (unsafePerformIO)
-- | Name of a replica set (the type of the 'setName' field of 'ReplicaSet').
type Name = UString
-- | Wrap a command document in a 'Query' request against the @admin.$cmd@
-- collection. The request carries the 'SlaveOK' option, uses the command
-- document as its selector, and leaves skip, batch size and projector at
-- zero / empty.
adminCommand :: Document -> Request
adminCommand cmd = Query
    { qOptions = [SlaveOK]
    , qFullCollection = "admin.$cmd"
    , qSkip = 0
    , qBatchSize = 0
    , qSelector = cmd
    , qProjector = []
    }
-- | Extract the result document from the reply to a command.
--
-- The first document of the reply is taken as the result. If the reply has
-- the 'QueryError' flag set, an 'error' is raised whose message is @title@
-- followed by the @$err@ field of that document. A reply that contains no
-- document at all also raises an 'error' that names @title@, instead of the
-- anonymous empty-list failure a bare 'head' would produce.
commandReply :: String -> Reply -> Document
commandReply title Reply{..} = case rDocuments of
    [] -> error $ title ++ ": reply contains no documents"
    doc : _
        | QueryError `elem` rResponseFlags -> error $ title ++ ": " ++ at "$err" doc
        | otherwise -> doc
-- | A server address: a host name paired with the port to connect to.
data Host = Host HostName PortID deriving (Show, Eq, Ord)
-- | Port that 'host' uses when only a host name is supplied: 27017.
defaultPort :: PortID
defaultPort = PortNumber 27017
-- | Build a 'Host' from a host name alone, using 'defaultPort' as its port.
host :: HostName -> Host
host = flip Host defaultPort
-- | Render a 'Host' as @hostname:port@. A numeric port is shown as a number;
-- a service name or unix socket path is inserted verbatim.
showHostPort :: Host -> String
showHostPort (Host hostname port) = hostname ++ ":" ++ portText
  where
    portText = case port of
        Service s -> s
        PortNumber p -> show p
        UnixSocket s -> s
-- | Parse a string of the form @hostname@ or @hostname:port@ into a 'Host'.
--
-- Leading whitespace and whitespace before the end of input are skipped.
-- The host name consists of letters, digits, @-@ and @.@; the port, when
-- present, is a run of decimal digits. Without a port the result is built
-- with 'host', i.e. it uses the default port.
--
-- A parse failure is reported through 'fail' of the calling monad, with the
-- shown parse error as its message. A port greater than 65535 is rejected as
-- a parse failure rather than being silently wrapped into the port range.
readHostPortM :: (Monad m) => String -> m Host
readHostPortM = either (fail . show) return . parse parser "readHostPort" where
    hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
    parser = do
        spaces
        h <- hostname
        -- Either the input ends here (no port given) or a ":port" follows.
        T.try (spaces >> eof >> return (host h)) <|> do
            _ <- char ':'
            -- 'read' cannot fail here: the input is a non-empty run of
            -- digits. Integer avoids overflow on very long digit strings.
            port :: Integer <- read <$> many1 digit
            spaces >> eof
            if port > 65535
                then fail $ "port out of range: " ++ show port
                else return $ Host h (PortNumber $ fromIntegral port)
-- | Parse @hostname@ or @hostname:port@ into a 'Host' by running
-- 'readHostPortM' in the 'Identity' monad. A parse failure therefore goes
-- through 'fail' of 'Identity' (NOTE(review): presumably a runtime error —
-- confirm for the base version in use).
readHostPort :: String -> Host
readHostPort str = runIdentity (readHostPortM str)
-- | A replica set, described by its name and a list of seed hosts.
-- 'newSetConnPool' asserts that the seed list is not empty.
data ReplicaSet = ReplicaSet
    { setName :: Name
    , seedHosts :: [Host]
    } deriving (Show)
-- | Two replica sets are equal when their names are equal; the seed hosts
-- are not compared.
instance Eq ReplicaSet where
    a == b = setName a == setName b
-- | Send the @ismaster@ admin command over the given pipe and return the
-- reply document. Both the @hosts@ and the @primary@ fields are looked up
-- before returning, so a reply lacking either one makes the action fail
-- (presumably via 'look' failing in the 'IOE' monad — confirm).
getReplicaInfo :: Pipe -> IOE ReplicaInfo
getReplicaInfo pipe = do
    pendingReply <- X.call pipe [] isMasterRequest
    replyDoc <- fmap (commandReply "ismaster") pendingReply
    -- Results are discarded: the lookups only check that the fields exist.
    mapM_ (`look` replyDoc) ["hosts", "primary"]
    return replyDoc
  where
    isMasterRequest = adminCommand ["ismaster" =: (1 :: Int)]
-- | Reply document of the @ismaster@ command, as returned by 'getReplicaInfo'.
type ReplicaInfo = Document
-- | All hosts listed in the @hosts@ field of the replica info, each parsed
-- with 'readHostPort'.
replicas :: ReplicaInfo -> [Host]
replicas info = [readHostPort member | member <- at "hosts" info]
-- | The host named in the @primary@ field of the replica info, parsed with
-- 'readHostPort'.
primary :: ReplicaInfo -> Host
primary info = readHostPort (at "primary" info)
-- | Members of the replica set with the primary placed first. The first
-- occurrence of the primary is removed from the @hosts@ list before the
-- primary is prepended.
hosts :: ReplicaInfo -> [Host]
hosts info =
    let master = primary info
        others = delete master (replicas info)
    in master : others
-- | Which member of a replica set a pipe may be taken from. Pools for a
-- single 'Host' ignore this preference.
data MasterOrSlaveOk
    = Master   -- ^ Take the pipe from the first (primary) member.
    | SlaveOk  -- ^ Take the pipe from a randomly chosen non-primary member
               -- when there is one, otherwise from the only member.
    deriving (Show, Eq)
-- | 'Pool' specialised to 'IOError' (NOTE(review): presumably the error type
-- raised when acquiring a resource — confirm against "Var.Pool").
type Pool' = Pool IOError
-- | A kind of server for which a pool of connections can be maintained.
-- Instances in this module: a single 'Host' and a 'ReplicaSet'.
class Server t where
    -- | Connection pool for this kind of server.
    data ConnPool t

    -- | Create a connection pool for the given server on the given network.
    -- The 'Int' is the pool size handed to each underlying host pool.
    newConnPool :: (Network' n, MonadIO' m) => n -> Int -> t -> m (ConnPool t)

    -- | Obtain a pipe from the pool, honouring the master/slave preference
    -- where the instance distinguishes the two.
    getPipe :: MasterOrSlaveOk -> ConnPool t -> IOE Pipe

    -- | Kill all pipes held by the pool.
    killPipes :: ConnPool t -> IO ()
-- | Like 'newConnPool', but takes the network from the monad's context
-- instead of an explicit argument.
newConnPool' :: (Server t, MonadIO' m, Context ANetwork' m) => Int -> t -> m (ConnPool t)
newConnPool' poolSize' host' = do
    anyNetwork <- context
    -- The annotation pins which context value is being requested.
    case (anyNetwork :: ANetwork') of
        ANetwork net -> newConnPool net poolSize' host'
-- | Connection pool for a single host. The master/slave preference passed
-- to 'getPipe' is ignored because there is only one host to choose from.
instance Server Host where
    data ConnPool Host = HostConnPool
        { connHost :: Host
        , connPool :: Pool' Pipe
        }
    newConnPool net poolSize' host' =
        liftIO (newHostConnPool (ANetwork net) poolSize' host')
    getPipe _masterOrSlave pool = getHostPipe pool
    killPipes HostConnPool{connPool = pool} = killAll pool
-- | Shows as @ConnPool @ followed by the shown host; the pool itself is not
-- displayed.
instance Show (ConnPool Host) where
    show (HostConnPool h _) = "ConnPool " ++ show h
-- | Create a pool of pipes to one host. New pipes are opened with
-- 'tcpConnect', closed with 'P.close', and considered expired once
-- 'P.isClosed' reports them closed.
newHostConnPool :: ANetwork' -> Int -> Host -> IO (ConnPool Host)
newHostConnPool net poolSize' host' = do
    pool <- newPool factory poolSize'
    return (HostConnPool host' pool)
  where
    factory = Factory
        { newResource = tcpConnect net host'
        , killResource = P.close
        , isExpired = P.isClosed
        }
-- | Obtain a pipe from the pool of a single host, via 'aResource'.
getHostPipe :: ConnPool Host -> IOE Pipe
getHostPipe = aResource . connPool
-- | Connect to the host over the given network and wrap the resulting
-- connection in a new pipeline.
tcpConnect :: ANetwork' -> Host -> IOE Pipe
tcpConnect net (Host hostname port) = do
    conn <- connect net (hostname, port)
    newPipeline conn
-- | Connection pool for a replica set: one host pool per currently known
-- member, kept in an 'MVar' so the membership can be replaced on refresh.
instance Server ReplicaSet where
    data ConnPool ReplicaSet = ReplicaSetConnPool
        { network :: ANetwork'
        , repsetName :: Name
        , currentMembers :: MVar [ConnPool Host]
        }
    newConnPool net poolSize' repset =
        liftIO (newSetConnPool (ANetwork net) poolSize' repset)
    getPipe masterOrSlave pool = getSetPipe masterOrSlave pool
    -- Kills the pipes of every member pool while holding the members MVar.
    killPipes pool = withMVar (currentMembers pool) (mapM_ killPipes)
-- | Shows as @ConnPool @ followed by the shown 'ReplicaSet' obtained from
-- 'replicaSet'.
--
-- NOTE: this reads the members 'MVar' through 'unsafePerformIO', so the
-- output reflects the membership at the moment the string is forced.
instance Show (ConnPool ReplicaSet) where
    show pool = "ConnPool " ++ show snapshot
      where
        snapshot = unsafePerformIO (replicaSet pool)
-- | Current view of the replica set behind a pool: the set's name together
-- with the hosts of the member pools presently stored in the pool.
replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet
replicaSet pool = fmap toReplicaSet (readMVar (currentMembers pool))
  where
    toReplicaSet members = ReplicaSet (repsetName pool) (map connHost members)
-- | Create a replica-set pool with one host pool per seed host. The seed
-- list is asserted to be non-empty.
newSetConnPool :: ANetwork' -> Int -> ReplicaSet -> IO (ConnPool ReplicaSet)
newSetConnPool net poolSize' repset = assert (not (null seeds)) $ do
    seedPools <- mapM (newHostConnPool net poolSize') seeds
    membersVar <- newMVar seedPools
    return (ReplicaSetConnPool net (setName repset) membersVar)
  where
    seeds = seedHosts repset
-- | Ask the given connections for the replica info and return the member
-- hosts, primary first (see 'hosts'). The connections are queried through
-- 'untilSuccess' (presumably each in turn until one answers — confirm).
-- The replica set name argument is currently unused.
getMembers :: Name -> [ConnPool Host] -> IOE [Host]
getMembers _repsetName connections = do
    info <- untilSuccess askForInfo connections
    return (hosts info)
  where
    askForInfo conn = getHostPipe conn >>= getReplicaInfo
-- | Re-read the member list of the replica set and return one host pool per
-- member, in the order given by 'getMembers' (primary first).
--
-- A pool already present in @connections@ is reused for its host; a pool for
-- a newly seen host is created with the pool size of the first existing
-- connection. The connection list must therefore be non-empty. Pools whose
-- host is no longer a member are simply left out of the result.
refreshMembers :: ANetwork' -> Name -> [ConnPool Host] -> IOE [ConnPool Host]
refreshMembers net repsetName connections = do
    size <- liftIO (poolSize (connPool (head connections)))
    members <- getMembers repsetName connections
    mapM (liftIO . poolFor size) members
  where
    poolFor size member = case find ((member ==) . connHost) connections of
        Just existing -> return existing
        Nothing -> newHostConnPool net size member
-- | Obtain a pipe to a member of the replica set.
--
-- The member list is refreshed first and the refreshed list is stored back
-- into the pool. After the refresh the primary is at the head of the list.
--
-- * 'Master': the pipe comes from the head of the list, i.e. the primary.
--
-- * 'SlaveOk': the non-primary members are tried in a random order through
--   'untilSuccess'. If the primary is the only member, it is used.
getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> IOE Pipe
getSetPipe mos ReplicaSetConnPool{..} = modifyMVar currentMembers $ \conns -> do
    connections <- refreshMembers network repsetName conns
    pipe <- case mos of
        Master -> getHostPipe (head connections)
        SlaveOk -> do
            -- Index of the last member; indices 1..n are the non-primaries.
            let n = length connections - 1
            -- Random order of the indices in (min 1 n, n): 'nub' drops
            -- repeats from the infinite random stream and 'take' stops once
            -- every index has appeared. With a single member (n == 0) the
            -- range collapses to index 0, the primary.
            is <- take (max 1 n) . nub . randomRs (min 1 n, n) <$> liftIO newStdGen
            untilSuccess (getHostPipe . (connections !!)) is
    return (connections, pipe)