使用setTimeout时node.js处理stream反压
对于我之前从这个问题中遇到的进一步问题,这是一个问题:
nodejs:从文件读取并存储到数据库,限制最大并发数据库操作
问题:
我想稍后重新安排一些操作,但是这是打破了我处理背压的方法。
详情:
我有一个CSV文件,我正在阅读作为一个stream,并使用转换转换为JSON,然后asynchronous存储每一行数据库。
由于行被转换处理,它们被放置在负责发出数据库操作的asynchronous队列上。
例如
parser._transform = function(data, encoding, done) { var tick = this._parseRow(data); dbQueue.push(tick, function(err, result) { if (typeof(err) != 'undefined') { console.log(err) } }); this.push(tick); done(); }
当队列饱和/空时,通过暂停和恢复parsing器来处理背压:
dbQueue.saturated = function() { parser.pause(); } dbQueue.empty = function() { parser.resume(); }
我一直试图做的改变是,当一个项目从队列中被取消时,将来有条件地重新调度一段时间(100ms):
var dbQueue = async.queue(function(data, callback) { if (condition) { // re-schedule operation by adding back to queue 100ms later setTimeout(function(data, callback) { dbQueue.push(data, function(err, result){ }); }, 100, data, callback); } else { //execute the db store ... ... } }
我相信我的问题是,现在很多操作将花费大部分时间在setTimeout上,所以dbQueue将是空的,并且转换stream的背压没有按照需要处理。
我已经尝试了使用诸如max_ops和running_ops之类的计数器来确保stream被暂停/恢复,但未成功。
在node.js中处理这个问题有更习惯的方法吗?
由于这看起来像是一个外部条件,而不是与dbQueue
正在做什么有关,而不是在条件发生时将数据重新插入到队列中,所以我会暂停dbQueue
。 例如,假设您的情况是由于某种原因数据库断开连接,并且您可以听到该事件。 在这种情况下,当dbQueue
饱和/空时,你可以做类似于你正在做的事情:
db.on('disconnect', function() { dbQueue.pause(); }); db.on('connect', function() { dbQueue.resume(); });
这通常比等待一些预定的超时更好。 这就是说,有时候等待超时是唯一的select。 在这种情况下,你可以做类似的事情,但是不用等待一个单独的事件来触发resume()
,只需使用setTimeout()
:
db.on('disconnect', function() { dbQueue.pause(); setTimeout(function() { dbQueue.resume(); }); });
注意:如果我们真的在这里谈论db断开连接,那么如果在db没有足够时间重新连接的情况下有db错误,那么您可能还想要暂停/恢复dbQueue
如果你有一个更具体的条件,你正在寻找,你愿意分享是什么,我可以给你一个更好的例子:)