stream.Transform在完成输出之前接受新的input
我正在实现一个asynchronous操作的转换stream。 我叫Parser
。
var Transform = require('stream').transform; function Parser(options) { Transform.call(this, {objectMode: true}); } Parser.prototype._transform = function _transform(input, encoding, callback) { var this_ = this; doSomethingAsync(input, function(output) { this_.push(output); //possible location #1 for callback(); }); //possible location #2 for callback(); }
每个传入的块可能需要很长时间才能处理( doSomethingAsync
需要networking请求)。 但是,每个块都完全独立于块处理。 而且,输出的确切顺序并不重要。 每个输出包含一个标识其input的描述符,而不是按顺序标识。
因此,我希望尽快再次调用_transform
,而不是等待给定的块完成处理。 所以,看代码,如果我把callback()
放在possible location #1
,那么在每个块被完全处理之前, _transform
不会被调用。 但如果我把它放在possible location #2
,那么我的stream是在callback后推动,导致这些不好看的
Uncaught Error: stream.push() after EOF
一旦stream终止,错误。
所以我的问题:是否有可能做到这一点与变换stream? 还是应该考虑使用一个库? 如果是这样,哪种stream派(事件stream,FRP等)?
谢谢。
你可以在你的stream上实现_flush()
,只有当你所有的asynchronous函数完成时才调用传递给该函数的callback函数。 像这样的东西:
function Parser(options) { Transform.call(this, {objectMode: true}); this._pending = 0; this._flushcb = undefined; } Parser.prototype._transform = function _transform(input, encoding, callback) { var self = this; ++this._pending; doSomethingAsync(input, function(output) { self.push(output); if (--self._pending === 0 && self._flushcb) self._flushcb(); }); callback(); } Parser.prototype._flush = function(callback) { this._flushcb = callback; };
我相信答案并不完整。 想象一下你有这样一个_transform()
:
_transform(chunk, encoding, done) { let data = chunk.toString(); this.rest += data; [this.toPush, this.rest] = this.f(this.rest); for (let i = 0; i < this.toPush.length; i++) { if (!this.push(this.toPush[i])) { this._source.readStop(); break; } } done() }
“`
例如, f
是一个函数,将段落中接收的块分割。 rest
是在块的末尾f
东西不能确定它是否是一个完整的paragraphe,因此需要更多的数据(另一块)。 当所有内容都被读取后,可以假设rest
是整个段落,然后使用_flush
按如下所示推送它。 上面描述的例外是抛出的,可能是因为"<p>"+this.rest+"</p>"
大于this.rest
。 这不是真正的预期的行为…
_flush(done) { if (this.rest !== "") this.push("<p>"+this.rest+"</p>"); this.rest = null; this.toPush = null; done() }
编辑:所以卡尔文Metcalf给了我一个工作在这里https://github.com/nodejs/readable-stream/issues/207 :在节点8.0.0可以使用_final而不是_flush。 这个问题似乎很不稳定,因为他没有在他的环境中复制。