我如何使用Node.js在MongoDB中使用cursor.forEach()?

我在我的数据库中有一个巨大的文件集合,我想知道如何运行所有文件并更新它们,每个文件具有不同的值。

答案取决于你使用的驱动程序。 我所知道的所有MongoDB驱动程序都有cursor.forEach()实现这种或那种方式。

这里有些例子:

节点MongoDB的原生

 collection.find(query).forEach(function(doc) { // handle }, function(err) { // done or error }); 

mongojs

 db.collection.find(query).forEach(function(err, doc) { // handle }); 

 collection.find(query, { stream: true }) .each(function(doc){ // handle doc }) .error(function(err){ // handle error }) .success(function(){ // final callback }); 

mongoose

 collection.find(query).stream() .on('data', function(doc){ // handle doc }) .on('error', function(err){ // handle error }) .on('end', function(){ // final callback }); 

更新.forEachcallback中的文档

更新.forEachcallback中的文档唯一的问题是你不知道什么时候所有的文档被更新。

要解决这个问题,你应该使用一些asynchronous控制stream程解决scheme。 这里有一些选项:

  • asynchronous
  • 诺言( when.js , 蓝鸟 )

以下是使用queuefunction使用async的示例:

 var q = async.queue(function (doc, callback) { // code for your update collection.update({ _id: doc._id }, { $set: {hi: 'there'} }, { w: 1 }, callback); }, Infinity); var cursor = collection.find(query); cursor.each(function(err, doc) { if (err) throw err; if (doc) q.push(doc); // dispatching doc to async.queue }); q.drain = function() { if (cursor.isClosed()) { console.log('all items have been processed'); db.close(); } } 

列昂尼德的答案很好,但我想强化使用asynchronous/承诺的重要性,并给出与承诺示例不同的解决scheme。

解决这个问题最简单的方法是循环forEach文件并调用更新。 通常, 每次请求后都不需要closures数据库连接 ,但是如果需要closures连接,请小心。 如果您确定所有更新都已经完成,您必须closures它。

这里常见的错误是在所有的更新被调度之后调用db.close()而不知道它们是否已经完成。 如果你这样做,你会得到错误。

错误的执行

 collection.find(query).each(function(err, doc) { if (err) throw err; if (doc) { collection.update(query, update, function(err, updated) { // handle }); } else { db.close(); // if there is any pending update, it will throw an error there } }); 

但是,因为db.close()也是一个asynchronous操作( 它的签名有一个callback选项),你可能是幸运的,这个代码可以完成没有错误。 它只有当你需要更新一个小集合中的几个文档时才有效(所以,不要尝试)。

正确的解决scheme:

Leonid已经提出了asynchronous解决scheme,下面是使用Q promise的解决scheme。

 var Q = require('q'); var client = require('mongodb').MongoClient; var url = 'mongodb://localhost:27017/test'; client.connect(url, function(err, db) { if (err) throw err; var promises = []; var query = {}; // select all docs var collection = db.collection('demo'); var cursor = collection.find(query); // read all docs cursor.each(function(err, doc) { if (err) throw err; if (doc) { // create a promise to update the doc var query = doc; var update = { $set: {hi: 'there'} }; var promise = Q.npost(collection, 'update', [query, update]) .then(function(updated){ console.log('Updated: ' + updated); }); promises.push(promise); } else { // close the connection after executing all promises Q.all(promises) .then(function() { if (cursor.isClosed()) { console.log('all items have been processed'); db.close(); } }) .fail(console.error); } }); }); 

下面是一个使用Mongoose游标asynchronous与承诺的例子:

 new Promise(function (resolve, reject) { collection.find(query).cursor() .on('data', function(doc) { // ... }) .on('error', reject) .on('end', resolve); }) .then(function () { // ... }); 

参考:

  • mongoose游标
  • stream和承诺

现在, node-mongodb-native支持endCallback参数, cursor.forEach在整个迭代之后处理事件,请参阅官方文档http://mongodb.github.io/node-mongodb-native/ 2.2 / api / Cursor.html#forEach 。

另外请注意.each现在已经在nodejs本地驱动中被弃用了。

var MongoClient = require('mongodb').MongoClient, assert = require('assert'); MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) { assert.equal(err, null); console.log("Successfully connected to MongoDB."); var query = { "category_code": "biotech" }; db.collection('companies').find(query).toArray(function(err, docs) { assert.equal(err, null); assert.notEqual(docs.length, 0); docs.forEach(function(doc) { console.log(doc.name + " is a " + doc.category_code + " company."); }); db.close(); }); });
var MongoClient = require('mongodb').MongoClient, assert = require('assert'); MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) { assert.equal(err, null); console.log("Successfully connected to MongoDB."); var query = { "category_code": "biotech" }; db.collection('companies').find(query).toArray(function(err, docs) { assert.equal(err, null); assert.notEqual(docs.length, 0); docs.forEach(function(doc) { console.log(doc.name + " is a " + doc.category_code + " company."); }); db.close(); }); }); 

请注意,调用.toArray正在使应用程序获取整个数据集。

var MongoClient = require('mongodb').MongoClient, assert = require('assert'); MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) { assert.equal(err, null); console.log("Successfully connected to MongoDB."); var query = {"category_code": "biotech"}; var cursor = db.collection('companies').find(query); function(doc) { cursor.forEach( console.log( doc.name + " is a " + doc.category_code + " company." ); }, function(err) { assert.equal(err, null); return db.close(); } ); });
var MongoClient = require('mongodb').MongoClient, assert = require('assert'); MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) { assert.equal(err, null); console.log("Successfully connected to MongoDB."); var query = {"category_code": "biotech"}; var cursor = db.collection('companies').find(query); function(doc) { cursor.forEach( console.log( doc.name + " is a " + doc.category_code + " company." ); }, function(err) { assert.equal(err, null); return db.close(); } ); }); 

请注意, find()返回的游标被分配给var cursor 。 使用这种方法,我们不是一次性提取memort中的所有数据并使用数据,而是将数据传输到我们的应用程序。 find()可以立即创build一个游标,因为它实际上并没有向数据库发出请求,除非我们尝试使用它提供的一些文档。 cursor是描述我们的查询。 cursor.forEach的第二个参数显示当驱动程序耗尽或发生错误时要执行的操作。

在上面的代码的初始版本,它是强制数据库调用的toArray() 。 这意味着我们需要所有的文件,并希望他们在一个array

另外, MongoDB以批量格式返回数据。 下面的图片显示了从游标(从应用程序)到MongoDB请求

MongoDB游标请求

forEachtoArray更好,因为我们可以处理文档直到达到最后。 将其与toArray对比 – 我们在那里等待所有的文件被检索,并build立整个arrays。 这意味着我们没有从驱动程序和数据库系统一起工作将结果批量处理到应用程序的事实中获得任何优势。 批处理意味着提供内存开销和执行时间方面的效率。 利用它,如果你可以在你的应用程序