{-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, TupleSections #-}
#if (__GLASGOW_HASKELL__ >= 706)
{-# LANGUAGE RecursiveDo #-}
#else
{-# LANGUAGE DoRec #-}
#endif
module Database.MongoDB.Connection (
Secs,
Pipe, close, isClosed,
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
readHostPortM, globalConnectTimeout, connect, connect',
ReplicaSetName, openReplicaSet, openReplicaSet', openReplicaSetTLS, openReplicaSetTLS',
openReplicaSetSRV, openReplicaSetSRV', openReplicaSetSRV'', openReplicaSetSRV''',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where
import Prelude hiding (lookup)
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersect, partition, (\\), delete)
import Data.Maybe (fromJust)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad (forM_, guard)
import Control.Monad.Fail(MonadFail)
import System.IO.Unsafe (unsafePerformIO)
import System.Timeout (timeout)
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar, eof,
spaces, try, (<|>))
import qualified Data.List as List
import Control.Monad.Identity (runIdentity)
import Control.Monad.Except (throwError)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
readMVar)
import Data.Bson (Document, at, (=:))
import Data.Text (Text)
import qualified Data.Bson as B
import qualified Data.Text as T
import Database.MongoDB.Internal.Network (Host(..), HostName, PortID(..), connectTo, lookupSeedList, lookupReplicaSetName)
import Database.MongoDB.Internal.Protocol (Pipe, newPipe, close, isClosed)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
updateAssocs, shuffle, mergesortM)
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
slaveOk, runCommand, retrieveServerData)
import qualified Database.MongoDB.Transport.Tls as TLS (connect)
adminCommand :: Command -> Pipe -> IO Document
adminCommand :: Command -> Pipe -> IO Command
adminCommand Command
cmd Pipe
pipe =
(Failure -> IOError) -> IO Command -> IO Command
forall (m :: * -> *) e e' a.
(MonadIO m, Exception e, Exception e') =>
(e -> e') -> IO a -> m a
liftIOE Failure -> IOError
failureToIOError (IO Command -> IO Command) -> IO Command -> IO Command
forall a b. (a -> b) -> a -> b
$ Pipe -> AccessMode -> Database -> Action IO Command -> IO Command
forall (m :: * -> *) a.
MonadIO m =>
Pipe -> AccessMode -> Database -> Action m a -> m a
access Pipe
pipe AccessMode
slaveOk Database
"admin" (Action IO Command -> IO Command)
-> Action IO Command -> IO Command
forall a b. (a -> b) -> a -> b
$ Command -> Action IO Command
forall (m :: * -> *). MonadIO m => Command -> Action m Command
runCommand Command
cmd
where
failureToIOError :: Failure -> IOError
failureToIOError (ConnectionFailure IOError
e) = IOError
e
failureToIOError Failure
e = String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ Failure -> String
forall a. Show a => a -> String
show Failure
e
defaultPort :: PortID
defaultPort :: PortID
defaultPort = PortNumber -> PortID
PortNumber PortNumber
27017
host :: HostName -> Host
host :: String -> Host
host String
hostname = String -> PortID -> Host
Host String
hostname PortID
defaultPort
showHostPort :: Host -> String
showHostPort :: Host -> String
showHostPort (Host String
hostname (PortNumber PortNumber
port)) = String
hostname String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
":" String -> String -> String
forall a. [a] -> [a] -> [a]
++ PortNumber -> String
forall a. Show a => a -> String
show PortNumber
port
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
showHostPort (Host String
_ (UnixSocket String
path)) = String
"unix:" String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
path
#endif
readHostPortM :: (MonadFail m) => String -> m Host
readHostPortM :: String -> m Host
readHostPortM = (ParseError -> m Host)
-> (Host -> m Host) -> Either ParseError Host -> m Host
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (String -> m Host
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> m Host)
-> (ParseError -> String) -> ParseError -> m Host
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ParseError -> String
forall a. Show a => a -> String
show) Host -> m Host
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ParseError Host -> m Host)
-> (String -> Either ParseError Host) -> String -> m Host
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Parsec String () Host -> String -> String -> Either ParseError Host
forall s t a.
Stream s Identity t =>
Parsec s () a -> String -> s -> Either ParseError a
parse Parsec String () Host
forall u. ParsecT String u Identity Host
parser String
"readHostPort" where
hostname :: ParsecT String u Identity String
hostname = ParsecT String u Identity Char -> ParsecT String u Identity String
forall s (m :: * -> *) t u a.
Stream s m t =>
ParsecT s u m a -> ParsecT s u m [a]
many1 (ParsecT String u Identity Char
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m Char
letter ParsecT String u Identity Char
-> ParsecT String u Identity Char -> ParsecT String u Identity Char
forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> ParsecT String u Identity Char
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m Char
digit ParsecT String u Identity Char
-> ParsecT String u Identity Char -> ParsecT String u Identity Char
forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> Char -> ParsecT String u Identity Char
forall s (m :: * -> *) u.
Stream s m Char =>
Char -> ParsecT s u m Char
char Char
'-' ParsecT String u Identity Char
-> ParsecT String u Identity Char -> ParsecT String u Identity Char
forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> Char -> ParsecT String u Identity Char
forall s (m :: * -> *) u.
Stream s m Char =>
Char -> ParsecT s u m Char
char Char
'.' ParsecT String u Identity Char
-> ParsecT String u Identity Char -> ParsecT String u Identity Char
forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> Char -> ParsecT String u Identity Char
forall s (m :: * -> *) u.
Stream s m Char =>
Char -> ParsecT s u m Char
char Char
'_')
parser :: ParsecT String u Identity Host
parser = do
ParsecT String u Identity ()
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m ()
spaces
String
h <- ParsecT String u Identity String
forall u. ParsecT String u Identity String
hostname
ParsecT String u Identity Host -> ParsecT String u Identity Host
forall tok st a. GenParser tok st a -> GenParser tok st a
try (ParsecT String u Identity ()
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m ()
spaces ParsecT String u Identity ()
-> ParsecT String u Identity () -> ParsecT String u Identity ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ParsecT String u Identity ()
forall s (m :: * -> *) t u.
(Stream s m t, Show t) =>
ParsecT s u m ()
eof ParsecT String u Identity ()
-> ParsecT String u Identity Host -> ParsecT String u Identity Host
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Host -> ParsecT String u Identity Host
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> Host
host String
h)) ParsecT String u Identity Host
-> ParsecT String u Identity Host -> ParsecT String u Identity Host
forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> do
Char
_ <- Char -> ParsecT String u Identity Char
forall s (m :: * -> *) u.
Stream s m Char =>
Char -> ParsecT s u m Char
char Char
':'
ParsecT String u Identity Host -> ParsecT String u Identity Host
forall tok st a. GenParser tok st a -> GenParser tok st a
try ( do Int
port :: Int <- String -> Int
forall a. Read a => String -> a
read (String -> Int)
-> ParsecT String u Identity String
-> ParsecT String u Identity Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ParsecT String u Identity Char -> ParsecT String u Identity String
forall s (m :: * -> *) t u a.
Stream s m t =>
ParsecT s u m a -> ParsecT s u m [a]
many1 ParsecT String u Identity Char
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m Char
digit
ParsecT String u Identity ()
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m ()
spaces ParsecT String u Identity ()
-> ParsecT String u Identity () -> ParsecT String u Identity ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ParsecT String u Identity ()
forall s (m :: * -> *) t u.
(Stream s m t, Show t) =>
ParsecT s u m ()
eof
Host -> ParsecT String u Identity Host
forall (m :: * -> *) a. Monad m => a -> m a
return (Host -> ParsecT String u Identity Host)
-> Host -> ParsecT String u Identity Host
forall a b. (a -> b) -> a -> b
$ String -> PortID -> Host
Host String
h (PortNumber -> PortID
PortNumber (PortNumber -> PortID) -> PortNumber -> PortID
forall a b. (a -> b) -> a -> b
$ Int -> PortNumber
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
port))
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
ParsecT String u Identity Host
-> ParsecT String u Identity Host -> ParsecT String u Identity Host
forall s u (m :: * -> *) a.
ParsecT s u m a -> ParsecT s u m a -> ParsecT s u m a
<|> do Bool -> ParsecT String u Identity ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (String
h String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
"unix")
String
p <- ParsecT String u Identity Char -> ParsecT String u Identity String
forall s (m :: * -> *) t u a.
Stream s m t =>
ParsecT s u m a -> ParsecT s u m [a]
many1 ParsecT String u Identity Char
forall s (m :: * -> *) u. Stream s m Char => ParsecT s u m Char
anyChar
ParsecT String u Identity ()
forall s (m :: * -> *) t u.
(Stream s m t, Show t) =>
ParsecT s u m ()
eof
Host -> ParsecT String u Identity Host
forall (m :: * -> *) a. Monad m => a -> m a
return (Host -> ParsecT String u Identity Host)
-> Host -> ParsecT String u Identity Host
forall a b. (a -> b) -> a -> b
$ String -> PortID -> Host
Host String
"" (String -> PortID
UnixSocket String
p)
#endif
readHostPort :: String -> Host
readHostPort :: String -> Host
readHostPort = Maybe Host -> Host
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe Host -> Host) -> (String -> Maybe Host) -> String -> Host
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Maybe Host
forall (m :: * -> *). MonadFail m => String -> m Host
readHostPortM
type Secs = Double
globalConnectTimeout :: IORef Secs
globalConnectTimeout :: IORef Secs
globalConnectTimeout = IO (IORef Secs) -> IORef Secs
forall a. IO a -> a
unsafePerformIO (Secs -> IO (IORef Secs)
forall a. a -> IO (IORef a)
newIORef Secs
6)
{-# NOINLINE globalConnectTimeout #-}
connect :: Host -> IO Pipe
connect :: Host -> IO Pipe
connect Host
h = IORef Secs -> IO Secs
forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout IO Secs -> (Secs -> IO Pipe) -> IO Pipe
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Secs -> Host -> IO Pipe) -> Host -> Secs -> IO Pipe
forall a b c. (a -> b -> c) -> b -> a -> c
flip Secs -> Host -> IO Pipe
connect' Host
h
connect' :: Secs -> Host -> IO Pipe
connect' :: Secs -> Host -> IO Pipe
connect' Secs
timeoutSecs (Host String
hostname PortID
port) = do
Maybe Handle
mh <- Int -> IO Handle -> IO (Maybe Handle)
forall a. Int -> IO a -> IO (Maybe a)
timeout (Secs -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Secs -> Int) -> Secs -> Int
forall a b. (a -> b) -> a -> b
$ Secs
timeoutSecs Secs -> Secs -> Secs
forall a. Num a => a -> a -> a
* Secs
1000000) (String -> PortID -> IO Handle
connectTo String
hostname PortID
port)
Handle
handle <- IO Handle -> (Handle -> IO Handle) -> Maybe Handle -> IO Handle
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (IOError -> IO Handle
forall a. IOError -> IO a
ioError (IOError -> IO Handle) -> IOError -> IO Handle
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError String
"connect timed out") Handle -> IO Handle
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Handle
mh
rec
Pipe
p <- ServerData -> Handle -> IO Pipe
newPipe ServerData
sd Handle
handle
ServerData
sd <- Pipe
-> AccessMode -> Database -> Action IO ServerData -> IO ServerData
forall (m :: * -> *) a.
MonadIO m =>
Pipe -> AccessMode -> Database -> Action m a -> m a
access Pipe
p AccessMode
slaveOk Database
"admin" Action IO ServerData
forall (m :: * -> *). MonadIO m => Action m ServerData
retrieveServerData
Pipe -> IO Pipe
forall (m :: * -> *) a. Monad m => a -> m a
return Pipe
p
type ReplicaSetName = Text
data TransportSecurity = Secure | Unsecure
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs TransportSecurity
replSetName :: ReplicaSet -> Text
replSetName :: ReplicaSet -> Database
replSetName (ReplicaSet Database
rsName MVar [(Host, Maybe Pipe)]
_ Secs
_ TransportSecurity
_) = Database
rsName
openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSet :: (Database, [Host]) -> IO ReplicaSet
openReplicaSet (Database, [Host])
rsSeed = IORef Secs -> IO Secs
forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout IO Secs -> (Secs -> IO ReplicaSet) -> IO ReplicaSet
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Secs -> (Database, [Host]) -> IO ReplicaSet)
-> (Database, [Host]) -> Secs -> IO ReplicaSet
forall a b c. (a -> b -> c) -> b -> a -> c
flip Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSet' (Database, [Host])
rsSeed
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSet' :: Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSet' Secs
timeoutSecs (Database
rs, [Host]
hosts) = Secs -> (Database, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet Secs
timeoutSecs (Database
rs, [Host]
hosts, TransportSecurity
Unsecure)
openReplicaSetTLS :: (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSetTLS :: (Database, [Host]) -> IO ReplicaSet
openReplicaSetTLS (Database, [Host])
rsSeed = IORef Secs -> IO Secs
forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout IO Secs -> (Secs -> IO ReplicaSet) -> IO ReplicaSet
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Secs -> (Database, [Host]) -> IO ReplicaSet)
-> (Database, [Host]) -> Secs -> IO ReplicaSet
forall a b c. (a -> b -> c) -> b -> a -> c
flip Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSetTLS' (Database, [Host])
rsSeed
openReplicaSetTLS' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
openReplicaSetTLS' :: Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSetTLS' Secs
timeoutSecs (Database
rs, [Host]
hosts) = Secs -> (Database, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet Secs
timeoutSecs (Database
rs, [Host]
hosts, TransportSecurity
Secure)
_openReplicaSet :: Secs -> (ReplicaSetName, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet :: Secs -> (Database, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet Secs
timeoutSecs (Database
rsName, [Host]
seedList, TransportSecurity
transportSecurity) = do
MVar [(Host, Maybe Pipe)]
vMembers <- [(Host, Maybe Pipe)] -> IO (MVar [(Host, Maybe Pipe)])
forall (m :: * -> *) a. MonadBase IO m => a -> m (MVar a)
newMVar ((Host -> (Host, Maybe Pipe)) -> [Host] -> [(Host, Maybe Pipe)]
forall a b. (a -> b) -> [a] -> [b]
map (, Maybe Pipe
forall a. Maybe a
Nothing) [Host]
seedList)
let rs :: ReplicaSet
rs = Database
-> MVar [(Host, Maybe Pipe)]
-> Secs
-> TransportSecurity
-> ReplicaSet
ReplicaSet Database
rsName MVar [(Host, Maybe Pipe)]
vMembers Secs
timeoutSecs TransportSecurity
transportSecurity
ReplicaInfo
_ <- ReplicaSet -> IO ReplicaInfo
updateMembers ReplicaSet
rs
ReplicaSet -> IO ReplicaSet
forall (m :: * -> *) a. Monad m => a -> m a
return ReplicaSet
rs
openReplicaSetSRV :: HostName -> IO ReplicaSet
openReplicaSetSRV :: String -> IO ReplicaSet
openReplicaSetSRV String
hostname = do
Secs
timeoutSecs <- IORef Secs -> IO Secs
forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout
Secs -> TransportSecurity -> String -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
Unsecure String
hostname
openReplicaSetSRV' :: HostName -> IO ReplicaSet
openReplicaSetSRV' :: String -> IO ReplicaSet
openReplicaSetSRV' String
hostname = do
Secs
timeoutSecs <- IORef Secs -> IO Secs
forall a. IORef a -> IO a
readIORef IORef Secs
globalConnectTimeout
Secs -> TransportSecurity -> String -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
Secure String
hostname
openReplicaSetSRV'' :: Secs -> HostName -> IO ReplicaSet
openReplicaSetSRV'' :: Secs -> String -> IO ReplicaSet
openReplicaSetSRV'' Secs
timeoutSecs = Secs -> TransportSecurity -> String -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
Unsecure
openReplicaSetSRV''' :: Secs -> HostName -> IO ReplicaSet
openReplicaSetSRV''' :: Secs -> String -> IO ReplicaSet
openReplicaSetSRV''' Secs
timeoutSecs = Secs -> TransportSecurity -> String -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
Secure
_openReplicaSetSRV :: Secs -> TransportSecurity -> HostName -> IO ReplicaSet
_openReplicaSetSRV :: Secs -> TransportSecurity -> String -> IO ReplicaSet
_openReplicaSetSRV Secs
timeoutSecs TransportSecurity
transportSecurity String
hostname = do
Maybe Database
replicaSetName <- String -> IO (Maybe Database)
lookupReplicaSetName String
hostname
[Host]
hosts <- String -> IO [Host]
lookupSeedList String
hostname
case (Maybe Database
replicaSetName, [Host]
hosts) of
(Maybe Database
Nothing, [Host]
_) -> IOError -> IO ReplicaSet
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (IOError -> IO ReplicaSet) -> IOError -> IO ReplicaSet
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError String
"Failed to lookup replica set name"
(Maybe Database
_, []) -> IOError -> IO ReplicaSet
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (IOError -> IO ReplicaSet) -> IOError -> IO ReplicaSet
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError String
"Failed to lookup replica set seedlist"
(Just Database
rsName, [Host]
_) ->
case TransportSecurity
transportSecurity of
TransportSecurity
Secure -> Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSetTLS' Secs
timeoutSecs (Database
rsName, [Host]
hosts)
TransportSecurity
Unsecure -> Secs -> (Database, [Host]) -> IO ReplicaSet
openReplicaSet' Secs
timeoutSecs (Database
rsName, [Host]
hosts)
closeReplicaSet :: ReplicaSet -> IO ()
closeReplicaSet :: ReplicaSet -> IO ()
closeReplicaSet (ReplicaSet Database
_ MVar [(Host, Maybe Pipe)]
vMembers Secs
_ TransportSecurity
_) = MVar [(Host, Maybe Pipe)]
-> ([(Host, Maybe Pipe)] -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar [(Host, Maybe Pipe)]
vMembers (([(Host, Maybe Pipe)] -> IO ()) -> IO ())
-> ([(Host, Maybe Pipe)] -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ ((Host, Maybe Pipe) -> IO ()) -> [(Host, Maybe Pipe)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (IO () -> (Pipe -> IO ()) -> Maybe Pipe -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) Pipe -> IO ()
close (Maybe Pipe -> IO ())
-> ((Host, Maybe Pipe) -> Maybe Pipe)
-> (Host, Maybe Pipe)
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Host, Maybe Pipe) -> Maybe Pipe
forall a b. (a, b) -> b
snd)
primary :: ReplicaSet -> IO Pipe
primary :: ReplicaSet -> IO Pipe
primary rs :: ReplicaSet
rs@(ReplicaSet Database
rsName MVar [(Host, Maybe Pipe)]
_ Secs
_ TransportSecurity
_) = do
Maybe Host
mHost <- ReplicaInfo -> Maybe Host
statedPrimary (ReplicaInfo -> Maybe Host) -> IO ReplicaInfo -> IO (Maybe Host)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ReplicaSet -> IO ReplicaInfo
updateMembers ReplicaSet
rs
case Maybe Host
mHost of
Just Host
host' -> ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection ReplicaSet
rs Maybe Pipe
forall a. Maybe a
Nothing Host
host'
Maybe Host
Nothing -> IOError -> IO Pipe
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (IOError -> IO Pipe) -> IOError -> IO Pipe
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ String
"replica set " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Database -> String
T.unpack Database
rsName String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" has no primary"
secondaryOk :: ReplicaSet -> IO Pipe
secondaryOk :: ReplicaSet -> IO Pipe
secondaryOk ReplicaSet
rs = do
ReplicaInfo
info <- ReplicaSet -> IO ReplicaInfo
updateMembers ReplicaSet
rs
[Host]
hosts <- [Host] -> IO [Host]
forall a. [a] -> IO [a]
shuffle (ReplicaInfo -> [Host]
possibleHosts ReplicaInfo
info)
let hosts' :: [Host]
hosts' = [Host] -> (Host -> [Host]) -> Maybe Host -> [Host]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [Host]
hosts (\Host
p -> Host -> [Host] -> [Host]
forall a. Eq a => a -> [a] -> [a]
delete Host
p [Host]
hosts [Host] -> [Host] -> [Host]
forall a. [a] -> [a] -> [a]
++ [Host
p]) (ReplicaInfo -> Maybe Host
statedPrimary ReplicaInfo
info)
(Host -> IO Pipe) -> [Host] -> IO Pipe
forall e (m :: * -> *) a b.
MonadError e m =>
(a -> m b) -> [a] -> m b
untilSuccess (ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection ReplicaSet
rs Maybe Pipe
forall a. Maybe a
Nothing) [Host]
hosts'
routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO Pipe
routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering)
-> ReplicaSet -> IO Pipe
routedHost (Host, Bool) -> (Host, Bool) -> IO Ordering
f ReplicaSet
rs = do
ReplicaInfo
info <- ReplicaSet -> IO ReplicaInfo
updateMembers ReplicaSet
rs
[Host]
hosts <- [Host] -> IO [Host]
forall a. [a] -> IO [a]
shuffle (ReplicaInfo -> [Host]
possibleHosts ReplicaInfo
info)
let addIsPrimary :: Host -> (Host, Bool)
addIsPrimary Host
h = (Host
h, if Host -> Maybe Host
forall a. a -> Maybe a
Just Host
h Maybe Host -> Maybe Host -> Bool
forall a. Eq a => a -> a -> Bool
== ReplicaInfo -> Maybe Host
statedPrimary ReplicaInfo
info then Bool
True else Bool
False)
[Host]
hosts' <- (Host -> Host -> IO Ordering) -> [Host] -> IO [Host]
forall (m :: * -> *) a.
Monad m =>
(a -> a -> m Ordering) -> [a] -> m [a]
mergesortM (\Host
a Host
b -> (Host, Bool) -> (Host, Bool) -> IO Ordering
f (Host -> (Host, Bool)
addIsPrimary Host
a) (Host -> (Host, Bool)
addIsPrimary Host
b)) [Host]
hosts
(Host -> IO Pipe) -> [Host] -> IO Pipe
forall e (m :: * -> *) a b.
MonadError e m =>
(a -> m b) -> [a] -> m b
untilSuccess (ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection ReplicaSet
rs Maybe Pipe
forall a. Maybe a
Nothing) [Host]
hosts'
type ReplicaInfo = (Host, Document)
statedPrimary :: ReplicaInfo -> Maybe Host
statedPrimary :: ReplicaInfo -> Maybe Host
statedPrimary (Host
host', Command
info) = if (Database -> Command -> Bool
forall v. Val v => Database -> Command -> v
at Database
"ismaster" Command
info) then Host -> Maybe Host
forall a. a -> Maybe a
Just Host
host' else String -> Host
readHostPort (String -> Host) -> Maybe String -> Maybe Host
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Database -> Command -> Maybe String
forall v (m :: * -> *).
(Val v, MonadFail m) =>
Database -> Command -> m v
B.lookup Database
"primary" Command
info
possibleHosts :: ReplicaInfo -> [Host]
possibleHosts :: ReplicaInfo -> [Host]
possibleHosts (Host
_, Command
info) = (String -> Host) -> [String] -> [Host]
forall a b. (a -> b) -> [a] -> [b]
map String -> Host
readHostPort ([String] -> [Host]) -> [String] -> [Host]
forall a b. (a -> b) -> a -> b
$ Database -> Command -> [String]
forall v. Val v => Database -> Command -> v
at Database
"hosts" Command
info
updateMembers :: ReplicaSet -> IO ReplicaInfo
updateMembers :: ReplicaSet -> IO ReplicaInfo
updateMembers rs :: ReplicaSet
rs@(ReplicaSet Database
_ MVar [(Host, Maybe Pipe)]
vMembers Secs
_ TransportSecurity
_) = do
(Host
host', Command
info) <- ((Host, Maybe Pipe) -> IO ReplicaInfo)
-> [(Host, Maybe Pipe)] -> IO ReplicaInfo
forall e (m :: * -> *) a b.
MonadError e m =>
(a -> m b) -> [a] -> m b
untilSuccess (ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
fetchReplicaInfo ReplicaSet
rs) ([(Host, Maybe Pipe)] -> IO ReplicaInfo)
-> IO [(Host, Maybe Pipe)] -> IO ReplicaInfo
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MVar [(Host, Maybe Pipe)] -> IO [(Host, Maybe Pipe)]
forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
readMVar MVar [(Host, Maybe Pipe)]
vMembers
MVar [(Host, Maybe Pipe)]
-> ([(Host, Maybe Pipe)] -> IO ([(Host, Maybe Pipe)], ReplicaInfo))
-> IO ReplicaInfo
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m (a, b)) -> m b
modifyMVar MVar [(Host, Maybe Pipe)]
vMembers (([(Host, Maybe Pipe)] -> IO ([(Host, Maybe Pipe)], ReplicaInfo))
-> IO ReplicaInfo)
-> ([(Host, Maybe Pipe)] -> IO ([(Host, Maybe Pipe)], ReplicaInfo))
-> IO ReplicaInfo
forall a b. (a -> b) -> a -> b
$ \[(Host, Maybe Pipe)]
members -> do
let (([(Host, Maybe Pipe)]
members', [(Host, Maybe Pipe)]
old), [Host]
new) = [Host]
-> [(Host, Maybe Pipe)]
-> (([(Host, Maybe Pipe)], [(Host, Maybe Pipe)]), [Host])
forall k v. Eq k => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection ((String -> Host) -> [String] -> [Host]
forall a b. (a -> b) -> [a] -> [b]
map String -> Host
readHostPort ([String] -> [Host]) -> [String] -> [Host]
forall a b. (a -> b) -> a -> b
$ Database -> Command -> [String]
forall v. Val v => Database -> Command -> v
at Database
"hosts" Command
info) [(Host, Maybe Pipe)]
members
[(Host, Maybe Pipe)] -> ((Host, Maybe Pipe) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(Host, Maybe Pipe)]
old (((Host, Maybe Pipe) -> IO ()) -> IO ())
-> ((Host, Maybe Pipe) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Host
_, Maybe Pipe
mPipe) -> IO () -> (Pipe -> IO ()) -> Maybe Pipe -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) Pipe -> IO ()
close Maybe Pipe
mPipe
([(Host, Maybe Pipe)], ReplicaInfo)
-> IO ([(Host, Maybe Pipe)], ReplicaInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Host, Maybe Pipe)]
members' [(Host, Maybe Pipe)]
-> [(Host, Maybe Pipe)] -> [(Host, Maybe Pipe)]
forall a. [a] -> [a] -> [a]
++ (Host -> (Host, Maybe Pipe)) -> [Host] -> [(Host, Maybe Pipe)]
forall a b. (a -> b) -> [a] -> [b]
map (, Maybe Pipe
forall a. Maybe a
Nothing) [Host]
new, (Host
host', Command
info))
where
intersection :: (Eq k) => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection :: [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection [k]
keys [(k, v)]
assocs = (((k, v) -> Bool) -> [(k, v)] -> ([(k, v)], [(k, v)])
forall a. (a -> Bool) -> [a] -> ([a], [a])
partition ((k -> [k] -> Bool) -> [k] -> k -> Bool
forall a b c. (a -> b -> c) -> b -> a -> c
flip k -> [k] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem [k]
inKeys (k -> Bool) -> ((k, v) -> k) -> (k, v) -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (k, v) -> k
forall a b. (a, b) -> a
fst) [(k, v)]
assocs, [k]
keys [k] -> [k] -> [k]
forall a. Eq a => [a] -> [a] -> [a]
\\ [k]
inKeys) where
assocKeys :: [k]
assocKeys = ((k, v) -> k) -> [(k, v)] -> [k]
forall a b. (a -> b) -> [a] -> [b]
map (k, v) -> k
forall a b. (a, b) -> a
fst [(k, v)]
assocs
inKeys :: [k]
inKeys = [k] -> [k] -> [k]
forall a. Eq a => [a] -> [a] -> [a]
intersect [k]
keys [k]
assocKeys
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
fetchReplicaInfo rs :: ReplicaSet
rs@(ReplicaSet Database
rsName MVar [(Host, Maybe Pipe)]
_ Secs
_ TransportSecurity
_) (Host
host', Maybe Pipe
mPipe) = do
Pipe
pipe <- ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection ReplicaSet
rs Maybe Pipe
mPipe Host
host'
Command
info <- Command -> Pipe -> IO Command
adminCommand [Database
"isMaster" Database -> Int -> Field
forall v. Val v => Database -> v -> Field
=: (Int
1 :: Int)] Pipe
pipe
case Database -> Command -> Maybe Database
forall v (m :: * -> *).
(Val v, MonadFail m) =>
Database -> Command -> m v
B.lookup Database
"setName" Command
info of
Maybe Database
Nothing -> IOError -> IO ReplicaInfo
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (IOError -> IO ReplicaInfo) -> IOError -> IO ReplicaInfo
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ Host -> String
forall a. Show a => a -> String
show Host
host' String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" not a member of any replica set, including " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Database -> String
T.unpack Database
rsName String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
": " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Command -> String
forall a. Show a => a -> String
show Command
info
Just Database
setName | Database
setName Database -> Database -> Bool
forall a. Eq a => a -> a -> Bool
/= Database
rsName -> IOError -> IO ReplicaInfo
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (IOError -> IO ReplicaInfo) -> IOError -> IO ReplicaInfo
forall a b. (a -> b) -> a -> b
$ String -> IOError
userError (String -> IOError) -> String -> IOError
forall a b. (a -> b) -> a -> b
$ Host -> String
forall a. Show a => a -> String
show Host
host' String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
" not a member of replica set " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Database -> String
T.unpack Database
rsName String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
": " String -> String -> String
forall a. [a] -> [a] -> [a]
++ Command -> String
forall a. Show a => a -> String
show Command
info
Just Database
_ -> ReplicaInfo -> IO ReplicaInfo
forall (m :: * -> *) a. Monad m => a -> m a
return (Host
host', Command
info)
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
connection (ReplicaSet Database
_ MVar [(Host, Maybe Pipe)]
vMembers Secs
timeoutSecs TransportSecurity
transportSecurity) Maybe Pipe
mPipe Host
host' =
IO Pipe -> (Pipe -> IO Pipe) -> Maybe Pipe -> IO Pipe
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO Pipe
conn (\Pipe
p -> Pipe -> IO Bool
isClosed Pipe
p IO Bool -> (Bool -> IO Pipe) -> IO Pipe
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
bad -> if Bool
bad then IO Pipe
conn else Pipe -> IO Pipe
forall (m :: * -> *) a. Monad m => a -> m a
return Pipe
p) Maybe Pipe
mPipe
where
conn :: IO Pipe
conn = MVar [(Host, Maybe Pipe)]
-> ([(Host, Maybe Pipe)] -> IO ([(Host, Maybe Pipe)], Pipe))
-> IO Pipe
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
MVar a -> (a -> m (a, b)) -> m b
modifyMVar MVar [(Host, Maybe Pipe)]
vMembers (([(Host, Maybe Pipe)] -> IO ([(Host, Maybe Pipe)], Pipe))
-> IO Pipe)
-> ([(Host, Maybe Pipe)] -> IO ([(Host, Maybe Pipe)], Pipe))
-> IO Pipe
forall a b. (a -> b) -> a -> b
$ \[(Host, Maybe Pipe)]
members -> do
let (Host String
h PortID
p) = Host
host'
let conn' :: IO Pipe
conn' = case TransportSecurity
transportSecurity of
TransportSecurity
Secure -> String -> PortID -> IO Pipe
TLS.connect String
h PortID
p
TransportSecurity
Unsecure -> Secs -> Host -> IO Pipe
connect' Secs
timeoutSecs Host
host'
let new :: IO ([(Host, Maybe Pipe)], Pipe)
new = IO Pipe
conn' IO Pipe
-> (Pipe -> IO ([(Host, Maybe Pipe)], Pipe))
-> IO ([(Host, Maybe Pipe)], Pipe)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Pipe
pipe -> ([(Host, Maybe Pipe)], Pipe) -> IO ([(Host, Maybe Pipe)], Pipe)
forall (m :: * -> *) a. Monad m => a -> m a
return (Host -> Maybe Pipe -> [(Host, Maybe Pipe)] -> [(Host, Maybe Pipe)]
forall k v. Eq k => k -> v -> [(k, v)] -> [(k, v)]
updateAssocs Host
host' (Pipe -> Maybe Pipe
forall a. a -> Maybe a
Just Pipe
pipe) [(Host, Maybe Pipe)]
members, Pipe
pipe)
case Host -> [(Host, Maybe Pipe)] -> Maybe (Maybe Pipe)
forall a b. Eq a => a -> [(a, b)] -> Maybe b
List.lookup Host
host' [(Host, Maybe Pipe)]
members of
Just (Just Pipe
pipe) -> Pipe -> IO Bool
isClosed Pipe
pipe IO Bool
-> (Bool -> IO ([(Host, Maybe Pipe)], Pipe))
-> IO ([(Host, Maybe Pipe)], Pipe)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
bad -> if Bool
bad then IO ([(Host, Maybe Pipe)], Pipe)
new else ([(Host, Maybe Pipe)], Pipe) -> IO ([(Host, Maybe Pipe)], Pipe)
forall (m :: * -> *) a. Monad m => a -> m a
return ([(Host, Maybe Pipe)]
members, Pipe
pipe)
Maybe (Maybe Pipe)
_ -> IO ([(Host, Maybe Pipe)], Pipe)
new