{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, DeriveDataTypeable, ScopedTypeVariables, BangPatterns #-}
module Database.MongoDB.Query (
Action, access, Failure(..), ErrorCode,
AccessMode(..), GetLastError, master, slaveOk, accessMode,
liftDB,
MongoContext(..), HasMongoContext(..),
Database, allDatabases, useDb, thisDatabase,
Username, Password, auth, authMongoCR, authSCRAMSHA1,
Collection, allCollections,
Selection(..), Selector, whereJS,
Select(select),
insert, insert_, insertMany, insertMany_, insertAll, insertAll_,
save, replace, repsert, upsert, Modifier, modify, updateMany, updateAll,
WriteResult(..), UpdateOption(..), Upserted(..),
delete, deleteOne, deleteMany, deleteAll, DeleteOption(..),
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial),
Projector, Limit, Order, BatchSize,
explain, find, findOne, fetch,
findAndModify, findAndModifyOpts, FindAndModifyOpts(..), defFamUpdateOpts,
count, distinct,
Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,
Pipeline, AggregateConfig(..), aggregate, aggregateCursor,
Group(..), GroupKey(..), group,
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..),
MRResult, mapReduce, runMR, runMR',
Command, runCommand, runCommand1,
eval, retrieveServerData, ServerData(..)
) where
import Prelude hiding (lookup)
import Control.Exception (Exception, throwIO)
import Control.Monad (unless, replicateM, liftM, liftM2)
import Control.Monad.Fail(MonadFail)
import Data.Default.Class (Default(..))
import Data.Int (Int32, Int64)
import Data.Either (lefts, rights)
import Data.List (foldl1')
import Data.Maybe (listToMaybe, catMaybes, isNothing)
import Data.Word (Word32)
#if !MIN_VERSION_base(4,8,0)
import Data.Monoid (mappend)
#endif
import Data.Typeable (Typeable)
import System.Mem.Weak (Weak)
import qualified Control.Concurrent.MVar as MV
#if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar,
readMVar)
#else
import Control.Concurrent.MVar.Lifted (MVar, addMVarFinalizer,
readMVar)
#endif
import Control.Applicative ((<$>))
import Control.Exception (catch)
import Control.Monad (when, void)
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
import Control.Monad.Trans (MonadIO, liftIO)
import Data.Binary.Put (runPut)
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
(=?), (!?), Val(..), ObjectId, Value(..))
import Data.Bson.Binary (putDocument)
import Data.Text (Text)
import qualified Data.Text as T
import Database.MongoDB.Internal.Protocol (Reply(..), QueryOption(..),
ResponseFlag(..), InsertOption(..),
UpdateOption(..), DeleteOption(..),
CursorId, FullCollection, Username,
Password, Pipe, Notice(..),
Request(GetMore, qOptions, qSkip,
qFullCollection, qBatchSize,
qSelector, qProjector),
pwKey, ServerData(..))
import Database.MongoDB.Internal.Util (loop, liftIOE, true1, (<.>))
import qualified Database.MongoDB.Internal.Protocol as P
import qualified Crypto.Nonce as Nonce
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Base16 as B16
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Char8 as B
import qualified Data.Either as E
import qualified Crypto.Hash.MD5 as MD5
import qualified Crypto.Hash.SHA1 as SHA1
import qualified Crypto.MAC.HMAC as HMAC
import Data.Bits (xor)
import qualified Data.Map as Map
import Text.Read (readMaybe)
import Data.Maybe (fromMaybe)
type Action = ReaderT MongoContext
access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m a
access mongoPipe mongoAccessMode mongoDatabase action = runReaderT action MongoContext{..}
data Failure =
ConnectionFailure IOError
| CursorNotFoundFailure CursorId
| QueryFailure ErrorCode String
| WriteFailure Int ErrorCode String
| WriteConcernFailure Int String
| DocNotFound Selection
| AggregateFailure String
| CompoundFailure [Failure]
| ProtocolFailure Int String
deriving (Show, Eq, Typeable)
instance Exception Failure
type ErrorCode = Int
data AccessMode =
ReadStaleOk
| UnconfirmedWrites
| ConfirmWrites GetLastError
deriving Show
type GetLastError = Document
class Result a where
isFailed :: a -> Bool
data WriteResult = WriteResult
{ failed :: Bool
, nMatched :: Int
, nModified :: Maybe Int
, nRemoved :: Int
, upserted :: [Upserted]
, writeErrors :: [Failure]
, writeConcernErrors :: [Failure]
} deriving Show
instance Result WriteResult where
isFailed = failed
instance Result (Either a b) where
isFailed (Left _) = True
isFailed _ = False
data Upserted = Upserted
{ upsertedIndex :: Int
, upsertedId :: ObjectId
} deriving Show
master :: AccessMode
master = ConfirmWrites []
slaveOk :: AccessMode
slaveOk = ReadStaleOk
accessMode :: (Monad m) => AccessMode -> Action m a -> Action m a
accessMode mode act = local (\ctx -> ctx {mongoAccessMode = mode}) act
readMode :: AccessMode -> ReadMode
readMode ReadStaleOk = StaleOk
readMode _ = Fresh
writeMode :: AccessMode -> WriteMode
writeMode ReadStaleOk = Confirm []
writeMode UnconfirmedWrites = NoConfirm
writeMode (ConfirmWrites z) = Confirm z
data MongoContext = MongoContext {
mongoPipe :: Pipe,
mongoAccessMode :: AccessMode,
mongoDatabase :: Database }
mongoReadMode :: MongoContext -> ReadMode
mongoReadMode = readMode . mongoAccessMode
mongoWriteMode :: MongoContext -> WriteMode
mongoWriteMode = writeMode . mongoAccessMode
class HasMongoContext env where
mongoContext :: env -> MongoContext
instance HasMongoContext MongoContext where
mongoContext = id
liftDB :: (MonadReader env m, HasMongoContext env, MonadIO m)
=> Action IO a
-> m a
liftDB m = do
env <- ask
liftIO $ runReaderT m (mongoContext env)
type Database = Text
allDatabases :: (MonadIO m) => Action m [Database]
allDatabases = (map (at "name") . at "databases") `liftM` useDb "admin" (runCommand1 "listDatabases")
thisDatabase :: (Monad m) => Action m Database
thisDatabase = asks mongoDatabase
useDb :: (Monad m) => Database -> Action m a -> Action m a
useDb db act = local (\ctx -> ctx {mongoDatabase = db}) act
auth :: MonadIO m => Username -> Password -> Action m Bool
auth un pw = do
let serverVersion = liftM (at "version") $ useDb "admin" $ runCommand ["buildinfo" =: (1 :: Int)]
mmv <- liftM (readMaybe . T.unpack . head . T.splitOn ".") $ serverVersion
maybe (return False) performAuth mmv
where
performAuth majorVersion =
case (majorVersion >= (3 :: Int)) of
True -> authSCRAMSHA1 un pw
False -> authMongoCR un pw
authMongoCR :: (MonadIO m) => Username -> Password -> Action m Bool
authMongoCR usr pss = do
n <- at "nonce" `liftM` runCommand ["getnonce" =: (1 :: Int)]
true1 "ok" `liftM` runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
authSCRAMSHA1 :: MonadIO m => Username -> Password -> Action m Bool
authSCRAMSHA1 un pw = do
let hmac = HMAC.hmac SHA1.hash 64
nonce <- liftIO (Nonce.withGenerator Nonce.nonce128 >>= return . B64.encode)
let firstBare = B.concat [B.pack $ "n=" ++ (T.unpack un) ++ ",r=", nonce]
let client1 = ["saslStart" =: (1 :: Int), "mechanism" =: ("SCRAM-SHA-1" :: String), "payload" =: (B.unpack . B64.encode $ B.concat [B.pack "n,,", firstBare]), "autoAuthorize" =: (1 :: Int)]
server1 <- runCommand client1
shortcircuit (true1 "ok" server1) $ do
let serverPayload1 = B64.decodeLenient . B.pack . at "payload" $ server1
let serverData1 = parseSCRAM serverPayload1
let iterations = read . B.unpack $ Map.findWithDefault "1" "i" serverData1
let salt = B64.decodeLenient $ Map.findWithDefault "" "s" serverData1
let snonce = Map.findWithDefault "" "r" serverData1
shortcircuit (B.isInfixOf nonce snonce) $ do
let withoutProof = B.concat [B.pack "c=biws,r=", snonce]
let digestS = B.pack $ T.unpack un ++ ":mongo:" ++ T.unpack pw
let digest = B16.encode $ MD5.hash digestS
let saltedPass = scramHI digest salt iterations
let clientKey = hmac saltedPass (B.pack "Client Key")
let storedKey = SHA1.hash clientKey
let authMsg = B.concat [firstBare, B.pack ",", serverPayload1, B.pack ",", withoutProof]
let clientSig = hmac storedKey authMsg
let pval = B64.encode . BS.pack $ BS.zipWith xor clientKey clientSig
let clientFinal = B.concat [withoutProof, B.pack ",p=", pval]
let serverKey = hmac saltedPass (B.pack "Server Key")
let serverSig = B64.encode $ hmac serverKey authMsg
let client2 = ["saslContinue" =: (1 :: Int), "conversationId" =: (at "conversationId" server1 :: Int), "payload" =: (B.unpack $ B64.encode clientFinal)]
server2 <- runCommand client2
shortcircuit (true1 "ok" server2) $ do
let serverPayload2 = B64.decodeLenient . B.pack $ at "payload" server2
let serverData2 = parseSCRAM serverPayload2
let serverSigComp = Map.findWithDefault "" "v" serverData2
shortcircuit (serverSig == serverSigComp) $ do
let done = true1 "done" server2
if done
then return True
else do
let client2Step2 = [ "saslContinue" =: (1 :: Int)
, "conversationId" =: (at "conversationId" server1 :: Int)
, "payload" =: String ""]
server3 <- runCommand client2Step2
shortcircuit (true1 "ok" server3) $ do
return True
where
shortcircuit True f = f
shortcircuit False _ = return False
scramHI :: B.ByteString -> B.ByteString -> Int -> B.ByteString
scramHI digest salt iters = snd $ foldl com (u1, u1) [1..(iters-1)]
where
hmacd = HMAC.hmac SHA1.hash 64 digest
u1 = hmacd (B.concat [salt, BS.pack [0, 0, 0, 1]])
com (u,uc) _ = let u' = hmacd u in (u', BS.pack $ BS.zipWith xor uc u')
parseSCRAM :: B.ByteString -> Map.Map B.ByteString B.ByteString
parseSCRAM = Map.fromList . fmap cleanup . (fmap $ T.breakOn "=") . T.splitOn "," . T.pack . B.unpack
where cleanup (t1, t2) = (B.pack $ T.unpack t1, B.pack . T.unpack $ T.drop 1 t2)
retrieveServerData :: (MonadIO m) => Action m ServerData
retrieveServerData = do
d <- runCommand1 "isMaster"
let newSd = ServerData
{ isMaster = (fromMaybe False $ lookup "ismaster" d)
, minWireVersion = (fromMaybe 0 $ lookup "minWireVersion" d)
, maxWireVersion = (fromMaybe 0 $ lookup "maxWireVersion" d)
, maxMessageSizeBytes = (fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d)
, maxBsonObjectSize = (fromMaybe (16 * 1024 * 1024) $ lookup "maxBsonObjectSize" d)
, maxWriteBatchSize = (fromMaybe 1000 $ lookup "maxWriteBatchSize" d)
}
return newSd
type Collection = Text
allCollections :: MonadIO m => Action m [Collection]
allCollections = do
p <- asks mongoPipe
let sd = P.serverData p
if (maxWireVersion sd <= 2)
then do
db <- thisDatabase
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs
else do
r <- runCommand1 "listCollections"
let curData = do
(Doc curDoc) <- r !? "cursor"
(curId :: Int64) <- curDoc !? "id"
(curNs :: Text) <- curDoc !? "ns"
(firstBatch :: [Value]) <- curDoc !? "firstBatch"
return $ (curId, curNs, ((catMaybes (map cast' firstBatch)) :: [Document]))
case curData of
Nothing -> return []
Just (curId, curNs, firstBatch) -> do
db <- thisDatabase
nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch
docs <- rest nc
return $ catMaybes $ map (\d -> (d !? "name")) docs
where
dropDbPrefix = T.tail . T.dropWhile (/= '.')
isSpecial db col = T.any (== '$') col && db <.> col /= "local.oplog.$main"
data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq)
type Selector = Document
whereJS :: Selector -> Javascript -> Selector
whereJS sel js = ("$where" =: js) : sel
class Select aQueryOrSelection where
select :: Selector -> Collection -> aQueryOrSelection
instance Select Selection where
select = Select
instance Select Query where
select = query
data WriteMode =
NoConfirm
| Confirm GetLastError
deriving (Show, Eq)
write :: Notice -> Action IO (Maybe Document)
write notice = asks mongoWriteMode >>= \mode -> case mode of
NoConfirm -> do
pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P.send pipe [notice]
return Nothing
Confirm params -> do
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
pipe <- asks mongoPipe
Batch _ _ [doc] <- do
r <- queryRequest False q {limit = 1}
rr <- liftIO $ request pipe [notice] r
fulfill rr
return $ Just doc
insert :: (MonadIO m) => Collection -> Document -> Action m Value
insert col doc = do
doc' <- liftIO $ assignId doc
res <- insertBlock [] col (0, [doc'])
case res of
Left failure -> liftIO $ throwIO failure
Right r -> return $ head r
insert_ :: (MonadIO m) => Collection -> Document -> Action m ()
insert_ col doc = insert col doc >> return ()
insertMany :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
insertMany = insert' []
insertMany_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
insertMany_ col docs = insertMany col docs >> return ()
insertAll :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
insertAll = insert' [KeepGoing]
insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
insertAll_ col docs = insertAll col docs >> return ()
insertCommandDocument :: [InsertOption] -> Collection -> [Document] -> Document -> Document
insertCommandDocument opts col docs writeConcern =
[ "insert" =: col
, "ordered" =: (KeepGoing `notElem` opts)
, "documents" =: docs
, "writeConcern" =: writeConcern
]
takeRightsUpToLeft :: [Either a b] -> [b]
takeRightsUpToLeft l = E.rights $ takeWhile E.isRight l
insert' :: (MonadIO m)
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
insert' opts col docs = do
p <- asks mongoPipe
let sd = P.serverData p
docs' <- liftIO $ mapM assignId docs
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
let ordered = (not (KeepGoing `elem` opts))
let preChunks = splitAtLimit
(maxBsonObjectSize sd - docSize)
(maxWriteBatchSize sd)
docs'
let chunks =
if ordered
then takeRightsUpToLeft preChunks
else rights preChunks
let lens = map length chunks
let lSums = 0 : (zipWith (+) lSums lens)
chunkResults <- interruptibleFor ordered (zip lSums chunks) $ insertBlock opts col
let lchunks = lefts preChunks
when (not $ null lchunks) $ do
liftIO $ throwIO $ head lchunks
let lresults = lefts chunkResults
when (not $ null lresults) $ liftIO $ throwIO $ head lresults
return $ concat $ rights chunkResults
insertBlock :: (MonadIO m)
=> [InsertOption] -> Collection -> (Int, [Document]) -> Action m (Either Failure [Value])
insertBlock _ _ (_, []) = return $ Right []
insertBlock opts col (prevCount, docs) = do
db <- thisDatabase
p <- asks mongoPipe
let sd = P.serverData p
if (maxWireVersion sd < 2)
then do
res <- liftDB $ write (Insert (db <.> col) opts docs)
let errorMessage = do
jRes <- res
em <- lookup "err" jRes
return $ WriteFailure prevCount (maybe 0 id $ lookup "code" jRes) em
case errorMessage of
Just failure -> return $ Left failure
Nothing -> return $ Right $ map (valueAt "_id") docs
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
doc <- runCommand $ insertCommandDocument opts col docs writeConcern
case (look "writeErrors" doc, look "writeConcernError" doc) of
(Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs
(Just (Array errs), Nothing) -> do
let writeErrors = map (anyToWriteError prevCount) $ errs
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
return $ Left $ CompoundFailure errorsWithFailureIndex
(Nothing, Just err) -> do
return $ Left $ WriteFailure
prevCount
(maybe 0 id $ lookup "ok" doc)
(show err)
(Just (Array errs), Just writeConcernErr) -> do
let writeErrors = map (anyToWriteError prevCount) $ errs
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
return $ Left $ CompoundFailure $ (WriteFailure
prevCount
(maybe 0 id $ lookup "ok" doc)
(show writeConcernErr)) : errorsWithFailureIndex
(Just unknownValue, Nothing) -> do
return $ Left $ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
(Just unknownValue, Just writeConcernErr) -> do
return $ Left $ CompoundFailure $ [ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
, WriteFailure prevCount (maybe 0 id $ lookup "ok" doc) $ show writeConcernErr]
splitAtLimit :: Int -> Int -> [Document] -> [Either Failure [Document]]
splitAtLimit maxSize maxCount list = chop (go 0 0 []) list
where
go :: Int -> Int -> [Document] -> [Document] -> ((Either Failure [Document]), [Document])
go _ _ res [] = (Right $ reverse res, [])
go curSize curCount [] (x:xs) |
((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
(Left $ WriteFailure 0 0 "One document is too big for the message", xs)
go curSize curCount res (x:xs) =
if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize)
|| ((curCount + 1) > maxCount))
then
(Right $ reverse res, x:xs)
else
go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs
chop :: ([a] -> (b, [a])) -> [a] -> [b]
chop _ [] = []
chop f as = let (b, as') = f as in b : chop f as'
sizeOfDocument :: Document -> Int
sizeOfDocument d = fromIntegral $ LBS.length $ runPut $ putDocument d
assignId :: Document -> IO Document
assignId doc = if any (("_id" ==) . label) doc
then return doc
else (\oid -> ("_id" =: oid) : doc) `liftM` genObjectId
save :: (MonadIO m)
=> Collection -> Document -> Action m ()
save col doc = case look "_id" doc of
Nothing -> insert_ col doc
Just i -> upsert (Select ["_id" := i] col) doc
replace :: (MonadIO m)
=> Selection -> Document -> Action m ()
replace = update []
repsert :: (MonadIO m)
=> Selection -> Document -> Action m ()
repsert = update [Upsert]
{-# DEPRECATED repsert "use upsert instead" #-}
upsert :: (MonadIO m)
=> Selection -> Document -> Action m ()
upsert = update [Upsert]
type Modifier = Document
modify :: (MonadIO m)
=> Selection -> Modifier -> Action m ()
modify = update [MultiUpdate]
update :: (MonadIO m)
=> [UpdateOption] -> Selection -> Document -> Action m ()
update opts (Select sel col) up = do
db <- thisDatabase
ctx <- ask
liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx
updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document
updateCommandDocument col ordered updates writeConcern =
[ "update" =: col
, "ordered" =: ordered
, "updates" =: updates
, "writeConcern" =: writeConcern
]
updateMany :: (MonadIO m)
=> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m WriteResult
updateMany = update' True
updateAll :: (MonadIO m)
=> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m WriteResult
updateAll = update' False
update' :: (MonadIO m)
=> Bool
-> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m WriteResult
update' ordered col updateDocs = do
p <- asks mongoPipe
let sd = P.serverData p
let updates = map (\(s, d, os) -> [ "q" =: s
, "u" =: d
, "upsert" =: (Upsert `elem` os)
, "multi" =: (MultiUpdate `elem` os)])
updateDocs
mode <- asks mongoWriteMode
ctx <- ask
liftIO $ do
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
let docSize = sizeOfDocument $ updateCommandDocument
col
ordered
[]
writeConcern
let preChunks = splitAtLimit
(maxBsonObjectSize sd - docSize)
(maxWriteBatchSize sd)
updates
let chunks =
if ordered
then takeRightsUpToLeft preChunks
else rights preChunks
let lens = map length chunks
let lSums = 0 : (zipWith (+) lSums lens)
blocks <- interruptibleFor ordered (zip lSums chunks) $ \b -> do
ur <- runReaderT (updateBlock ordered col b) ctx
return ur
`catch` \(e :: Failure) -> do
return $ WriteResult True 0 Nothing 0 [] [e] []
let failedTotal = or $ map failed blocks
let updatedTotal = sum $ map nMatched blocks
let modifiedTotal =
if all isNothing $ map nModified blocks
then Nothing
else Just $ sum $ catMaybes $ map nModified blocks
let totalWriteErrors = concat $ map writeErrors blocks
let totalWriteConcernErrors = concat $ map writeConcernErrors blocks
let upsertedTotal = concat $ map upserted blocks
return $ WriteResult
failedTotal
updatedTotal
modifiedTotal
0
upsertedTotal
totalWriteErrors
totalWriteConcernErrors
`catch` \(e :: Failure) -> return $ WriteResult True 0 Nothing 0 [] [e] []
updateBlock :: (MonadIO m)
=> Bool -> Collection -> (Int, [Document]) -> Action m WriteResult
updateBlock ordered col (prevCount, docs) = do
p <- asks mongoPipe
let sd = P.serverData p
if (maxWireVersion sd < 2)
then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6"
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
let n = fromMaybe 0 $ doc !? "n"
let writeErrorsResults =
case look "writeErrors" doc of
Nothing -> WriteResult False 0 (Just 0) 0 [] [] []
Just (Array err) -> WriteResult True 0 (Just 0) 0 [] (map (anyToWriteError prevCount) err) []
Just unknownErr -> WriteResult
True
0
(Just 0)
0
[]
[ ProtocolFailure
prevCount
$ "Expected array of error docs, but received: "
++ (show unknownErr)]
[]
let writeConcernResults =
case look "writeConcernError" doc of
Nothing -> WriteResult False 0 (Just 0) 0 [] [] []
Just (Doc err) -> WriteResult
True
0
(Just 0)
0
[]
[]
[ WriteConcernFailure
(fromMaybe (-1) $ err !? "code")
(fromMaybe "" $ err !? "errmsg")
]
Just unknownErr -> WriteResult
True
0
(Just 0)
0
[]
[]
[ ProtocolFailure
prevCount
$ "Expected doc in writeConcernError, but received: "
++ (show unknownErr)]
let upsertedList = map docToUpserted $ fromMaybe [] (doc !? "upserted")
let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] []
return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults]
interruptibleFor :: (Monad m, Result b) => Bool -> [a] -> (a -> m b) -> m [b]
interruptibleFor ordered = go []
where
go !res [] _ = return $ reverse res
go !res (x:xs) f = do
y <- f x
if isFailed y && ordered
then return $ reverse (y:res)
else go (y:res) xs f
mergeWriteResults :: WriteResult -> WriteResult -> WriteResult
mergeWriteResults
(WriteResult failed1 nMatched1 nModified1 nDeleted1 upserted1 writeErrors1 writeConcernErrors1)
(WriteResult failed2 nMatched2 nModified2 nDeleted2 upserted2 writeErrors2 writeConcernErrors2) =
(WriteResult
(failed1 || failed2)
(nMatched1 + nMatched2)
((liftM2 (+)) nModified1 nModified2)
(nDeleted1 + nDeleted2)
(upserted2 ++ upserted1)
(writeErrors2 ++ writeErrors1)
(writeConcernErrors2 ++ writeConcernErrors1)
)
docToUpserted :: Document -> Upserted
docToUpserted doc = Upserted ind uid
where
ind = at "index" doc
uid = at "_id" doc
docToWriteError :: Document -> Failure
docToWriteError doc = WriteFailure ind code msg
where
ind = at "index" doc
code = at "code" doc
msg = at "errmsg" doc
delete :: (MonadIO m)
=> Selection -> Action m ()
delete = deleteHelper []
deleteOne :: (MonadIO m)
=> Selection -> Action m ()
deleteOne = deleteHelper [SingleRemove]
deleteHelper :: (MonadIO m)
=> [DeleteOption] -> Selection -> Action m ()
deleteHelper opts (Select sel col) = do
db <- thisDatabase
ctx <- ask
liftIO $ runReaderT (void $ write (Delete (db <.> col) opts sel)) ctx
deleteMany :: (MonadIO m)
=> Collection
-> [(Selector, [DeleteOption])]
-> Action m WriteResult
deleteMany = delete' True
deleteAll :: (MonadIO m)
=> Collection
-> [(Selector, [DeleteOption])]
-> Action m WriteResult
deleteAll = delete' False
deleteCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document
deleteCommandDocument col ordered deletes writeConcern =
[ "delete" =: col
, "ordered" =: ordered
, "deletes" =: deletes
, "writeConcern" =: writeConcern
]
delete' :: (MonadIO m)
=> Bool
-> Collection
-> [(Selector, [DeleteOption])]
-> Action m WriteResult
delete' ordered col deleteDocs = do
p <- asks mongoPipe
let sd = P.serverData p
let deletes = map (\(s, os) -> [ "q" =: s
, "limit" =: if SingleRemove `elem` os
then (1 :: Int)
else (0 :: Int)
])
deleteDocs
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern
let preChunks = splitAtLimit
(maxBsonObjectSize sd - docSize)
(maxWriteBatchSize sd)
deletes
let chunks =
if ordered
then takeRightsUpToLeft preChunks
else rights preChunks
ctx <- ask
let lens = map length chunks
let lSums = 0 : (zipWith (+) lSums lens)
blockResult <- liftIO $ interruptibleFor ordered (zip lSums chunks) $ \b -> do
dr <- runReaderT (deleteBlock ordered col b) ctx
return dr
`catch` \(e :: Failure) -> do
return $ WriteResult True 0 Nothing 0 [] [e] []
return $ foldl1' mergeWriteResults blockResult
addFailureIndex :: Int -> Failure -> Failure
addFailureIndex i (WriteFailure ind code s) = WriteFailure (ind + i) code s
addFailureIndex _ f = f
deleteBlock :: (MonadIO m)
=> Bool -> Collection -> (Int, [Document]) -> Action m WriteResult
deleteBlock ordered col (prevCount, docs) = do
p <- asks mongoPipe
let sd = P.serverData p
if (maxWireVersion sd < 2)
then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6"
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern
let n = fromMaybe 0 $ doc !? "n"
let successResults = WriteResult False 0 Nothing n [] [] []
let writeErrorsResults =
case look "writeErrors" doc of
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just (Array err) -> WriteResult True 0 Nothing 0 [] (map (anyToWriteError prevCount) err) []
Just unknownErr -> WriteResult
True
0
Nothing
0
[]
[ ProtocolFailure
prevCount
$ "Expected array of error docs, but received: "
++ (show unknownErr)]
[]
let writeConcernResults =
case look "writeConcernError" doc of
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just (Doc err) -> WriteResult
True
0
Nothing
0
[]
[]
[ WriteConcernFailure
(fromMaybe (-1) $ err !? "code")
(fromMaybe "" $ err !? "errmsg")
]
Just unknownErr -> WriteResult
True
0
Nothing
0
[]
[]
[ ProtocolFailure
prevCount
$ "Expected doc in writeConcernError, but received: "
++ (show unknownErr)]
return $ foldl1' mergeWriteResults [successResults, writeErrorsResults, writeConcernResults]
anyToWriteError :: Int -> Value -> Failure
anyToWriteError _ (Doc d) = docToWriteError d
anyToWriteError ind _ = ProtocolFailure ind "Unknown bson value"
data ReadMode =
Fresh
| StaleOk
deriving (Show, Eq)
readModeOption :: ReadMode -> [QueryOption]
readModeOption Fresh = []
readModeOption StaleOk = [SlaveOK]
data Query = Query {
options :: [QueryOption],
selection :: Selection,
project :: Projector,
skip :: Word32,
limit :: Limit,
sort :: Order,
snapshot :: Bool,
batchSize :: BatchSize,
hint :: Order
} deriving (Show, Eq)
type Projector = Document
type Limit = Word32
type Order = Document
type BatchSize = Word32
query :: Selector -> Collection -> Query
query sel col = Query [] (Select sel col) [] 0 0 [] False 0 []
find :: MonadIO m => Query -> Action m Cursor
find q@Query{selection, batchSize} = do
db <- thisDatabase
pipe <- asks mongoPipe
qr <- queryRequest False q
dBatch <- liftIO $ request pipe [] qr
newCursor db (coll selection) batchSize dBatch
findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
findOne q = do
pipe <- asks mongoPipe
qr <- queryRequest False q {limit = 1}
rq <- liftIO $ request pipe [] qr
Batch _ _ docs <- liftDB $ fulfill rq
return (listToMaybe docs)
fetch :: (MonadIO m) => Query -> Action m Document
fetch q = findOne q >>= maybe (liftIO $ throwIO $ DocNotFound $ selection q) return
data FindAndModifyOpts = FamRemove Bool
| FamUpdate
{ famUpdate :: Document
, famNew :: Bool
, famUpsert :: Bool
}
deriving Show
defFamUpdateOpts :: Document -> FindAndModifyOpts
defFamUpdateOpts ups = FamUpdate
{ famNew = True
, famUpsert = False
, famUpdate = ups
}
findAndModify :: (MonadIO m, MonadFail m)
=> Query
-> Document
-> Action m (Either String Document)
findAndModify q ups = do
eres <- findAndModifyOpts q (defFamUpdateOpts ups)
return $ case eres of
Left l -> Left l
Right r -> case r of
Nothing -> Left "findAndModify: impossible null result"
Just doc -> Right doc
findAndModifyOpts :: (MonadIO m, MonadFail m)
=> Query
->FindAndModifyOpts
-> Action m (Either String (Maybe Document))
findAndModifyOpts (Query {
selection = Select sel collection
, project = project
, sort = sort
}) famOpts = do
result <- runCommand
([ "findAndModify" := String collection
, "query" := Doc sel
, "fields" := Doc project
, "sort" := Doc sort
] ++
case famOpts of
FamRemove shouldRemove -> [ "remove" := Bool shouldRemove ]
FamUpdate {..} ->
[ "update" := Doc famUpdate
, "new" := Bool famNew
, "upsert" := Bool famUpsert
])
return $ case lookupErr result of
Just e -> leftErr e
Nothing -> case lookup "value" result of
Nothing -> leftErr "no document found"
Just mdoc -> case mdoc of
Just doc@(_:_) -> Right (Just doc)
Just [] -> case famOpts of
FamUpdate { famUpsert = True, famNew = False } -> Right Nothing
_ -> leftErr $ show result
_ -> leftErr $ show result
where
leftErr err = Left $ "findAndModify " `mappend` show collection
`mappend` "\nfrom query: " `mappend` show sel
`mappend` "\nerror: " `mappend` err
lookupErr :: Document -> Maybe String
lookupErr result = do
errObject <- lookup "lastErrorObject" result
lookup "err" errObject
explain :: (MonadIO m) => Query -> Action m Document
explain q = do
pipe <- asks mongoPipe
qr <- queryRequest True q {limit = 1}
r <- liftIO $ request pipe [] qr
Batch _ _ docs <- liftDB $ fulfill r
return $ if null docs then error ("no explain: " ++ show q) else head docs
count :: (MonadIO m) => Query -> Action m Int
count Query{selection = Select sel col, skip, limit} = at "n" `liftM` runCommand
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
distinct :: (MonadIO m) => Label -> Selection -> Action m [Value]
distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit)
queryRequest isExplain Query{..} = do
ctx <- ask
return $ queryRequest' (mongoReadMode ctx) (mongoDatabase ctx)
where
queryRequest' rm db = (P.Query{..}, remainingLimit) where
qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit)
qProjector = project
mOrder = if null sort then Nothing else Just ("$orderby" =: sort)
mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing
mHint = if null hint then Nothing else Just ("$hint" =: hint)
mExplain = if isExplain then Just ("$explain" =: True) else Nothing
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
batchSizeRemainingLimit :: BatchSize -> (Maybe Limit) -> (Int32, Maybe Limit)
batchSizeRemainingLimit batchSize mLimit =
let remaining =
case mLimit of
Nothing -> batchSize
Just limit ->
if 0 < batchSize && batchSize < limit
then batchSize
else limit
in (fromIntegral remaining, mLimit)
type DelayedBatch = IO Batch
data Batch = Batch (Maybe Limit) CursorId [Document]
request :: Pipe -> [Notice] -> (Request, Maybe Limit) -> IO DelayedBatch
request pipe ns (req, remainingLimit) = do
promise <- liftIOE ConnectionFailure $ P.call pipe ns req
let protectedPromise = liftIOE ConnectionFailure promise
return $ fromReply remainingLimit =<< protectedPromise
fromReply :: Maybe Limit -> Reply -> DelayedBatch
fromReply limit Reply{..} = do
mapM_ checkResponseFlag rResponseFlags
return (Batch limit rCursorId rDocuments)
where
checkResponseFlag flag = case flag of
AwaitCapable -> return ()
CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId
QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
fulfill :: DelayedBatch -> Action IO Batch
fulfill = liftIO
data Cursor = Cursor FullCollection BatchSize (MVar DelayedBatch)
newCursor :: MonadIO m => Database -> Collection -> BatchSize -> DelayedBatch -> Action m Cursor
newCursor db col batchSize dBatch = do
var <- liftIO $ MV.newMVar dBatch
let cursor = Cursor (db <.> col) batchSize var
_ <- liftDB $ mkWeakMVar var (closeCursor cursor)
return cursor
nextBatch :: MonadIO m => Cursor -> Action m [Document]
nextBatch (Cursor fcol batchSize var) = liftDB $ modifyMVar var $ \dBatch -> do
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
let newLimit = do
limit <- mLimit
return $ limit - (min limit $ fromIntegral $ length docs)
let emptyBatch = return $ Batch (Just 0) 0 []
let getNextBatch = nextBatch' fcol batchSize newLimit cid
let resultDocs = (maybe id (take . fromIntegral) mLimit) docs
case (cid, newLimit) of
(0, _) -> return (emptyBatch, resultDocs)
(_, Just 0) -> do
pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
return (emptyBatch, resultDocs)
(_, _) -> (, resultDocs) <$> getNextBatch
fulfill' :: FullCollection -> BatchSize -> DelayedBatch -> Action IO Batch
fulfill' fcol batchSize dBatch = do
b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs && (limit > (Just 0))
then nextBatch' fcol batchSize limit cid >>= fulfill
else return b
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> (Maybe Limit) -> CursorId -> Action m DelayedBatch
nextBatch' fcol batchSize limit cid = do
pipe <- asks mongoPipe
liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
next :: MonadIO m => Cursor -> Action m (Maybe Document)
next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where
nextState dBatch = do
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
if mLimit == (Just 0)
then return (return $ Batch (Just 0) 0 [], Nothing)
else
case docs of
doc : docs' -> do
let newLimit = do
limit <- mLimit
return $ limit - 1
dBatch' <- if null docs' && cid /= 0 && ((newLimit > (Just 0)) || (isNothing newLimit))
then nextBatch' fcol batchSize newLimit cid
else return $ return (Batch newLimit cid docs')
when (newLimit == (Just 0)) $ unless (cid == 0) $ do
pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
return (dBatch', Just doc)
[] -> if cid == 0
then return (return $ Batch (Just 0) 0 [], Nothing)
else do
nb <- nextBatch' fcol batchSize mLimit cid
return (nb, Nothing)
nextN :: MonadIO m => Int -> Cursor -> Action m [Document]
nextN n c = catMaybes `liftM` replicateM n (next c)
rest :: MonadIO m => Cursor -> Action m [Document]
rest c = loop (next c)
closeCursor :: MonadIO m => Cursor -> Action m ()
closeCursor (Cursor _ _ var) = liftDB $ modifyMVar var $ \dBatch -> do
Batch _ cid _ <- fulfill dBatch
unless (cid == 0) $ do
pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
return $ (return $ Batch (Just 0) 0 [], ())
isCursorClosed :: MonadIO m => Cursor -> Action m Bool
isCursorClosed (Cursor _ _ var) = do
Batch _ cid docs <- liftDB $ fulfill =<< readMVar var
return (cid == 0 && null docs)
type Pipeline = [Document]
aggregate :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> Action m [Document]
aggregate aColl agg = do
aggregateCursor aColl agg def >>= rest
data AggregateConfig = AggregateConfig {}
deriving Show
instance Default AggregateConfig where
def = AggregateConfig {}
aggregateCursor :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> AggregateConfig -> Action m Cursor
aggregateCursor aColl agg _ = do
response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg, "cursor" =: ([] :: Document)]
case true1 "ok" response of
True -> do
cursor :: Document <- lookup "cursor" response
firstBatch :: [Document] <- lookup "firstBatch" cursor
cursorId :: Int64 <- lookup "id" cursor
db <- thisDatabase
newCursor db aColl 0 $ return $ Batch Nothing cursorId firstBatch
False -> liftIO $ throwIO $ AggregateFailure $ at "errmsg" response
data Group = Group {
gColl :: Collection,
gKey :: GroupKey,
gReduce :: Javascript,
gInitial :: Document,
gCond :: Selector,
gFinalize :: Maybe Javascript
} deriving (Show, Eq)
data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq)
groupDocument :: Group -> Document
groupDocument Group{..} =
("finalize" =? gFinalize) ++ [
"ns" =: gColl,
case gKey of Key k -> "key" =: map (=: True) k; KeyF f -> "$keyf" =: f,
"$reduce" =: gReduce,
"initial" =: gInitial,
"cond" =: gCond ]
group :: (MonadIO m) => Group -> Action m [Document]
group g = at "retval" `liftM` runCommand ["group" =: groupDocument g]
data MapReduce = MapReduce {
rColl :: Collection,
rMap :: MapFun,
rReduce :: ReduceFun,
rSelect :: Selector,
rSort :: Order,
rLimit :: Limit,
rOut :: MROut,
rFinalize :: Maybe FinalizeFun,
rScope :: Document,
rVerbose :: Bool
} deriving (Show, Eq)
type MapFun = Javascript
type ReduceFun = Javascript
type FinalizeFun = Javascript
data MROut =
Inline
| Output MRMerge Collection (Maybe Database)
deriving (Show, Eq)
data MRMerge =
Replace
| Merge
| Reduce
deriving (Show, Eq)
type MRResult = Document
mrDocument :: MapReduce -> Document
mrDocument MapReduce{..} =
("mapreduce" =: rColl) :
("out" =: mrOutDoc rOut) :
("finalize" =? rFinalize) ++ [
"map" =: rMap,
"reduce" =: rReduce,
"query" =: rSelect,
"sort" =: rSort,
"limit" =: (fromIntegral rLimit :: Int),
"scope" =: rScope,
"verbose" =: rVerbose ]
mrOutDoc :: MROut -> Document
mrOutDoc Inline = ["inline" =: (1 :: Int)]
mrOutDoc (Output mrMerge coll mDB) = (mergeName mrMerge =: coll) : mdb mDB where
mergeName Replace = "replace"
mergeName Merge = "merge"
mergeName Reduce = "reduce"
mdb Nothing = []
mdb (Just db) = ["db" =: db]
mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False
runMR :: MonadIO m => MapReduce -> Action m Cursor
runMR mr = do
res <- runMR' mr
case look "result" res of
Just (String coll) -> find $ query [] coll
Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc)
Just x -> error $ "unexpected map-reduce result field: " ++ show x
Nothing -> newCursor "" "" 0 $ return $ Batch (Just 0) 0 (at "results" res)
runMR' :: (MonadIO m) => MapReduce -> Action m MRResult
runMR' mr = do
doc <- runCommand (mrDocument mr)
return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr
type Command = Document
runCommand :: (MonadIO m) => Command -> Action m Document
runCommand c = maybe err id `liftM` findOne (query c "$cmd") where
err = error $ "Nothing returned for command: " ++ show c
runCommand1 :: (MonadIO m) => Text -> Action m Document
runCommand1 c = runCommand [c =: (1 :: Int)]
eval :: (MonadIO m, Val v) => Javascript -> Action m v
eval code = at "retval" `liftM` runCommand ["$eval" =: code]
modifyMVar :: MVar a -> (a -> Action IO (a, b)) -> Action IO b
modifyMVar v f = do
ctx <- ask
liftIO $ MV.modifyMVar v (\x -> runReaderT (f x) ctx)
mkWeakMVar :: MVar a -> Action IO () -> Action IO (Weak (MVar a))
mkWeakMVar m closing = do
ctx <- ask
#if MIN_VERSION_base(4,6,0)
liftIO $ MV.mkWeakMVar m $ runReaderT closing ctx
#else
liftIO $ MV.addMVarFinalizer m $ runReaderT closing ctx
#endif