How to buffer MongoDB inserts during a disconnect in node.js?

We read an XML file (using xml-stream) with about 500k elements and insert them into MongoDB like this:

    xml.on(`endElement: product`, writeDataToDb.bind(this, "product"));

The insert inside writeDataToDb(type, obj) looks like this:

    collection.insertOne(obj, {w: 1, wtimeout: 15000}).catch((e) => { });

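Now, when the Mongo connection drops, the xml stream keeps reading and the console fills up with error messages (can't insert, disconnected, EPIPE broken, ...).

The docs say:

When you shut down the mongod process, the driver stops processing operations and keeps buffering them, because bufferMaxEntries is -1 by default, meaning buffer all operations.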

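What does this buffer actually do?

We noticed that when we insert data and shut down the mongo server, operations get buffered. When we bring the mongo server back up, the native driver reconnects successfully and node resumes inserting data, but the documents buffered while mongo was offline are never inserted.

So I question this buffer and what it is for.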

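Goal:

We are looking for the best way to keep the inserts in a buffer until mongo comes back (within 15000 ms, according to wtimeout) and then insert the buffered documents, or alternatively to use xml.pause() and xml.resume(), which we tried without success.

Basically we need a little help on how to handle disconnects without losing data and without interruptions.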

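Inserting 500K elements with insertOne() is a very bad idea. You should use bulk operations instead, which let you insert many documents in a single request (for example 10000 at a time here, so the whole file takes about 50 requests). To avoid the buffering issue, you can handle it manually: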

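  1. Disable buffering with bufferMaxEntries: 0
  2. Set the reconnect properties: reconnectTries: 30, reconnectInterval: 1000
  3. Create a bulk operation and feed it 10000 items
  4. Pause the xml reader. Try to insert the 10000 items. If that fails, retry every 3000 ms until it succeeds
  5. If a bulk operation is interrupted during execution you may run into duplicate ID errors on the retry, so ignore them (error code: 11000)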
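Step 5 can be made a little stricter than a single check on the top-level error code. The following is only a sketch: the helper name is hypothetical, and it assumes the error object exposes a numeric code and, optionally, a writeErrors array whose entries also carry a code. The exact error shape depends on the driver version, so check it against the errors you actually receive.

```javascript
// Duplicate-key error code reported by MongoDB (the 11000 mentioned in step 5).
var DUPLICATE_KEY = 11000;

// Hypothetical helper: returns true when `err` reports nothing but
// duplicate-key failures, so the batch can be treated as already saved.
// ASSUMPTION: `err.writeErrors`, when present, is an array of objects with a
// numeric `code`; otherwise only the top-level `err.code` is inspected.
function isOnlyDuplicateKeys(err) {
  if (err == null) {
    return false;
  }
  if (Array.isArray(err.writeErrors) && err.writeErrors.length > 0) {
    return err.writeErrors.every(function (writeError) {
      return writeError.code === DUPLICATE_KEY;
    });
  }
  return err.code === DUPLICATE_KEY;
}
```

With a check like this, a retry that fails with a mix of duplicate-key errors and other write errors is not silently counted as a success.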

Here is a sample script:

    var fs = require('fs')
    var Xml = require('xml-stream')
    var MongoClient = require('mongodb').MongoClient
    var url = 'mongodb://localhost:27017/test'

    MongoClient.connect(url, {
      reconnectTries: 30,
      reconnectInterval: 1000,
      bufferMaxEntries: 0 // disable the driver's own buffering
    }, function (err, db) {
      if (err != null) {
        console.log('connect error: ' + err)
      } else {
        var collection = db.collection('product')
        var bulk = collection.initializeUnorderedBulkOp()
        var totalSize = 500001
        var size = 0

        var fileStream = fs.createReadStream('data.xml')
        var xml = new Xml(fileStream)
        xml.on('endElement: product', function (product) {
          bulk.insert(product)
          size++
          // if we have enough product, save them using bulk insert
          if (size % 10000 == 0) {
            // stop reading while the batch is being written
            xml.pause()
            bulk.execute(function (err, result) {
              if (err == null) {
                bulk = collection.initializeUnorderedBulkOp()
                console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
                xml.resume()
              } else {
                console.log('bulk insert failed: ' + err)
                var counter = 0
                var retryInsert = setInterval(function () {
                  counter++
                  bulk.execute(function (err, result) {
                    if (err == null) {
                      clearInterval(retryInsert)
                      bulk = collection.initializeUnorderedBulkOp()
                      console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                      xml.resume()
                    } else if (err.code === 11000) { // ignore duplicate ID error
                      clearInterval(retryInsert)
                      bulk = collection.initializeUnorderedBulkOp()
                      console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
                      xml.resume()
                    } else {
                      console.log('failed after first try: ' + counter, 'error: ' + err)
                    }
                  })
                }, 3000) // retry every 3000ms until success
              }
            })
          } else if (size === totalSize) {
            // last, partial batch
            bulk.execute(function (err, result) {
              if (err == null) {
                db.close()
              } else {
                console.log('bulk insert failed: ' + err)
              }
            })
          }
        })
      }
    })

Sample log output:

    doc 0 : 10000 saved on first try
    doc 10000 : 20000 saved on first try
    doc 20000 : 30000 saved on first try
    [...]
    bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
    failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
    failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
    failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
    doc 130000 : 140000 saved after 4 tries
    doc 140000 : 150000 saved on first try
    [...]
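The script above flushes the last, partial batch only because totalSize is hard-coded. As a rough sketch of an alternative, the batching can be pulled into a small helper with an explicit end() call. createBatcher is a hypothetical name, and the flush callback here is an in-memory stand-in for the bulk insert; when to call end() depends on how your XML reader signals that it has delivered its last element.

```javascript
// Hypothetical helper: collects items and hands them to `flush` in batches.
function createBatcher(batchSize, flush) {
  var pending = [];

  // Hand the collected items to `flush` and start a fresh batch.
  function flushPending() {
    if (pending.length === 0) {
      return;
    }
    var batch = pending;
    pending = [];
    flush(batch);
  }

  return {
    // Add one item; flushes automatically once a full batch is collected.
    add: function (item) {
      pending.push(item);
      if (pending.length >= batchSize) {
        flushPending();
      }
    },
    // Flush whatever is left, e.g. when the input is exhausted.
    end: flushPending
  };
}

// Usage with an in-memory stand-in for the database write.
var batches = [];
var batcher = createBatcher(3, function (batch) {
  batches.push(batch);
});
[1, 2, 3, 4, 5, 6, 7].forEach(function (item) {
  batcher.add(item);
});
batcher.end();
console.log(batches.map(function (batch) { return batch.length; })); // [ 3, 3, 1 ]
```

In the real script the flush callback would pause the reader, run the bulk insert with its retry logic, and resume the reader once the batch is saved.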

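I don't know the MongoDB driver or this buffer of entries specifically. Maybe it only keeps data in certain scenarios.

So I will answer this question with a more general approach that works with any database.

In short, you have two problems: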

  1. You are not recovering from failed attempts
  2. The XML stream sends data too fast

To handle the first issue, you need to implement a retry algorithm that makes many attempts before giving up.
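As a minimal sketch of such a retry, here is one possible policy: exponential backoff with a capped delay, written with native promises. The function names and the flakyWrite stand-in are hypothetical, and the task is assumed to return a promise.

```javascript
// Delay before retry number `attempt` (0-based): doubles each time,
// capped at maxDelay.
function backoffDelay(attempt, initialDelay, maxDelay) {
  return Math.min(initialDelay * Math.pow(2, attempt), maxDelay);
}

// Runs `task` (a function returning a promise) until it resolves.
// After `maxRetry` failed retries the last error is rethrown.
function retryWithBackoff(task, initialDelay, maxDelay, maxRetry) {
  function attemptTask(attempt) {
    return task().catch(function (error) {
      if (attempt >= maxRetry) {
        throw error;
      }
      // Wait for the backoff delay, then try again.
      return new Promise(function (resolve) {
        setTimeout(resolve, backoffDelay(attempt, initialDelay, maxDelay));
      }).then(function () {
        return attemptTask(attempt + 1);
      });
    });
  }
  return attemptTask(0);
}

// Usage: a stand-in task that fails twice before succeeding.
var calls = 0;
function flakyWrite() {
  calls++;
  return calls < 3 ? Promise.reject(new Error('write failed')) : Promise.resolve('ok');
}

retryWithBackoff(flakyWrite, 10, 40, 5).then(function (result) {
  console.log(result + ' after ' + calls + ' calls');
});
```

With an initial delay of 100 ms and a cap of 2000 ms, the waits would be 100, 200, 400, 800, 1600 and then 2000 ms for every further retry.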

To handle the second issue, you need to apply back pressure to the xml stream. You can do that with the pause method, the resume method and an input buffer.

    var Promise = require('bluebird');
    var fs = require('fs');
    var Xml = require('xml-stream');

    var fileStream = fs.createReadStream('myFile.xml');
    var xml = new Xml(fileStream);

    // simple exponential retry algorithm based on promises
    function exponentialRetry(task, initialDelay, maxDelay, maxRetry) {
      var delay = initialDelay;
      var retry = 0;
      var closure = function() {
        return task().catch(function(error) {
          retry++;
          if (retry > maxRetry) {
            throw error
          }
          var promise = Promise.delay(delay).then(closure);
          delay = Math.min(delay * 2, maxDelay);
          return promise;
        })
      };
      return closure();
    }

    var maxPressure = 100;
    var currentPressure = 0;
    var suspended = false;
    var stopped = false;
    var buffer = [];

    // handle back pressure by storing incoming tasks in the buffer
    // pause the xml stream as soon as we have enough tasks to work on
    // resume it when the buffer is empty
    function writeXmlDataWithBackPressure(product) {
      // closure used to try to start a task
      var tryStartTask = function() {
        // if we have enough tasks running, pause the xml stream
        if (!stopped && !suspended && currentPressure >= maxPressure) {
          xml.pause();
          suspended = true;
          console.log("stream paused");
        }
        // if we have room to run tasks
        if (currentPressure < maxPressure) {
          // if we have a buffered task, start it
          // if not, resume the xml stream
          if (buffer.length > 0) {
            buffer.shift()();
          } else if (!stopped) {
            try {
              xml.resume();
              suspended = false;
              console.log("stream resumed");
            } catch (e) {
              // the only way to know if you've reached the end of the stream
              // xml.on('end') can be triggered BEFORE all handlers are called
              // probably a bug of xml-stream
              stopped = true;
              console.log("stream end");
            }
          }
        }
      };

      // push the task to the buffer
      buffer.push(function() {
        currentPressure++;
        // use exponential retry to ensure we will try this operation 100 times before giving up
        exponentialRetry(function() {
          return writeDataToDb(product)
        }, 100, 2000, 100).finally(function() {
          currentPressure--;
          // a task has just finished, let's try to run a new one
          tryStartTask();
        });
      });

      // we've just buffered a task, let's try to run it
      tryStartTask();
    }

    // write the product to database here :)
    function writeDataToDb(product) {
      // the following code is here to create random delays and random failures (just for testing)
      var timeToWrite = Math.random() * 100;
      var failure = Math.random() > 0.5;
      return Promise.delay(timeToWrite).then(function() {
        if (failure) {
          throw new Error();
        }
        return null;
      })
    }

    xml.on('endElement: product', writeXmlDataWithBackPressure);

Play with it and add some console.log calls to understand how it behaves. I hope this helps you solve your problem :)