我怎样才能扩大findOneAndUpdatemongoose/ mongodb 500万更新?

我正在与这些后端工作:nodejs,mongoose,mongodb,ironmq。 还有另一个应用程序(一个python FTP服务器)被用作数据源。

这个系统或多或less是这样工作的:

  • 用户上传一个csv的数据转储(近300万条)到FTP服务器(这是定期发生的,每24小时一次)

  • FTP服务器parsing数据,并同步地批量推送到IronMQ队列(2000年)。 我在这里进行批处理来优化内存

  • 另一个应用程序(nodejs)不断轮询该队列中的数据,每10秒钟发送100条消息(这是允许的最大数量),对这些数据起作用,然后更新我的数据库(对每个消息使用findOneAndUpdate )。 我有5个这些应用程序正在运行。

现在除了整个操作完成所需的时间之外,这个设置没有任何明显的问题。 parsing的数据完全被推送到MQ需要将近2个小时,但是这样做并不是什么大问题,因为它是分批完成的。 实际的问题出现在“保存/更新到数据库”部分。

平均而言,每小时在db中更新20-24K条目。 但是,由于我有300万条目,这是花了超过24小时(这是行不通的,因为FTP上的文件每24小时刷新一次,数据将用于在我的应用程序的其他部分执行某些操作)。

我不确定如何从这里继续,但我有几个问题。

  • 我的上述方法可以被认为是最佳/有效的吗? 或者有什么可以改进的?
  • 如何通过db或者通过改变devise来减less整个更新操作所花费的时间?
  • mongodb是否认为这个案例是好的,还是有更好的select?

如果你能提供一些帮助,这将是非常棒的。 请让我知道,如果你们需要更多的信息。

您可以使用Bulk API方法来优化更新,这些方法非常高效,因为它们允许您在单个请求(作为批处理)内向服务器发送许多更新操作。 考虑下面的例子来演示不同的MongoDB版本的这种方法:

假设您的nodejs应用程序将消息数据轮询到列表,并且对于支持MongoDB Server 3.2.x Mongoose版本>=4.3.0 ,可以使用bulkWrite()将集合更新为:

 var bulkUpdateCallback = function(err, r){ console.log(r.matchedCount); console.log(r.modifiedCount); }, operations = []; // Initialise the bulk operations array messages.forEach(function (msg) { operations.push({ "updateOne": { "filter": { "_id": msg._id } , "update": { "$set": { "value": msg.value } } // example update operation } }); // Send once in 500 requests only if (operations.length % 500 === 0 ) { Model.collection.bulkWrite( operations, { "ordered": true, w: 1 }, bulkUpdateCallback ); operations = []; } }); // Get the underlying collection via the native node.js driver collection object Model.collection.bulkWrite(bulkOps, { "ordered": true, w: 1 }, bulkUpdateCallback); 

在上面,您初始化您的更新操作数组,并将操作限制为500个批次。select比默认批量限制值1000低的值的原因通常是受控制的select。 正如那里的文档中所指出的那样,默认情况下,MongoDB会一次最多1000次的批量发送到服务器,并且不能保证这些默认的1000个操作请求实际上符合16MB的BSON限制 。 所以你仍然需要在“安全”的一面,并强加一个较低的批量大小,你只能有效地pipe理,以便它总计小于数据限制在发送到服务器的大小。


如果您使用的旧版本的Mongoose〜3.8.8 ~3.8.8, ~3.8.22, 4.x支持MongoDB Server >=2.6.x ,则可以使用Bulk() API,如下所示

 var bulk = Model.collection.initializeOrderedBulkOp(), bulkUpdateCallback = function(err, r){ console.log(r.matchedCount); console.log(r.modifiedCount); }, counter = 0; messages.forEach(function(msg) { bulk.find({ "_id": msg._id }).updateOne({ "$set": { "value": msg.value } }); counter++; if (counter % 500 == 0) { bulk.execute(function(err, r) { // do something with the result bulk = Model.collection.initializeOrderedBulkOp(); counter = 0; }); } }); // Catch any docs in the queue under or over the 500's if (counter > 0) { bulk.execute(bulkUpdateCallback); }