什么是一个正确的方法来暂停从可写的nodejspipe道可读stream?

我正在写一个模块,这是一个可写的stream。 我想为我的用户实现pipe道接口。

如果发生错误,我需要暂停可读stream并发出错误事件。 然后,用户将决定 – 如果他有错误,他应该能够恢复到数据处理。

var writeable = new BackPressureStream(); writeable.on('error', function(error){ console.log(error); writeable.resume(); }); var readable = require('fs').createReadStream('somefile.txt'); readable.pipe.(writeable); 

我看到该节点为我们提供了可用于暂停可读stream的readable.pause()方法。 但我无法得到我可以从我的可写stream模块中调用它:

 var Writable = require('stream').Writable; function BackPressureStream(options) { Writable.call(this, options); } require('util').inherits(BackPressureStream, Writable); BackPressureStream.prototype._write = function(chunk, encoding, done) { done(); }; BackPressureStream.prototype.resume = function() { this.emit('drain'); } 

如何在可写入的stream中实现背压?

PS可以使用pipe/unpipe事件,提供可读stream作为参数。 但是也有人说,对于pipe道stream,暂停的唯一机会是从可写入的不可读可读stream。

我说得对吗? 我必须删除我的可写入stream,直到用户调用恢复? 而且,在用户调用恢复后,我应该将可读stream传回给用户?

基本上,据我所知,在错误事件的情况下,你正在考虑把反压放在stream上。 你有几个select。

首先,正如你已经确定的那样,使用pipe来抓取readstream的一个实例,并做一些奇特的步法。

另一个select是创build一个包装可写的stream,它提供了这个function(即它将一个WritableStream作为input,并且在实现streamfunction时将数据传递给提供的stream。

基本上你最终会得到类似的东西

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream处理实现可写入的stream。

对你来说,关键是如果在底层写入中发生错误,你可以在stream上设置一个标志,下一个write调用发生,你将缓冲块,存储callback,只有调用。 就像是

 // ... constructor(wrappedWritableStream) { wrappedWritableStream.on('error', this.errorHandler); this.wrappedWritableStream = wrappedWritableStream; } // ... write(chunk, encoding, callback) { if (this.hadError) { // Note: until callback is called, this function won't be called again, so we will have maximum one stored // chunk. this.bufferedChunk = [chunk, encoding, callback]; } else { wrappedWritableStream.write(chunk, encoding, callback); } } // ... errorHandler(err) { console.error(err); this.hadError = err; this.emit(err); } // ... recoverFromError() { if (this.bufferedChunk) { wrappedWritableStream.write(...this.bufferedChunk); this.bufferedChunk = undefined; } this.hadError = false; } 

注意:你只需要实现writefunction,但是我鼓励你去挖掘和发挥其他的实现function。

另外值得注意的是,你可能在发送错误事件的stream中写入一些麻烦,但是我将把它作为一个独立的问题来解决。

这里是另一个很好的资源https://nodejs.org/en/docs/guides/backpressuring-in-streams/