如何在node.js中实现正确处理背压的stream?

我怎么也弄不明白该如何实现一个能正确处理背压的 stream。是不是永远都不应该使用 pause 和 resume?

下面是我正在尝试让它正常工作的实现:

// StreamPeeker (the attempt from the question): a Readable that forwards the
// data of `myStream` and hands every forwarded chunk to `callback`.
//
// parameters:
//   myStream - the readable stream to peek at
//   callback - called with each chunk read from myStream
//
// NOTE: `highWaterMark` is not defined in this snippet; it is expected to come
// from the surrounding module.
// NOTE: this version calls pause() when push() reports a full buffer and never
// resumes, which is the backpressure problem the question is about.
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
  var self = this
  stream.Readable.call(self, {highWaterMark: highWaterMark})
  self.stream = myStream

  // Pull up to 5000 bytes whenever the source signals that it is readable.
  myStream.on('readable', function() {
    var chunk = myStream.read(5000)
    if(chunk === null) return

    var accepted = self.push(chunk)
    if(!accepted) {
      process.stdout.write("Pause")
      self.pause()
    }
    callback(chunk)
  })

  // Signal end-of-stream downstream once the source is exhausted.
  myStream.on('end', function() {
    self.push(null)
  })
}
util.inherits(StreamPeeker, stream.Readable)

// Only logs; calling this.resume() here was reported by the author to stop the
// stream from producing any output, so it is left out.
StreamPeeker.prototype._read = function() {
  process.stdout.write("resume")
}

它能正确地输出数据,但不能正确地产生背压。我该如何修改它,才能正确支持背压?

好吧,经过大量的反复试验,我终于弄清楚了。以下是几条准则:

  • 永远不要使用 pause 或 resume(否则 stream 会进入旧式的“流动(flowing)”模式)
  • 不要添加 'data' 事件监听器(否则 stream 同样会进入旧式的“流动(flowing)”模式)
  • 实现者有责任自行跟踪源何时可读
  • 实现者有责任自行跟踪目标何时需要更多数据
  • 在 _read 方法被调用之前,实现不应读取任何数据
  • read 的参数告诉源应该给出多少字节,所以最好把传给 this._read 的参数原样传给源的 read 方法。这样,你就可以在目标端配置一次读取多少数据,而 stream 链的其余部分会自动适应。

所以我把它改成了这样:

更新:我创建了一个 Readable 基类,用它可以更容易地实现带有正确背压的 stream,并且它应该和 Node 原生的 stream 一样灵活。

 var Readable = stream.Readable
 var util = require('util')

 // An easier Readable stream interface to implement.
 // Requires that subclasses:
 //   * implement a _readSource(size) function that
 //       - gets the same parameter as Readable._read (size)
 //       - returns either data to write, or null if the source doesn't have
 //         more data yet
 //   * call sourceHasData(hasData) when the source starts or stops having
 //     data available
 //   * call end() when the source is out of data (forever)
 var Stream666 = {}

 // Base constructor. Forwards its arguments to stream.Readable and throws if
 // the subclass did not define _readSource.
 Stream666.Readable = function() {
   Readable.apply(this, arguments)
   if(this._readSource === undefined) {
     throw new Error("You must define a _readSource function for an object implementing Stream666")
   }

   this._sourceHasData = false        // true while the source may have data to read
   this._destinationWantsData = false // true from _read() until push() reports a full buffer
   this._size = undefined             // most recent size hint received by _read
 }
 util.inherits(Stream666.Readable, Readable)

 // Called by the stream machinery when the consumer wants data.
 Stream666.Readable.prototype._read = function(size) {
   this._destinationWantsData = true
   // Always remember the latest hint: a later sourceHasData(true) pushes with
   // this._size, which must not be undefined or stale.
   this._size = size
   if(this._sourceHasData) {
     pushSourceData(this, size)
   }
 }

 // Subclasses report here whether the source currently has data available.
 Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) {
   this._sourceHasData = _sourceHasData
   if(_sourceHasData && this._destinationWantsData) {
     pushSourceData(this, this._size)
   }
 }

 // Subclasses call this once the source will never produce data again.
 Stream666.Readable.prototype.end = function() {
   this.push(null)
 }

 // Moves one chunk from the source into the readable buffer.
 // Clears _destinationWantsData when push() reports the buffer is full, and
 // clears _sourceHasData when the source had nothing to give.
 function pushSourceData(stream666Readable, size) {
   var data = stream666Readable._readSource(size)
   if(data === null) {
     stream666Readable._sourceHasData = false
     return
   }
   if(!stream666Readable.push(data)) {
     stream666Readable._destinationWantsData = false
   }
 }

 // creates a stream that can view all the data in a stream and passes the data through
 // correctly supports backpressure
 // parameters:
 //   myStream - the stream to peek at
 //   callback - called when there's data sent from the passed stream
 var StreamPeeker = function(myStream, callback) {
   Stream666.Readable.call(this)
   this.stream = myStream
   this.callback = callback

   myStream.on('readable', function() {
     this.sourceHasData(true)
   }.bind(this))
   myStream.on('end', function() {
     this.end()
   }.bind(this))
 }
 util.inherits(StreamPeeker, Stream666.Readable)

 // Reads the next chunk from the peeked stream, shows it to the callback and
 // returns it; returns null when nothing is buffered.
 StreamPeeker.prototype._readSource = function(size) {
   var data = this.stream.read(size)
   if(data === null) {
     // read(size) yields null while fewer than `size` bytes are buffered (and
     // the source has not ended). The size is only a hint, so take whatever is
     // buffered instead of holding small chunks back.
     data = this.stream.read()
   }
   if(data === null) {
     this.sourceHasData(false)
     return null
   }
   this.callback(data)
   return data
 }

老答案:

 // Pass-through Readable that lets `callback` inspect every chunk of `myStream`
 // while honouring backpressure from whoever consumes this stream.
 // parameters:
 //   myStream - the stream to peek at
 //   callback - called with each chunk after it has been pushed downstream
 var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
   var self = this
   stream.Readable.call(self)

   self.stream = myStream
   self.callback = callback
   self.reading = false          // true while the consumer is asking for data
   self.sourceIsReadable = false // set once the source has emitted 'readable'

   myStream.on('readable', function() {
     self.sourceIsReadable = true
     self._readMoreData()
   })
   myStream.on('end', function() {
     self.push(null)
   })
 }
 util.inherits(StreamPeeker, stream.Readable)

 // The consumer wants data: note that, and read right away if the source has
 // already signalled that it is readable.
 StreamPeeker.prototype._read = function() {
   this.reading = true
   if(this.sourceIsReadable) {
     this._readMoreData()
   }
 }

 // Forwards one chunk from the source, but only while the consumer wants data.
 // Stops reading as soon as push() reports that the buffer is full.
 StreamPeeker.prototype._readMoreData = function() {
   if(this.reading) {
     var chunk = this.stream.read()
     if(chunk === null) return

     var wantsMore = this.push(chunk)
     if(!wantsMore) {
       this.reading = false
     }
     this.callback(chunk)
   }
 }