如何使用 Node.js 在 MongoDB 中使用 cursor.forEach()?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/25507866/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How can I use a cursor.forEach() in MongoDB using Node.js?
提问by Alex Brodov
I have a huge collection of documents in my DB and I'm wondering how can I run through all the documents and update them, each document with a different value.
我的数据库中有大量文档,我想知道如何遍历所有文档并更新它们,每个文档都具有不同的值。
回答by Leonid Beschastny
The answer depends on the driver you're using. All MongoDB drivers I know have cursor.forEach()implemented one way or another.
答案取决于您使用的驱动程序。我知道的所有 MongoDB 驱动程序都以cursor.forEach()一种或另一种方式实现。
Here are some examples:
这里有些例子:
node-mongodb-native
节点-mongodb-本机
collection.find(query).forEach(function(doc) {
// handle
}, function(err) {
// done or error
});
mongojs
mongojs
db.collection.find(query).forEach(function(err, doc) {
// handle
});
monk
僧
collection.find(query, { stream: true })
.each(function(doc){
// handle doc
})
.error(function(err){
// handle error
})
.success(function(){
// final callback
});
mongoose
猫鼬
collection.find(query).stream()
.on('data', function(doc){
// handle doc
})
.on('error', function(err){
// handle error
})
.on('end', function(){
// final callback
});
Updating documents inside of .forEachcallback
在.forEach回调中更新文档
The only problem with updating documents inside of .forEachcallback is that you have no idea when all documents are updated.
在.forEach回调中更新文档的唯一问题是您不知道何时更新所有文档。
To solve this problem you should use some asynchronous control flow solution. Here are some options:
要解决此问题,您应该使用一些异步控制流解决方案。以下是一些选项:
Here is an example of using async, using its queuefeature:
这是使用的示例async,使用其queue功能:
var q = async.queue(function (doc, callback) {
// code for your update
collection.update({
_id: doc._id
}, {
$set: {hi: 'there'}
}, {
w: 1
}, callback);
}, Infinity);
var cursor = collection.find(query);
cursor.each(function(err, doc) {
if (err) throw err;
if (doc) q.push(doc); // dispatching doc to async.queue
});
q.drain = function() {
if (cursor.isClosed()) {
console.log('all items have been processed');
db.close();
}
}
回答by chris6953
Using the mongodbdriver, and modern NodeJS with async/await, a good solution is to use next():
使用mongodb驱动程序和带有 async/await 的现代 NodeJS,一个好的解决方案是使用next():
const collection = db.collection('things')
const cursor = collection.find({
bla: 42 // find all things where bla is 42
});
let document;
while ((document = await cursor.next())) {
await collection.findOneAndUpdate({
_id: document._id
}, {
$set: {
blu: 43
}
});
}
This results in only one document at a time being required in memory, as opposed to e.g. the accepted answer, where many documents get sucked into memory, before processing of the documents starts. In cases of "huge collections" (as per the question) this may be important.
这导致内存中一次只需要一个文档,而不是例如接受的答案,其中许多文档在文档处理开始之前被吸入内存。在“大量收藏”的情况下(根据问题),这可能很重要。
If documents are large, this can be improved further by using a projection, so that only those fields of documents that are required are fetched from the database.
如果文档很大,这可以通过使用投影进一步改进,以便仅从数据库中获取所需的文档字段。
回答by xameeramir
var MongoClient = require('mongodb').MongoClient,
assert = require('assert');
MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) {
assert.equal(err, null);
console.log("Successfully connected to MongoDB.");
var query = {
"category_code": "biotech"
};
db.collection('companies').find(query).toArray(function(err, docs) {
assert.equal(err, null);
assert.notEqual(docs.length, 0);
docs.forEach(function(doc) {
console.log(doc.name + " is a " + doc.category_code + " company.");
});
db.close();
});
});
Notice that the call .toArrayis making the application to fetch the entire dataset.
请注意,该调用.toArray使应用程序获取整个数据集。
var MongoClient = require('mongodb').MongoClient,
assert = require('assert');
MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) {
assert.equal(err, null);
console.log("Successfully connected to MongoDB.");
var query = {
"category_code": "biotech"
};
var cursor = db.collection('companies').find(query);
function(doc) {
cursor.forEach(
console.log(doc.name + " is a " + doc.category_code + " company.");
},
function(err) {
assert.equal(err, null);
return db.close();
}
);
});
Notice that the cursorreturned by the find()is assigned to var cursor. With this approach, instead of fetching all data in memory and consuming data at once, we're streaming the data to our application. find()can create a cursor immediately because it doesn't actually make a request to the database until we try to use some of the documents it will provide. The point of cursoris to describe our query. The 2nd parameter to cursor.forEachshows what to do when the driver gets exhausted or an error occurs.
请注意,由返回的游标find()已分配给var cursor。使用这种方法,我们不是一次获取内存中的所有数据并使用数据,而是将数据流式传输到我们的应用程序。find()可以立即创建游标,因为在我们尝试使用它将提供的某些文档之前,它实际上不会向数据库发出请求。重点cursor是描述我们的查询。第二个参数cursor.forEach显示当驱动程序耗尽或发生错误时要做什么。
In the initial version of the above code, it was toArray()which forced the database call. It meant we needed ALLthe documents and wanted them to be in an array.
在上面代码的初始版本中,它是toArray()强制调用数据库的。这意味着我们需要所有文档并希望它们在array.
Also, MongoDBreturns data in batch format. The image below shows, requests from cursors (from application) to MongoDB
此外,MongoDB以批处理格式返回数据。下图显示,从游标(从应用程序)到MongoDB


