module Database.MongoDB.Query (
Action, access, Failure(..), ErrorCode,
AccessMode(..), GetLastError, master, slaveOk, accessMode,
MonadDB(..),
Database, allDatabases, useDb, thisDatabase,
Username, Password, auth,
Collection, allCollections,
Selection(..), Selector, whereJS,
Select(select),
insert, insert_, insertMany, insertMany_, insertAll, insertAll_,
save, replace, repsert, Modifier, modify,
delete, deleteOne,
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial), Projector, Limit, Order, BatchSize,
explain, find, findOne, fetch, count, distinct,
Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,
Group(..), GroupKey(..), group,
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..), MRResult, mapReduce, runMR, runMR',
Command, runCommand, runCommand1,
eval,
) where
import Prelude as X hiding (lookup)
import Data.UString as U (UString, dropWhile, any, tail)
import Data.Bson (Document, at, valueAt, lookup, look, Field(..), (=:), (=?), Label, Value(String,Doc), Javascript, genObjectId)
import Database.MongoDB.Internal.Protocol (Pipe, Notice(..), Request(GetMore, qOptions, qFullCollection, qSkip, qBatchSize, qSelector, qProjector), Reply(..), QueryOption(..), ResponseFlag(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId, FullCollection, Username, Password, pwKey)
import qualified Database.MongoDB.Internal.Protocol as P (send, call, Request(Query))
import Database.MongoDB.Internal.Util (MonadIO', loop, liftIOE, true1, (<.>))
import Control.Monad.MVar
import Control.Monad.Error
import Control.Monad.Reader
import Control.Monad.State (StateT)
import Control.Monad.Writer (WriterT, Monoid)
import Control.Monad.RWS (RWST)
import Control.Applicative (Applicative, (<$>))
import Data.Maybe (listToMaybe, catMaybes)
import Data.Int (Int32)
import Data.Word (Word32)
newtype Action m a = Action (ErrorT Failure (ReaderT Context m) a)
deriving (Functor, Applicative, Monad, MonadIO, MonadControlIO, MonadError Failure)
instance MonadTrans Action where lift = Action . lift . lift
access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m (Either Failure a)
access myPipe myAccessMode myDatabase (Action action) = runReaderT (runErrorT action) Context{..}
data Failure =
ConnectionFailure IOError
| CursorNotFoundFailure CursorId
| QueryFailure ErrorCode String
| WriteFailure ErrorCode String
| DocNotFound Selection
deriving (Show, Eq)
type ErrorCode = Int
instance Error Failure where strMsg = error
data AccessMode =
ReadStaleOk
| UnconfirmedWrites
| ConfirmWrites GetLastError
type GetLastError = Document
master :: AccessMode
master = ConfirmWrites []
slaveOk :: AccessMode
slaveOk = ReadStaleOk
accessMode :: (Monad m) => AccessMode -> Action m a -> Action m a
accessMode mode (Action act) = Action $ local (\ctx -> ctx {myAccessMode = mode}) act
readMode :: AccessMode -> ReadMode
readMode ReadStaleOk = StaleOk
readMode _ = Fresh
writeMode :: AccessMode -> WriteMode
writeMode ReadStaleOk = Confirm []
writeMode UnconfirmedWrites = NoConfirm
writeMode (ConfirmWrites z) = Confirm z
data Context = Context {
myPipe :: Pipe,
myAccessMode :: AccessMode,
myDatabase :: Database }
myReadMode :: Context -> ReadMode
myReadMode = readMode . myAccessMode
myWriteMode :: Context -> WriteMode
myWriteMode = writeMode . myAccessMode
send :: (MonadIO m) => [Notice] -> Action m ()
send ns = Action $ do
pipe <- asks myPipe
liftIOE ConnectionFailure $ P.send pipe ns
call :: (MonadIO m) => [Notice] -> Request -> Action m (ErrorT Failure IO Reply)
call ns r = Action $ do
pipe <- asks myPipe
promise <- liftIOE ConnectionFailure $ P.call pipe ns r
return (liftIOE ConnectionFailure promise)
class (Monad m, MonadControlIO (BaseMonad m), Applicative (BaseMonad m), Functor (BaseMonad m)) => MonadDB m where
type BaseMonad m :: * -> *
liftDB :: Action (BaseMonad m) a -> m a
instance (MonadControlIO m, Applicative m, Functor m) => MonadDB (Action m) where
type BaseMonad (Action m) = m
liftDB = id
instance (MonadDB m, Error e) => MonadDB (ErrorT e m) where
type BaseMonad (ErrorT e m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m) => MonadDB (ReaderT r m) where
type BaseMonad (ReaderT r m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m) => MonadDB (StateT s m) where
type BaseMonad (StateT s m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m, Monoid w) => MonadDB (WriterT w m) where
type BaseMonad (WriterT w m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m, Monoid w) => MonadDB (RWST r w s m) where
type BaseMonad (RWST r w s m) = BaseMonad m
liftDB = lift . liftDB
type Database = UString
allDatabases :: (MonadIO' m) => Action m [Database]
allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases")
thisDatabase :: (Monad m) => Action m Database
thisDatabase = Action $ asks myDatabase
useDb :: (Monad m) => Database -> Action m a -> Action m a
useDb db (Action act) = Action $ local (\ctx -> ctx {myDatabase = db}) act
auth :: (MonadIO' m) => Username -> Password -> Action m Bool
auth usr pss = do
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
type Collection = UString
allCollections :: (MonadControlIO m, Functor m) => Action m [Collection]
allCollections = do
db <- thisDatabase
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs
where
dropDbPrefix = U.tail . U.dropWhile (/= '.')
isSpecial db col = U.any (== '$') col && db <.> col /= "local.oplog.$main"
data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq)
type Selector = Document
whereJS :: Selector -> Javascript -> Selector
whereJS sel js = ("$where" =: js) : sel
class Select aQueryOrSelection where
select :: Selector -> Collection -> aQueryOrSelection
instance Select Selection where
select = Select
instance Select Query where
select = query
data WriteMode =
NoConfirm
| Confirm GetLastError
deriving (Show, Eq)
write :: (MonadIO m) => Notice -> Action m ()
write notice = Action (asks myWriteMode) >>= \mode -> case mode of
NoConfirm -> send [notice]
Confirm params -> do
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
Batch _ _ [doc] <- fulfill =<< request [notice] =<< queryRequest False q {limit = 1}
case lookup "err" doc of
Nothing -> return ()
Just err -> throwError $ WriteFailure (maybe 0 id $ lookup "code" doc) err
insert :: (MonadIO' m) => Collection -> Document -> Action m Value
insert col doc = head <$> insertMany col [doc]
insert_ :: (MonadIO' m) => Collection -> Document -> Action m ()
insert_ col doc = insert col doc >> return ()
insertMany :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
insertMany = insert' []
insertMany_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
insertMany_ col docs = insertMany col docs >> return ()
insertAll :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
insertAll = insert' [KeepGoing]
insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
insertAll_ col docs = insertAll col docs >> return ()
insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value]
insert' opts col docs = do
db <- thisDatabase
docs' <- liftIO $ mapM assignId docs
write (Insert (db <.> col) opts docs')
return $ map (valueAt "_id") docs'
assignId :: Document -> IO Document
assignId doc = if X.any (("_id" ==) . label) doc
then return doc
else (\oid -> ("_id" =: oid) : doc) <$> genObjectId
save :: (MonadIO' m) => Collection -> Document -> Action m ()
save col doc = case look "_id" doc of
Nothing -> insert_ col doc
Just i -> repsert (Select ["_id" := i] col) doc
replace :: (MonadIO m) => Selection -> Document -> Action m ()
replace = update []
repsert :: (MonadIO m) => Selection -> Document -> Action m ()
repsert = update [Upsert]
type Modifier = Document
modify :: (MonadIO m) => Selection -> Modifier -> Action m ()
modify = update [MultiUpdate]
update :: (MonadIO m) => [UpdateOption] -> Selection -> Document -> Action m ()
update opts (Select sel col) up = do
db <- thisDatabase
write (Update (db <.> col) opts sel up)
delete :: (MonadIO m) => Selection -> Action m ()
delete = delete' []
deleteOne :: (MonadIO m) => Selection -> Action m ()
deleteOne = delete' [SingleRemove]
delete' :: (MonadIO m) => [DeleteOption] -> Selection -> Action m ()
delete' opts (Select sel col) = do
db <- thisDatabase
write (Delete (db <.> col) opts sel)
data ReadMode =
Fresh
| StaleOk
deriving (Show, Eq)
readModeOption :: ReadMode -> [QueryOption]
readModeOption Fresh = []
readModeOption StaleOk = [SlaveOK]
data Query = Query {
options :: [QueryOption],
selection :: Selection,
project :: Projector,
skip :: Word32,
limit :: Limit,
sort :: Order,
snapshot :: Bool,
batchSize :: BatchSize,
hint :: Order
} deriving (Show, Eq)
type Projector = Document
type Limit = Word32
type Order = Document
type BatchSize = Word32
query :: Selector -> Collection -> Query
query sel col = Query [] (Select sel col) [] 0 0 [] False 0 []
find :: (MonadControlIO m) => Query -> Action m Cursor
find q@Query{selection, batchSize} = do
db <- thisDatabase
dBatch <- request [] =<< queryRequest False q
newCursor db (coll selection) batchSize dBatch
findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
findOne q = do
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest False q {limit = 1}
return (listToMaybe docs)
fetch :: (MonadIO m) => Query -> Action m Document
fetch q = findOne q >>= maybe (throwError $ DocNotFound $ selection q) return
explain :: (MonadIO m) => Query -> Action m Document
explain q = do
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest True q {limit = 1}
return $ if null docs then error ("no explain: " ++ show q) else head docs
count :: (MonadIO' m) => Query -> Action m Int
count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
distinct :: (MonadIO' m) => Label -> Selection -> Action m [Value]
distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit)
queryRequest isExplain Query{..} = do
ctx <- Action ask
return $ queryRequest' (myReadMode ctx) (myDatabase ctx)
where
queryRequest' rm db = (P.Query{..}, remainingLimit) where
qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit
qProjector = project
mOrder = if null sort then Nothing else Just ("$orderby" =: sort)
mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing
mHint = if null hint then Nothing else Just ("$hint" =: hint)
mExplain = if isExplain then Just ("$explain" =: True) else Nothing
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit)
batchSizeRemainingLimit batchSize limit = if limit == 0
then (fromIntegral batchSize', 0)
else if 0 < batchSize' && batchSize' < limit
then (fromIntegral batchSize', limit batchSize')
else ( fromIntegral limit, 1)
where batchSize' = if batchSize == 1 then 2 else batchSize
type DelayedBatch = ErrorT Failure IO Batch
data Batch = Batch Limit CursorId [Document]
request :: (MonadIO m) => [Notice] -> (Request, Limit) -> Action m DelayedBatch
request ns (req, remainingLimit) = do
promise <- call ns req
return $ fromReply remainingLimit =<< promise
fromReply :: Limit -> Reply -> DelayedBatch
fromReply limit Reply{..} = do
mapM_ checkResponseFlag rResponseFlags
return (Batch limit rCursorId rDocuments)
where
checkResponseFlag flag = case flag of
AwaitCapable -> return ()
CursorNotFound -> throwError $ CursorNotFoundFailure rCursorId
QueryError -> throwError $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
fulfill :: (MonadIO m) => DelayedBatch -> Action m Batch
fulfill = Action . liftIOE id
data Cursor = Cursor FullCollection BatchSize (MVar DelayedBatch)
newCursor :: (MonadControlIO m) => Database -> Collection -> BatchSize -> DelayedBatch -> Action m Cursor
newCursor db col batchSize dBatch = do
var <- newMVar dBatch
let cursor = Cursor (db <.> col) batchSize var
addMVarFinalizer var (closeCursor cursor)
return cursor
nextBatch :: (MonadControlIO m) => Cursor -> Action m [Document]
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
Batch limit cid docs <- fulfill dBatch
dBatch' <- if cid /= 0 then nextBatch' limit cid else return $ return (Batch 0 0 [])
return (dBatch', docs)
where
nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
next :: (MonadControlIO m) => Cursor -> Action m (Maybe Document)
next (Cursor fcol batchSize var) = modifyMVar var nextState where
nextState dBatch = do
Batch limit cid docs <- fulfill dBatch
case docs of
doc : docs' -> do
dBatch' <- if null docs' && cid /= 0
then nextBatch' limit cid
else return $ return (Batch limit cid docs')
return (dBatch', Just doc)
[] -> if cid == 0
then return (return $ Batch 0 0 [], Nothing)
else error $ "server returned empty batch but says more results on server"
nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
nextN :: (MonadControlIO m, Functor m) => Int -> Cursor -> Action m [Document]
nextN n c = catMaybes <$> replicateM n (next c)
rest :: (MonadControlIO m, Functor m) => Cursor -> Action m [Document]
rest c = loop (next c)
closeCursor :: (MonadControlIO m) => Cursor -> Action m ()
closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do
Batch _ cid _ <- fulfill dBatch
unless (cid == 0) $ send [KillCursors [cid]]
return $ (return $ Batch 0 0 [], ())
isCursorClosed :: (MonadIO m) => Cursor -> Action m Bool
isCursorClosed (Cursor _ _ var) = do
Batch _ cid docs <- fulfill =<< readMVar var
return (cid == 0 && null docs)
data Group = Group {
gColl :: Collection,
gKey :: GroupKey,
gReduce :: Javascript,
gInitial :: Document,
gCond :: Selector,
gFinalize :: Maybe Javascript
} deriving (Show, Eq)
data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq)
groupDocument :: Group -> Document
groupDocument Group{..} =
("finalize" =? gFinalize) ++ [
"ns" =: gColl,
case gKey of Key k -> "key" =: map (=: True) k; KeyF f -> "$keyf" =: f,
"$reduce" =: gReduce,
"initial" =: gInitial,
"cond" =: gCond ]
group :: (MonadIO' m) => Group -> Action m [Document]
group g = at "retval" <$> runCommand ["group" =: groupDocument g]
data MapReduce = MapReduce {
rColl :: Collection,
rMap :: MapFun,
rReduce :: ReduceFun,
rSelect :: Selector,
rSort :: Order,
rLimit :: Limit,
rOut :: MROut,
rFinalize :: Maybe FinalizeFun,
rScope :: Document,
rVerbose :: Bool
} deriving (Show, Eq)
type MapFun = Javascript
type ReduceFun = Javascript
type FinalizeFun = Javascript
data MROut =
Inline
| Output MRMerge Collection (Maybe Database)
deriving (Show, Eq)
data MRMerge =
Replace
| Merge
| Reduce
deriving (Show, Eq)
type MRResult = Document
mrDocument :: MapReduce -> Document
mrDocument MapReduce{..} =
("mapreduce" =: rColl) :
("out" =: mrOutDoc rOut) :
("finalize" =? rFinalize) ++ [
"map" =: rMap,
"reduce" =: rReduce,
"query" =: rSelect,
"sort" =: rSort,
"limit" =: (fromIntegral rLimit :: Int),
"scope" =: rScope,
"verbose" =: rVerbose ]
mrOutDoc :: MROut -> Document
mrOutDoc Inline = ["inline" =: (1 :: Int)]
mrOutDoc (Output mrMerge coll mDB) = (mergeName mrMerge =: coll) : mdb mDB where
mergeName Replace = "replace"
mergeName Merge = "merge"
mergeName Reduce = "reduce"
mdb Nothing = []
mdb (Just db) = ["db" =: db]
mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False
runMR :: (MonadControlIO m, Applicative m) => MapReduce -> Action m Cursor
runMR mr = do
res <- runMR' mr
case look "result" res of
Just (String coll) -> find $ query [] coll
Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc)
Just x -> error $ "unexpected map-reduce result field: " ++ show x
Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res)
runMR' :: (MonadIO' m) => MapReduce -> Action m MRResult
runMR' mr = do
doc <- runCommand (mrDocument mr)
return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr
type Command = Document
runCommand :: (MonadIO' m) => Command -> Action m Document
runCommand c = maybe err id <$> findOne (query c "$cmd") where
err = error $ "Nothing returned for command: " ++ show c
runCommand1 :: (MonadIO' m) => UString -> Action m Document
runCommand1 c = runCommand [c =: (1 :: Int)]
eval :: (MonadIO' m) => Javascript -> Action m Document
eval code = at "retval" <$> runCommand ["$eval" =: code]