在MongoDB中使用mongoose批量插入

有没有任何select与mongoose执行大量upserts? 所以基本上有一个数组,并插入每个元素,如果它不存在或更新它,如果它存在? (我正在使用海关_ids)

当我使用.insert MongoDB返回错误E11000重复的键(这应该更新)。 插入多个新文档虽然工作正常:

var Users = self.db.collection('Users'); Users.insert(data, function(err){ if (err) { callback(err); } else { callback(null); } }); 

使用.save返回一个参数必须是单个文档的错误:

 Users.save(data, function(err){ ... } 

这个答案表明没有这样的选项,但它是特定于C#,也已经3岁了。 所以我想知道是否有任何select使用mongoose做到这一点?

谢谢!

不是在“mongoose”具体,或至less还没有写作。 从2.6版本开始,MongoDB shell实际上使用了“引导之下”的“批量操作API ”,就像所有通用的辅助方法一样。 在它的实现中,它试图首先做到这一点,如果检测到旧版本服务器,那么遗留实现就有一个“后备”。

所有mongoose方法“目前”都使用“遗留”实现或写入关注响应以及基本遗留方法。 但是,从任何给定的mongoose模型中都有一个.collection访问器,它实质上是从本身实现mongoose的底层“节点本地驱动程序”访问“集合对象”:

  var mongoose = require('mongoose'), Schema = mongoose.Schema; mongoose.connect('mongodb://localhost/test'); var sampleSchema = new Schema({},{ "strict": false }); var Sample = mongoose.model( "Sample", sampleSchema, "sample" ); mongoose.connection.on("open", function(err,conn) { var bulk = Sample.collection.initializeOrderedBulkOp(); var counter = 0; // representing a long loop for ( var x = 0; x < 100000; x++ ) { bulk.find(/* some search */).upsert().updateOne( /* update conditions */ }); counter++; if ( counter % 1000 == 0 ) bulk.execute(function(err,result) { bulk = Sample.collection.initializeOrderedBulkOp(); }); } if ( counter % 1000 != 0 ) bulk.execute(function(err,result) { // maybe do something with result }); }); 

主要的是,“mongoose方法”实际上意识到一个连接可能实际上还没有完成,并“排队”,直到完成。 你“挖掘”的本地驱动程序并没有做出这种区分。

所以你必须意识到连接是以某种方式或formsbuild立的。 但只要你对自己所做的事情保持谨慎,你就可以使用原生的驱动程序方法。

@nil-lunnbuild议您不需要pipe理限制(1000)。 mongoose已经这样做了。 我用他的伟大的答案作为这个完整的基于Promise的实现和示例的基础:

 var Promise = require('bluebird'); var mongoose = require('mongoose'); var Show = mongoose.model('Show', { "id": Number, "title": String, "provider": {'type':String, 'default':'eztv'} }); /** * Atomic connect Promise - not sure if I need this, might be in mongoose already.. * @return {Priomise} */ function connect(uri, options){ return new Promise(function(resolve, reject){ mongoose.connect(uri, options, function(err){ if (err) return reject(err); resolve(mongoose.connection); }); }); } /** * Bulk-upsert an array of records * @param {Array} records List of records to update * @param {Model} Model Mongoose model to update * @param {Object} match Database field to match * @return {Promise} always resolves a BulkWriteResult */ function save(records, Model, match){ match = match || 'id'; return new Promise(function(resolve, reject){ var bulk = Model.collection.initializeUnorderedBulkOp(); records.forEach(function(record){ var query = {}; query[match] = record[match]; bulk.find(query).upsert().updateOne( record ); }); bulk.execute(function(err, bulkres){ if (err) return reject(err); resolve(bulkres); }); }); } /** * Map function for EZTV-to-Show * @param {Object} show EZTV show * @return {Object} Mongoose Show object */ function mapEZ(show){ return { title: show.title, id: Number(show.id), provider: 'eztv' }; } // if you are not using EZTV, put shows in here var shows = []; // giant array of {id: X, title: "X"} // var eztv = require('eztv'); // eztv.getShows({}, function(err, shows){ // if(err) return console.log('EZ Error:', err); // var shows = shows.map(mapEZ); console.log('found', shows.length, 'shows.'); connect('mongodb://localhost/tv', {}).then(function(db){ save(shows, Show).then(function(bulkRes){ console.log('Bulk complete.', bulkRes); db.close(); }, function(err){ console.log('Bulk Error:', err); db.close(); }); }, function(err){ console.log('DB Error:', err); }); // }); 

这样做的好处是在连接完成时closures连接,如果你关心的话显示任何错误,但如果没有,则忽略它们(Promise中的错误callback是可选的)。这也是非常快的。 只是在这里留下来分享我的发现。 例如,如果要将所有eztv节目保存到数据库,可以取消注释eztv的内容。

如果你没有看到你的db.collection中的批量方法,即你得到一个错误的影响xxxvariables没有方法:initializeOrderedBulkOp()

尝试更新你的mongoose版本。 显然,老的mongoose版本不通过所有的基础mongo db.collection方法。

npm安装mongoose

照顾好了我。

在我的电子商务应用程序中存储产品时,最近我必须实现这一点。 我的数据库用于超时,因为我必须每4小时插入10000个项目。 对我来说,一个select是在连接数据库的时候将socketTimeoutMS和connectTimeoutMS设置为mongoose,但是它感觉很乱,而且我不想操纵数据库的连接超时默认值。 我也看到@neil lunn的解决scheme采取了一种简单的同步方法,在for循环中取模。 这是我的一个asynchronous版本,我相信这个工作好得多

 let BATCH_SIZE = 500 Array.prototype.chunk = function (groupsize) { var sets = []; var chunks = this.length / groupsize; for (var i = 0, j = 0; i < chunks; i++ , j += groupsize) { sets[i] = this.slice(j, j + groupsize); } return sets; } function upsertDiscountedProducts(products) { //Take the input array of products and divide it into chunks of BATCH_SIZE let chunks = products.chunk(BATCH_SIZE), current = 0 console.log('Number of chunks ', chunks.length) let bulk = models.Product.collection.initializeUnorderedBulkOp(); //Get the current time as timestamp let timestamp = new Date(), //Keep track of the number of items being looped pendingCount = 0, inserted = 0, upserted = 0, matched = 0, modified = 0, removed = 0, //If atleast one upsert was performed upsertHappened = false; //Call the load function to get started load() function load() { //If we have a chunk to process if (current < chunks.length) { console.log('Current value ', current) for (let i = 0; i < chunks[current].length; i++) { //For each item set the updated timestamp to the current time let item = chunks[current][i] //Set the updated timestamp on each item item.updatedAt = timestamp; bulk.find({ _id: item._id }) .upsert() .updateOne({ "$set": item, //If the item is being newly inserted, set a created timestamp on it "$setOnInsert": { "createdAt": timestamp } }) } //Execute the bulk operation for the current chunk bulk.execute((error, result) => { if (error) { console.error('Error while inserting products' + JSON.stringify(error)) next() } else { //Atleast one upsert has happened upsertHappened = true; inserted += result.nInserted upserted += result.nUpserted matched += result.nMatched modified += result.nModified removed += result.nRemoved //Move to the next chunk next() } }) } else { console.log("Calling finish") finish() } } function next() { current++; //Reassign bulk to a new object and call load once again on the new object after incrementing chunk bulk = models.Product.collection.initializeUnorderedBulkOp(); setTimeout(load, 0) } function finish() { console.log('Inserted ', inserted + ' Upserted ', upserted, ' Matched ', matched, ' Modified ', modified, ' Removed ', removed) //If atleast one chunk was inserted, remove all items with a 0% discount or not updated in the latest upsert if (upsertHappened) { console.log("Calling remove") remove() } } /** * Remove all the items that were not updated in the recent upsert or those items with a discount of 0 */ function remove() { models.Product.remove( { "$or": [{ "updatedAt": { "$lt": timestamp } }, { "discount": { "$eq": 0 } }] }, (error, obj) => { if (error) { console.log('Error while removing', JSON.stringify(error)) } else { if (obj.result.n === 0) { console.log('Nothing was removed') } else { console.log('Removed ' + obj.result.n + ' documents') } } } ) } }