stream.Transform在完成输出之前接受新的input

我正在实现一个asynchronous操作的转换stream。 我叫Parser

 var Transform = require('stream').transform; function Parser(options) { Transform.call(this, {objectMode: true}); } Parser.prototype._transform = function _transform(input, encoding, callback) { var this_ = this; doSomethingAsync(input, function(output) { this_.push(output); //possible location #1 for callback(); }); //possible location #2 for callback(); } 

每个传入的块可能需要很长时间才能处理( doSomethingAsync需要networking请求)。 但是,每个块都完全独立于块处理。 而且,输出的确切顺序并不重要。 每个输出包含一个标识其input的描述符,而不是按顺序标识。

因此,我希望尽快再次调用_transform ,而不是等待给定的块完成处理。 所以,看代码,如果我把callback()放在possible location #1 ,那么在每个块被完全处理之前, _transform不会被调用。 但如果我把它放在possible location #2 ,那么我的stream是在callback后推动,导致这些不好看的

 Uncaught Error: stream.push() after EOF 

一旦stream终止,错误。

所以我的问题:是否有可能做到这一点与变换stream? 还是应该考虑使用一个库? 如果是这样,哪种stream派(事件stream,FRP等)?

谢谢。

你可以在你的stream上实现_flush() ,只有当你所有的asynchronous函数完成时才调用传递给该函数的callback函数。 像这样的东西:

 function Parser(options) { Transform.call(this, {objectMode: true}); this._pending = 0; this._flushcb = undefined; } Parser.prototype._transform = function _transform(input, encoding, callback) { var self = this; ++this._pending; doSomethingAsync(input, function(output) { self.push(output); if (--self._pending === 0 && self._flushcb) self._flushcb(); }); callback(); } Parser.prototype._flush = function(callback) { this._flushcb = callback; }; 

我相信答案并不完整。 想象一下你有这样一个_transform()

 _transform(chunk, encoding, done) { let data = chunk.toString(); this.rest += data; [this.toPush, this.rest] = this.f(this.rest); for (let i = 0; i < this.toPush.length; i++) { if (!this.push(this.toPush[i])) { this._source.readStop(); break; } } done() } 

“`

例如, f是一个函数,将段落中接收的块分割。 rest是在块的末尾f东西不能确定它是否是一个完整的paragraphe,因此需要更多的数据(另一块)。 当所有内容都被读取后,可以假设rest是整个段落,然后使用_flush按如下所示推送它。 上面描述的例外是抛出的,可能是因为"<p>"+this.rest+"</p>"大于this.rest 。 这不是真正的预期的行为…

  _flush(done) { if (this.rest !== "") this.push("<p>"+this.rest+"</p>"); this.rest = null; this.toPush = null; done() } 

编辑:所以卡尔文Metcalf给了我一个工作在这里https://github.com/nodejs/readable-stream/issues/207 :在节点8.0.0可以使用_final而不是_flush。 这个问题似乎很不稳定,因为他没有在他的环境中复制。