如何对节点stream应用背压?

在尝试使用Node.JSstream时,我遇到了一个有趣的难题。 当input(可读)stream推送更多的数据,然后目的地(可写)关心我无法正确应用背压。

我尝试的两个方法是从Writable.prototype._write返回false,并保留对Readable的引用,以便可以从Writable调用Readable.pause() 。 这两个解决scheme都没有帮助,我会解释。

在我的练习中(你可以把完整的源代码看成一个Gist ),我有三条stream:

可读 – PasscodeGenerator

 util.inherits(PasscodeGenerator, stream.Readable); function PasscodeGenerator(prefix) { stream.Readable.call(this, {objectMode: true}); this.count = 0; this.prefix = prefix || ''; } PasscodeGenerator.prototype._read = function() { var passcode = '' + this.prefix + this.count; if (!this.push({passcode: passcode})) { this.pause(); this.once('drain', this.resume.bind(this)); } this.count++; }; 

我认为来自this.push()的返回代码足以让自己暂停,并等待drain事件恢复。

变换 – 哈希

 util.inherits(Hasher, stream.Transform); function Hasher(hashType) { stream.Transform.call(this, {objectMode: true}); this.hashType = hashType; } Hasher.prototype._transform = function(sample, encoding, next) { var hash = crypto.createHash(this.hashType); hash.setEncoding('hex'); hash.write(sample.passcode); hash.end(); sample.hash = hash.read(); this.push(sample); next(); }; 

只需将密码的散列添加到对象。

可写入 – SampleConsumer

 util.inherits(SampleConsumer, stream.Writable); function SampleConsumer(max) { stream.Writable.call(this, {objectMode: true}); this.max = (max != null) ? max : 10; this.count = 0; } SampleConsumer.prototype._write = function(sample, encoding, next) { this.count++; console.log('Hash %d (%s): %s', this.count, sample.passcode, sample.hash); if (this.count < this.max) { next(); } else { return false; } }; 

在这里,我希望尽可能快地使用数据,直到达到最大数量的样本,然后结束数据stream。 我尝试使用this.end()而不是return false但这导致了可怕的写后调用结束的问题。 返回false将停止所有的事情,如果样本量很小,但是当它很大,我得到一个内存不足的错误:

 FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory Aborted (core dumped) 

根据这个理论上的回答 ,Writestream将返回false,导致stream缓冲直到缓冲区满( objectMode默认为objectMode ),最后Readable会调用它的this.pause()方法。 但16 + 16 + 16 = 48 ; 这是48个缓冲区中的东西,直到事物填满和系统堵塞。 实际上,因为没有涉及到克隆,因此它们之间传递的对象是相同的参考。 难道这不是只有16个记忆中的物体,直到高水位停止一切?

最后我意识到我可以通过使用闭包的方式将Writable引用设置为Readable来调用它的暂停方法。 但是,这个解决scheme意味着可写入stream很了解另一个对象。 我必须通过一个参考:

 var foo = new PasscodeGenerator('foobar'); foo .pipe(new Hasher('md5')) .pipe(new SampleConsumer(samples, foo)); 

而这对stream如何运作感到不合常规。 我认为背压足以导致Writable停止读取数据并防止内存不足错误。

一个类似的例子是Unix的head命令。 在节点I中实现将假定目的地可以结束 ,而不仅仅是忽略导致源保持推送数据,即使目的地有足够的数据来满足文件的开始部分。

我如何习惯性地构造自定义stream,以便当目标准备好结束源码stream时不会尝试推送更多数据?

这是如何在内部调用_read()的已知问题 。 由于您的_read()始终同步/立即推送,因此内部stream实现可以在正确的条件下进入循环。 通常 期望 _read()实现做某种asynchronousI / O(例如从磁盘或networking读取)。

对此的解决方法(如上面的链接所述)是至less在某些时候使_read()asynchronous。 您也可以在每次调用时使其asynchronous:

 PasscodeGenerator.prototype._read = function(n) { var passcode = '' + this.prefix + this.count; var self = this; // `setImmediate()` delays the push until the beginning // of the next tick of the event loop setImmediate(function() { self.push({passcode: passcode}); }); this.count++; };