使用setTimeout时node.js处理stream反压

对于我之前从这个问题中遇到的进一步问题,这是一个问题:

nodejs:从文件读取并存储到数据库,限制最大并发数据库操作

问题:

我想稍后重新安排一些操作,但是这是打破了我处理背压的方法。

详情:

我有一个CSV文件,我正在阅读作为一个stream,并使用转换转换为JSON,然后asynchronous存储每一行​​数据库。

由于行被转换处理,它们被放置在负责发出数据库操作的asynchronous队列上。

例如

parser._transform = function(data, encoding, done) { var tick = this._parseRow(data); dbQueue.push(tick, function(err, result) { if (typeof(err) != 'undefined') { console.log(err) } }); this.push(tick); done(); } 

当队列饱和/空时,通过暂停和恢复parsing器来处理背压:

 dbQueue.saturated = function() { parser.pause(); } dbQueue.empty = function() { parser.resume(); } 

我一直试图做的改变是,当一个项目从队列中被取消时,将来有条件地重新调度一段时间(100ms):

 var dbQueue = async.queue(function(data, callback) { if (condition) { // re-schedule operation by adding back to queue 100ms later setTimeout(function(data, callback) { dbQueue.push(data, function(err, result){ }); }, 100, data, callback); } else { //execute the db store ... ... } } 

我相信我的问题是,现在很多操作将花费大部分时间在setTimeout上,所以dbQueue将是空的,并且转换stream的背压没有按照需要处理。

我已经尝试了使用诸如max_ops和running_ops之类的计数器来确保stream被暂停/恢复,但未成功。

在node.js中处理这个问题有更习惯的方法吗?

由于这看起来像是一个外部条件,而不是与dbQueue正在做什么有关,而不是在条件发生时将数据重新插入到队列中,所以我会暂停dbQueue 。 例如,假设您的情况是由于某种原因数据库断开连接,并且您可以听到该事件。 在这种情况下,当dbQueue饱和/空时,你可以做类似于你正在做的事情:

 db.on('disconnect', function() { dbQueue.pause(); }); db.on('connect', function() { dbQueue.resume(); }); 

这通常比等待一些预定的超时更好。 这就是说,有时候等待超时是唯一的select。 在这种情况下,你可以做类似的事情,但是不用等待一个单独的事件来触发resume() ,只需使用setTimeout()

 db.on('disconnect', function() { dbQueue.pause(); setTimeout(function() { dbQueue.resume(); }); }); 

注意:如果我们真的在这里谈论db断开连接,那么如果在db没有足够时间重新连接的情况下有db错误,那么您可能还想要暂停/恢复dbQueue

如果你有一个更具体的条件,你正在寻找,你愿意分享是什么,我可以给你一个更好的例子:)