什么是一个正确的方法来暂停从可写的nodejspipe道可读stream?
我正在写一个模块,这是一个可写的stream。 我想为我的用户实现pipe道接口。
如果发生错误,我需要暂停可读stream并发出错误事件。 然后,用户将决定 – 如果他有错误,他应该能够恢复到数据处理。
var writeable = new BackPressureStream(); writeable.on('error', function(error){ console.log(error); writeable.resume(); }); var readable = require('fs').createReadStream('somefile.txt'); readable.pipe.(writeable);
我看到该节点为我们提供了可用于暂停可读stream的readable.pause()
方法。 但我无法得到我可以从我的可写stream模块中调用它:
var Writable = require('stream').Writable; function BackPressureStream(options) { Writable.call(this, options); } require('util').inherits(BackPressureStream, Writable); BackPressureStream.prototype._write = function(chunk, encoding, done) { done(); }; BackPressureStream.prototype.resume = function() { this.emit('drain'); }
如何在可写入的stream中实现背压?
PS可以使用pipe/unpipe
事件,提供可读stream作为参数。 但是也有人说,对于pipe道stream,暂停的唯一机会是从可写入的不可读可读stream。
我说得对吗? 我必须删除我的可写入stream,直到用户调用恢复? 而且,在用户调用恢复后,我应该将可读stream传回给用户?
基本上,据我所知,在错误事件的情况下,你正在考虑把反压放在stream上。 你有几个select。
首先,正如你已经确定的那样,使用pipe
来抓取readstream的一个实例,并做一些奇特的步法。
另一个select是创build一个包装可写的stream,它提供了这个function(即它将一个WritableStream
作为input,并且在实现streamfunction时将数据传递给提供的stream。
基本上你最终会得到类似的东西
source stream -> wrapping writable -> writable
https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream处理实现可写入的stream。
对你来说,关键是如果在底层写入中发生错误,你可以在stream上设置一个标志,下一个write
调用发生,你将缓冲块,存储callback,只有调用。 就像是
// ... constructor(wrappedWritableStream) { wrappedWritableStream.on('error', this.errorHandler); this.wrappedWritableStream = wrappedWritableStream; } // ... write(chunk, encoding, callback) { if (this.hadError) { // Note: until callback is called, this function won't be called again, so we will have maximum one stored // chunk. this.bufferedChunk = [chunk, encoding, callback]; } else { wrappedWritableStream.write(chunk, encoding, callback); } } // ... errorHandler(err) { console.error(err); this.hadError = err; this.emit(err); } // ... recoverFromError() { if (this.bufferedChunk) { wrappedWritableStream.write(...this.bufferedChunk); this.bufferedChunk = undefined; } this.hadError = false; }
注意:你只需要实现write
function,但是我鼓励你去挖掘和发挥其他的实现function。
另外值得注意的是,你可能在发送错误事件的stream中写入一些麻烦,但是我将把它作为一个独立的问题来解决。
这里是另一个很好的资源https://nodejs.org/en/docs/guides/backpressuring-in-streams/