module Database.MongoDB.Query (
Action, access, Failure(..), ErrorCode,
AccessMode(..), GetLastError, master, slaveOk, accessMode,
MonadDB(..),
Database, allDatabases, useDb, thisDatabase,
Username, Password, auth,
Collection, allCollections,
Selection(..), Selector, whereJS,
Select(select),
insert, insert_, insertMany, insertMany_, insertAll, insertAll_,
save, replace, repsert, Modifier, modify,
delete, deleteOne,
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial),
Projector, Limit, Order, BatchSize,
explain, find, findOne, fetch, findAndModify, count, distinct,
Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,
Pipeline, aggregate,
Group(..), GroupKey(..), group,
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..),
MRResult, mapReduce, runMR, runMR',
Command, runCommand, runCommand1,
eval,
) where
import Prelude hiding (lookup)
import Control.Applicative (Applicative, (<$>))
import Control.Monad (unless, replicateM, liftM)
import Data.Int (Int32)
import Data.Maybe (listToMaybe, catMaybes)
import Data.Word (Word32)
import Data.Monoid (mappend)
#if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar,
readMVar, modifyMVar)
#else
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
readMVar, modifyMVar)
#endif
import Control.Monad.Base (MonadBase(liftBase))
import Control.Monad.Error (ErrorT, Error(..), MonadError, runErrorT,
throwError)
import Control.Monad.Reader (ReaderT, runReaderT, ask, asks, local)
import Control.Monad.RWS (RWST)
import Control.Monad.State (StateT)
import Control.Monad.Trans (MonadIO, MonadTrans, lift, liftIO)
import Control.Monad.Trans.Control (ComposeSt, MonadBaseControl(..),
MonadTransControl(..), StM, StT,
defaultLiftBaseWith, defaultRestoreM)
import Control.Monad.Writer (WriterT, Monoid)
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
(=?))
import Data.Text (Text)
import qualified Data.Text as T
import Database.MongoDB.Internal.Protocol (Reply(..), QueryOption(..),
ResponseFlag(..), InsertOption(..),
UpdateOption(..), DeleteOption(..),
CursorId, FullCollection, Username,
Password, Pipe, Notice(..),
Request(GetMore, qOptions, qSkip,
qFullCollection, qBatchSize,
qSelector, qProjector),
pwKey)
import Database.MongoDB.Internal.Util (MonadIO', loop, liftIOE, true1, (<.>))
import qualified Database.MongoDB.Internal.Protocol as P
#if !MIN_VERSION_base(4,6,0)
#endif
newtype Action m a = Action {unAction :: ErrorT Failure (ReaderT Context m) a}
deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure)
instance MonadBase b m => MonadBase b (Action m) where
liftBase = Action . liftBase
instance (MonadIO m, MonadBaseControl b m) => MonadBaseControl b (Action m) where
newtype StM (Action m) a = StMT {unStMT :: ComposeSt Action m a}
liftBaseWith = defaultLiftBaseWith StMT
restoreM = defaultRestoreM unStMT
instance MonadTrans Action where
lift = Action . lift . lift
instance MonadTransControl Action where
newtype StT Action a = StActionT {unStAction :: StT (ReaderT Context) (StT (ErrorT Failure) a)}
liftWith f = Action $ liftWith $ \runError ->
liftWith $ \runReader' ->
f (liftM StActionT . runReader' . runError . unAction)
restoreT = Action . restoreT . restoreT . liftM unStAction
access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m (Either Failure a)
access myPipe myAccessMode myDatabase (Action action) = runReaderT (runErrorT action) Context{..}
data Failure =
ConnectionFailure IOError
| CursorNotFoundFailure CursorId
| QueryFailure ErrorCode String
| WriteFailure ErrorCode String
| DocNotFound Selection
| AggregateFailure String
deriving (Show, Eq)
type ErrorCode = Int
instance Error Failure where strMsg = error
data AccessMode =
ReadStaleOk
| UnconfirmedWrites
| ConfirmWrites GetLastError
deriving Show
type GetLastError = Document
master :: AccessMode
master = ConfirmWrites []
slaveOk :: AccessMode
slaveOk = ReadStaleOk
accessMode :: (Monad m) => AccessMode -> Action m a -> Action m a
accessMode mode (Action act) = Action $ local (\ctx -> ctx {myAccessMode = mode}) act
readMode :: AccessMode -> ReadMode
readMode ReadStaleOk = StaleOk
readMode _ = Fresh
writeMode :: AccessMode -> WriteMode
writeMode ReadStaleOk = Confirm []
writeMode UnconfirmedWrites = NoConfirm
writeMode (ConfirmWrites z) = Confirm z
data Context = Context {
myPipe :: Pipe,
myAccessMode :: AccessMode,
myDatabase :: Database }
myReadMode :: Context -> ReadMode
myReadMode = readMode . myAccessMode
myWriteMode :: Context -> WriteMode
myWriteMode = writeMode . myAccessMode
send :: (MonadIO m) => [Notice] -> Action m ()
send ns = Action $ do
pipe <- asks myPipe
liftIOE ConnectionFailure $ P.send pipe ns
call :: (MonadIO m) => [Notice] -> Request -> Action m (ErrorT Failure IO Reply)
call ns r = Action $ do
pipe <- asks myPipe
promise <- liftIOE ConnectionFailure $ P.call pipe ns r
return (liftIOE ConnectionFailure promise)
class (Monad m, MonadBaseControl IO (BaseMonad m), Applicative (BaseMonad m), Functor (BaseMonad m)) => MonadDB m where
type BaseMonad m :: * -> *
liftDB :: Action (BaseMonad m) a -> m a
instance (MonadBaseControl IO m, Applicative m, Functor m) => MonadDB (Action m) where
type BaseMonad (Action m) = m
liftDB = id
instance (MonadDB m, Error e) => MonadDB (ErrorT e m) where
type BaseMonad (ErrorT e m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m) => MonadDB (ReaderT r m) where
type BaseMonad (ReaderT r m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m) => MonadDB (StateT s m) where
type BaseMonad (StateT s m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m, Monoid w) => MonadDB (WriterT w m) where
type BaseMonad (WriterT w m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m, Monoid w) => MonadDB (RWST r w s m) where
type BaseMonad (RWST r w s m) = BaseMonad m
liftDB = lift . liftDB
type Database = Text
allDatabases :: (MonadIO' m) => Action m [Database]
allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases")
thisDatabase :: (Monad m) => Action m Database
thisDatabase = Action $ asks myDatabase
useDb :: (Monad m) => Database -> Action m a -> Action m a
useDb db (Action act) = Action $ local (\ctx -> ctx {myDatabase = db}) act
auth :: (MonadIO' m) => Username -> Password -> Action m Bool
auth usr pss = do
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
type Collection = Text
allCollections :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Collection]
allCollections = do
db <- thisDatabase
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs
where
dropDbPrefix = T.tail . T.dropWhile (/= '.')
isSpecial db col = T.any (== '$') col && db <.> col /= "local.oplog.$main"
data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq)
type Selector = Document
whereJS :: Selector -> Javascript -> Selector
whereJS sel js = ("$where" =: js) : sel
class Select aQueryOrSelection where
select :: Selector -> Collection -> aQueryOrSelection
instance Select Selection where
select = Select
instance Select Query where
select = query
data WriteMode =
NoConfirm
| Confirm GetLastError
deriving (Show, Eq)
write :: (MonadIO m) => Notice -> Action m ()
write notice = Action (asks myWriteMode) >>= \mode -> case mode of
NoConfirm -> send [notice]
Confirm params -> do
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
Batch _ _ [doc] <- fulfill =<< request [notice] =<< queryRequest False q {limit = 1}
case lookup "err" doc of
Nothing -> return ()
Just err -> throwError $ WriteFailure (maybe 0 id $ lookup "code" doc) err
insert :: (MonadIO' m) => Collection -> Document -> Action m Value
insert col doc = head <$> insertMany col [doc]
insert_ :: (MonadIO' m) => Collection -> Document -> Action m ()
insert_ col doc = insert col doc >> return ()
insertMany :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
insertMany = insert' []
insertMany_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
insertMany_ col docs = insertMany col docs >> return ()
insertAll :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
insertAll = insert' [KeepGoing]
insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
insertAll_ col docs = insertAll col docs >> return ()
insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value]
insert' opts col docs = do
db <- thisDatabase
docs' <- liftIO $ mapM assignId docs
write (Insert (db <.> col) opts docs')
return $ map (valueAt "_id") docs'
assignId :: Document -> IO Document
assignId doc = if any (("_id" ==) . label) doc
then return doc
else (\oid -> ("_id" =: oid) : doc) <$> genObjectId
save :: (MonadIO' m) => Collection -> Document -> Action m ()
save col doc = case look "_id" doc of
Nothing -> insert_ col doc
Just i -> repsert (Select ["_id" := i] col) doc
replace :: (MonadIO m) => Selection -> Document -> Action m ()
replace = update []
repsert :: (MonadIO m) => Selection -> Document -> Action m ()
repsert = update [Upsert]
type Modifier = Document
modify :: (MonadIO m) => Selection -> Modifier -> Action m ()
modify = update [MultiUpdate]
update :: (MonadIO m) => [UpdateOption] -> Selection -> Document -> Action m ()
update opts (Select sel col) up = do
db <- thisDatabase
write (Update (db <.> col) opts sel up)
delete :: (MonadIO m) => Selection -> Action m ()
delete = delete' []
deleteOne :: (MonadIO m) => Selection -> Action m ()
deleteOne = delete' [SingleRemove]
delete' :: (MonadIO m) => [DeleteOption] -> Selection -> Action m ()
delete' opts (Select sel col) = do
db <- thisDatabase
write (Delete (db <.> col) opts sel)
data ReadMode =
Fresh
| StaleOk
deriving (Show, Eq)
readModeOption :: ReadMode -> [QueryOption]
readModeOption Fresh = []
readModeOption StaleOk = [SlaveOK]
data Query = Query {
options :: [QueryOption],
selection :: Selection,
project :: Projector,
skip :: Word32,
limit :: Limit,
sort :: Order,
snapshot :: Bool,
batchSize :: BatchSize,
hint :: Order
} deriving (Show, Eq)
type Projector = Document
type Limit = Word32
type Order = Document
type BatchSize = Word32
query :: Selector -> Collection -> Query
query sel col = Query [] (Select sel col) [] 0 0 [] False 0 []
find :: (MonadIO m, MonadBaseControl IO m) => Query -> Action m Cursor
find q@Query{selection, batchSize} = do
db <- thisDatabase
dBatch <- request [] =<< queryRequest False q
newCursor db (coll selection) batchSize dBatch
findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
findOne q = do
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest False q {limit = 1}
return (listToMaybe docs)
fetch :: (MonadIO m) => Query -> Action m Document
fetch q = findOne q >>= maybe (throwError $ DocNotFound $ selection q) return
findAndModify :: (Applicative m, MonadIO m)
=> Query
-> Document
-> Action m (Either String Document)
findAndModify (Query {
selection = Select sel collection
, project = project
, sort = sort
}) updates = do
result <- runCommand
[ "findAndModify" := String collection
, "new" := Bool True
, "query" := Doc sel
, "update" := Doc updates
, "fields" := Doc project
, "sort" := Doc sort
]
return $
case lookup "value" result of
Left err -> leftErr err
Right mdoc -> case mdoc of
Nothing -> leftErr $ show result
Just doc -> case lookupErr result of
Just e -> leftErr e
Nothing -> Right doc
where
leftErr err = Left $ "findAndModify: no document found: "
`mappend` show collection
`mappend` "from query: " `mappend` show sel
`mappend` err
lookupErr result = case lookup "lastErrorObject" result of
Right errObject -> lookup "err" errObject
Left err -> Just err
explain :: (MonadIO m) => Query -> Action m Document
explain q = do
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest True q {limit = 1}
return $ if null docs then error ("no explain: " ++ show q) else head docs
count :: (MonadIO' m) => Query -> Action m Int
count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
distinct :: (MonadIO' m) => Label -> Selection -> Action m [Value]
distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit)
queryRequest isExplain Query{..} = do
ctx <- Action ask
return $ queryRequest' (myReadMode ctx) (myDatabase ctx)
where
queryRequest' rm db = (P.Query{..}, remainingLimit) where
qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit
qProjector = project
mOrder = if null sort then Nothing else Just ("$orderby" =: sort)
mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing
mHint = if null hint then Nothing else Just ("$hint" =: hint)
mExplain = if isExplain then Just ("$explain" =: True) else Nothing
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit)
batchSizeRemainingLimit batchSize limit = if limit == 0
then (fromIntegral batchSize', 0)
else if 0 < batchSize' && batchSize' < limit
then (fromIntegral batchSize', limit batchSize')
else ( fromIntegral limit, 1)
where batchSize' = if batchSize == 1 then 2 else batchSize
type DelayedBatch = ErrorT Failure IO Batch
data Batch = Batch Limit CursorId [Document]
request :: (MonadIO m) => [Notice] -> (Request, Limit) -> Action m DelayedBatch
request ns (req, remainingLimit) = do
promise <- call ns req
return $ fromReply remainingLimit =<< promise
fromReply :: Limit -> Reply -> DelayedBatch
fromReply limit Reply{..} = do
mapM_ checkResponseFlag rResponseFlags
return (Batch limit rCursorId rDocuments)
where
checkResponseFlag flag = case flag of
AwaitCapable -> return ()
CursorNotFound -> throwError $ CursorNotFoundFailure rCursorId
QueryError -> throwError $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
fulfill :: (MonadIO m) => DelayedBatch -> Action m Batch
fulfill = Action . liftIOE id
data Cursor = Cursor FullCollection BatchSize (MVar DelayedBatch)
newCursor :: (MonadIO m, MonadBaseControl IO m) => Database -> Collection -> BatchSize -> DelayedBatch -> Action m Cursor
newCursor db col batchSize dBatch = do
var <- newMVar dBatch
let cursor = Cursor (db <.> col) batchSize var
_ <- mkWeakMVar var (closeCursor cursor)
return cursor
#if !MIN_VERSION_base(4,6,0)
where mkWeakMVar = addMVarFinalizer
#endif
nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
Batch limit cid docs <- fulfill' fcol batchSize dBatch
dBatch' <- if cid /= 0 then nextBatch' fcol batchSize limit cid else return $ return (Batch 0 0 [])
return (dBatch', docs)
fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch
fulfill' fcol batchSize dBatch = do
b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs
then nextBatch' fcol batchSize limit cid >>= fulfill
else return b
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Limit -> CursorId -> Action m DelayedBatch
nextBatch' fcol batchSize limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document)
next (Cursor fcol batchSize var) = modifyMVar var nextState where
nextState dBatch = do
Batch limit cid docs <- fulfill' fcol batchSize dBatch
case docs of
doc : docs' -> do
dBatch' <- if null docs' && cid /= 0
then nextBatch' fcol batchSize limit cid
else return $ return (Batch limit cid docs')
return (dBatch', Just doc)
[] -> if cid == 0
then return (return $ Batch 0 0 [], Nothing)
else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid
nextN :: (MonadIO m, MonadBaseControl IO m, Functor m) => Int -> Cursor -> Action m [Document]
nextN n c = catMaybes <$> replicateM n (next c)
rest :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m [Document]
rest c = loop (next c)
closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m ()
closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do
Batch _ cid _ <- fulfill dBatch
unless (cid == 0) $ send [KillCursors [cid]]
return $ (return $ Batch 0 0 [], ())
isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool
isCursorClosed (Cursor _ _ var) = do
Batch _ cid docs <- fulfill =<< readMVar var
return (cid == 0 && null docs)
type Pipeline = [Document]
aggregate :: MonadIO' m => Collection -> Pipeline -> Action m [Document]
aggregate aColl agg = do
response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg]
case true1 "ok" response of
True -> lookup "result" response
False -> throwError $ AggregateFailure $ at "errmsg" response
data Group = Group {
gColl :: Collection,
gKey :: GroupKey,
gReduce :: Javascript,
gInitial :: Document,
gCond :: Selector,
gFinalize :: Maybe Javascript
} deriving (Show, Eq)
data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq)
groupDocument :: Group -> Document
groupDocument Group{..} =
("finalize" =? gFinalize) ++ [
"ns" =: gColl,
case gKey of Key k -> "key" =: map (=: True) k; KeyF f -> "$keyf" =: f,
"$reduce" =: gReduce,
"initial" =: gInitial,
"cond" =: gCond ]
group :: (MonadIO' m) => Group -> Action m [Document]
group g = at "retval" <$> runCommand ["group" =: groupDocument g]
data MapReduce = MapReduce {
rColl :: Collection,
rMap :: MapFun,
rReduce :: ReduceFun,
rSelect :: Selector,
rSort :: Order,
rLimit :: Limit,
rOut :: MROut,
rFinalize :: Maybe FinalizeFun,
rScope :: Document,
rVerbose :: Bool
} deriving (Show, Eq)
type MapFun = Javascript
type ReduceFun = Javascript
type FinalizeFun = Javascript
data MROut =
Inline
| Output MRMerge Collection (Maybe Database)
deriving (Show, Eq)
data MRMerge =
Replace
| Merge
| Reduce
deriving (Show, Eq)
type MRResult = Document
mrDocument :: MapReduce -> Document
mrDocument MapReduce{..} =
("mapreduce" =: rColl) :
("out" =: mrOutDoc rOut) :
("finalize" =? rFinalize) ++ [
"map" =: rMap,
"reduce" =: rReduce,
"query" =: rSelect,
"sort" =: rSort,
"limit" =: (fromIntegral rLimit :: Int),
"scope" =: rScope,
"verbose" =: rVerbose ]
mrOutDoc :: MROut -> Document
mrOutDoc Inline = ["inline" =: (1 :: Int)]
mrOutDoc (Output mrMerge coll mDB) = (mergeName mrMerge =: coll) : mdb mDB where
mergeName Replace = "replace"
mergeName Merge = "merge"
mergeName Reduce = "reduce"
mdb Nothing = []
mdb (Just db) = ["db" =: db]
mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False
runMR :: (MonadIO m, MonadBaseControl IO m, Applicative m) => MapReduce -> Action m Cursor
runMR mr = do
res <- runMR' mr
case look "result" res of
Just (String coll) -> find $ query [] coll
Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc)
Just x -> error $ "unexpected map-reduce result field: " ++ show x
Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res)
runMR' :: (MonadIO' m) => MapReduce -> Action m MRResult
runMR' mr = do
doc <- runCommand (mrDocument mr)
return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr
type Command = Document
runCommand :: (MonadIO' m) => Command -> Action m Document
runCommand c = maybe err id <$> findOne (query c "$cmd") where
err = error $ "Nothing returned for command: " ++ show c
runCommand1 :: (MonadIO' m) => Text -> Action m Document
runCommand1 c = runCommand [c =: (1 :: Int)]
eval :: (MonadIO' m, Val v) => Javascript -> Action m v
eval code = at "retval" <$> runCommand ["$eval" =: code]