使用node.js在stream中进行asynchronous调用的问题

我在我的stream中进行asynchronous调用时遇到问题。 它看起来像是因为某种原因,如果我有三个stream,中间stream进行asynchronous调用,最终的stream永远不会收到“结束”事件。 我可以用一些简单的stream和超时模拟行为。

var options = {} var streamOne = function(){ return through(function write(row){ this.emit('data', row) }) } var streamTwo = function(){ return through(function write(row){ this.pause() var that = this; setTimeout(function(){ that.emit('data', row) that.resume() }, 1000) }) } options.streams = [new streamOne(), new streamTwo()] 

然后我通过这个事件stream 。

 var streams = [] //Pass stream one and stream two to es.pipe streams.push(es.pipe.apply(this, options.streams)) //Then add the final stream var endStream = es.pipe( new streamThree() ) streams.push(endStream) //And send it through the pipeline es.pipeline.apply(this, streams) 

所以,这在目前的情况下是行不通的。

一些令人困惑的问题:如果我删除streamOne,它的工作原理! 如果streamTwo不进行asynchronous调用,它的工作原理。 这让我觉得问题在于两个stream的交互方式。 然而,如果我在整个代码console.log ,它看起来像一切工作正常,streamThree将写入数据,但从来没有注册“结束”事件。 *注意:streamThree不使用through,而是使用本地Streams模块。

想一想为什么发生这种情况?

在运行一些testing用例之后,看起来I / O没有被pipe道或者通过正确处理。 我不完全确定为什么会出现这种情况,但我认为这是造成问题的stream程暂停和恢复的一个竞争条件。 我做了一些事情来清理代码:

1)简化pipe道。 而不是在pipe道内部嵌套es.pipe ,我只是直接放入stream。 这有助于更好地pipe理数据stream之间的数据stream。

2)我不是通过数据stream发送数据,而是使用this.queue排队数据,让模块处理潜在的背压。

3)我使用事件stream方法es.map来处理asynchronous调用的stream程。 我认为这是一个更好的解决scheme,因为它更干净地处理了stream的暂停和恢复,并且仍然返回直通stream。