forEachis better than toArraybecause we can process documents as they come inuntil we reach the end. Contrast it with toArray- where we wait for ALLthe documents to be retrieved and the entirearray is built. This means we're not getting any advantage from the fact that the driver and the database system are working together to batch results to your application. Batching is meant to provide efficiency in terms of memory overhead and the execution time. Take advantage of it, if you can in your application.
forEach比toArray因为我们可以处理进来的文件直到我们到达最后。将它与toArray- 我们等待检索所有文档并构建整个数组的情况进行对比。这意味着我们没有从驱动程序和数据库系统协同工作以将结果批处理到您的应用程序这一事实中获得任何优势。批处理旨在提供内存开销和执行时间方面的效率。如果可以的话,请在您的应用程序中利用它。
回答by Dan Dascalescu
None of the previous answers mentions batching the updates. That makes them extremely slow - tens or hundreds of times slower than a solution using bulkWrite.
以前的答案都没有提到批处理更新。这使得它们非常慢——比使用bulkWrite的解决方案慢几十或几百倍。
Let's say you want to double the value of a field in each document. Here's how to do that fast and with fixed memory consumption:
假设您想将每个文档中的字段值加倍。以下是在固定内存消耗的情况下快速完成此操作的方法:
// Double the value of the 'foo' field in all documents
let bulkWrites = [];
const bulkDocumentsSize = 100; // how many documents to write at once
let i = 0;
db.collection.find({ ... }).forEach(doc => {
i++;
// Update the document...
doc.foo = doc.foo * 2;
// Add the update to an array of bulk operations to execute later
bulkWrites.push({
replaceOne: {
filter: { _id: doc._id },
replacement: doc,
},
});
// Update the documents and log progress every `bulkDocumentsSize` documents
if (i % bulkDocumentsSize === 0) {
db.collection.bulkWrite(bulkWrites);
bulkWrites = [];
print(`Updated ${i} documents`);
}
});
// Flush the last <100 bulk writes
db.collection.bulkWrite(bulkWrites);
回答by Wtower
And here is an example of using a Mongoose cursor async with promises:
这是一个使用带有承诺的 Mongoose 游标异步的示例:
new Promise(function (resolve, reject) {
collection.find(query).cursor()
.on('data', function(doc) {
// ...
})
.on('error', reject)
.on('end', resolve);
})
.then(function () {
// ...
});
Reference:
参考:
回答by Yongfeng Lu
The node-mongodb-nativenow supports a endCallbackparameter to cursor.forEachas for one to handle the event AFTER the whole iteration, refer to the official document for details http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#forEach.
在node-mongodb-native现在支持一个endCallback参数cursor.forEach作为一个以处理该事件的整个迭代后,请参考官方文档细节http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#forEach.
Also note that .eachis deprecated in the nodejs native driver now.
还要注意的是。每次在现在的NodeJS本地驱动程序已经过时了。
回答by Zanon
Leonid's answeris great, but I want to reinforce the importance of using async/promises and to give a different solution with a promises example.
Leonid 的回答很棒,但我想强调使用 async/promises 的重要性,并通过 promise 示例给出不同的解决方案。
The simplest solution to this problem is to loop forEach document and call an update. Usually, you don't need close the db connection after each request, but if you do need to close the connection, be careful. You must just close it if you are sure that allupdates have finished executing.
这个问题最简单的解决方案是循环 forEach 文档并调用更新。通常,您不需要在每次请求后关闭数据库连接,但如果您确实需要关闭连接,请小心。如果您确定所有更新都已完成执行,则必须关闭它。
A common mistake here is to call db.close()after all updates are dispatched without knowing if they have completed. If you do that, you'll get errors.
这里的一个常见错误是db.close()在不知道它们是否已完成的情况下在所有更新发送后调用。如果你这样做,你会得到错误。
Wrong implementation:
错误的实施:
collection.find(query).each(function(err, doc) {
if (err) throw err;
if (doc) {
collection.update(query, update, function(err, updated) {
// handle
});
}
else {
db.close(); // if there is any pending update, it will throw an error there
}
});
However, as db.close()is also an async operation (its signaturehave a callback option) you may be lucky and this code can finish without errors. It may work only when you need to update just a few docs in a small collection (so, don't try).
然而,作为db.close()一个异步操作(它的签名有一个回调选项),你可能很幸运,这段代码可以毫无错误地完成。它可能仅在您需要更新小集合中的几个文档时才有效(所以,不要尝试)。
Correct solution:
正确的解决方法:
As a solution with async was already proposed by Leonid, below follows a solution using Qpromises.
由于Leonid已经提出了异步解决方案,下面遵循使用Q承诺的解决方案。
var Q = require('q');
var client = require('mongodb').MongoClient;
var url = 'mongodb://localhost:27017/test';
client.connect(url, function(err, db) {
if (err) throw err;
var promises = [];
var query = {}; // select all docs
var collection = db.collection('demo');
var cursor = collection.find(query);
// read all docs
cursor.each(function(err, doc) {
if (err) throw err;
if (doc) {
// create a promise to update the doc
var query = doc;
var update = { $set: {hi: 'there'} };
var promise =
Q.npost(collection, 'update', [query, update])
.then(function(updated){
console.log('Updated: ' + updated);
});
promises.push(promise);
} else {
// close the connection after executing all promises
Q.all(promises)
.then(function() {
if (cursor.isClosed()) {
console.log('all items have been processed');
db.close();
}
})
.fail(console.error);
}
});
});
回答by white shadow
let's assume that we have the below MongoDB data in place.
让我们假设我们有以下 MongoDB 数据。
Database name: users
Collection name: jobs
===========================
Documents
{ "_id" : ObjectId("1"), "job" : "Security", "name" : "Hyman", "age" : 35 }
{ "_id" : ObjectId("2"), "job" : "Development", "name" : "Tito" }
{ "_id" : ObjectId("3"), "job" : "Design", "name" : "Ben", "age" : 45}
{ "_id" : ObjectId("4"), "job" : "Programming", "name" : "John", "age" : 25 }
{ "_id" : ObjectId("5"), "job" : "IT", "name" : "ricko", "age" : 45 }
==========================
This code:
这段代码:
var MongoClient = require('mongodb').MongoClient;
var dbURL = 'mongodb://localhost/users';
MongoClient.connect(dbURL, (err, db) => {
if (err) {
throw err;
} else {
console.log('Connection successful');
var dataBase = db.db();
// loop forEach
dataBase.collection('jobs').find().forEach(function(myDoc){
console.log('There is a job called :'+ myDoc.job +'in Database')})
});
回答by cipak
You can now use (in an async function, of course):
您现在可以使用(当然是在异步函数中):
for await (let doc of collection.find(query)) {
await updateDoc(doc);
}
// all done
which nicely serializes all updates.
它很好地序列化了所有更新。

