如何对节点stream应用背压?
在尝试使用Node.JSstream时,我遇到了一个有趣的难题。 当input(可读)stream推送更多的数据,然后目的地(可写)关心我无法正确应用背压。
我尝试的两个方法是从Writable.prototype._write
返回false,并保留对Readable的引用,以便可以从Writable调用Readable.pause()
。 这两个解决scheme都没有帮助,我会解释。
在我的练习中(你可以把完整的源代码看成一个Gist ),我有三条stream:
可读 – PasscodeGenerator
util.inherits(PasscodeGenerator, stream.Readable); function PasscodeGenerator(prefix) { stream.Readable.call(this, {objectMode: true}); this.count = 0; this.prefix = prefix || ''; } PasscodeGenerator.prototype._read = function() { var passcode = '' + this.prefix + this.count; if (!this.push({passcode: passcode})) { this.pause(); this.once('drain', this.resume.bind(this)); } this.count++; };
我认为来自this.push()
的返回代码足以让自己暂停,并等待drain
事件恢复。
变换 – 哈希
util.inherits(Hasher, stream.Transform); function Hasher(hashType) { stream.Transform.call(this, {objectMode: true}); this.hashType = hashType; } Hasher.prototype._transform = function(sample, encoding, next) { var hash = crypto.createHash(this.hashType); hash.setEncoding('hex'); hash.write(sample.passcode); hash.end(); sample.hash = hash.read(); this.push(sample); next(); };
只需将密码的散列添加到对象。
可写入 – SampleConsumer
util.inherits(SampleConsumer, stream.Writable); function SampleConsumer(max) { stream.Writable.call(this, {objectMode: true}); this.max = (max != null) ? max : 10; this.count = 0; } SampleConsumer.prototype._write = function(sample, encoding, next) { this.count++; console.log('Hash %d (%s): %s', this.count, sample.passcode, sample.hash); if (this.count < this.max) { next(); } else { return false; } };
在这里,我希望尽可能快地使用数据,直到达到最大数量的样本,然后结束数据stream。 我尝试使用this.end()
而不是return false
但这导致了可怕的写后调用结束的问题。 返回false将停止所有的事情,如果样本量很小,但是当它很大,我得到一个内存不足的错误:
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory Aborted (core dumped)
根据这个理论上的回答 ,Writestream将返回false,导致stream缓冲直到缓冲区满( objectMode
默认为objectMode
),最后Readable会调用它的this.pause()
方法。 但16 + 16 + 16 = 48
; 这是48个缓冲区中的东西,直到事物填满和系统堵塞。 实际上,因为没有涉及到克隆,因此它们之间传递的对象是相同的参考。 难道这不是只有16个记忆中的物体,直到高水位停止一切?
最后我意识到我可以通过使用闭包的方式将Writable引用设置为Readable来调用它的暂停方法。 但是,这个解决scheme意味着可写入stream很了解另一个对象。 我必须通过一个参考:
var foo = new PasscodeGenerator('foobar'); foo .pipe(new Hasher('md5')) .pipe(new SampleConsumer(samples, foo));
而这对stream如何运作感到不合常规。 我认为背压足以导致Writable停止读取数据并防止内存不足错误。
一个类似的例子是Unix的head
命令。 在节点I中实现将假定目的地可以结束 ,而不仅仅是忽略导致源保持推送数据,即使目的地有足够的数据来满足文件的开始部分。
我如何习惯性地构造自定义stream,以便当目标准备好结束源码stream时不会尝试推送更多数据?
这是如何在内部调用_read()
的已知问题 。 由于您的_read()
始终同步/立即推送,因此内部stream实现可以在正确的条件下进入循环。 通常 期望 _read()
实现做某种asynchronousI / O(例如从磁盘或networking读取)。
对此的解决方法(如上面的链接所述)是至less在某些时候使_read()
asynchronous。 您也可以在每次调用时使其asynchronous:
PasscodeGenerator.prototype._read = function(n) { var passcode = '' + this.prefix + this.count; var self = this; // `setImmediate()` delays the push until the beginning // of the next tick of the event loop setImmediate(function() { self.push({passcode: passcode}); }); this.count++; };