连接两个(或n个)stream

  • 2stream:

    给定可读stream stream1stream2获取包含stream1stream2的stream的方式是什么是一种习惯的(简洁的)方式?

    我不能做stream1.pipe(outStream); stream2.pipe(outStream) stream1.pipe(outStream); stream2.pipe(outStream) ,因为那么stream内容混杂在一起。

  • nstream:

    给定一个EventEmitter ,发出不确定数量的stream,例如

     eventEmitter.emit('stream', stream1) eventEmitter.emit('stream', stream2) eventEmitter.emit('stream', stream3) ... eventEmitter.emit('end') 

    什么是一种习惯(简洁)的方式来获得所有stream串联在一起的stream

组合stream包将stream连接起来。 自述文件中的示例:

 var CombinedStream = require('combined-stream'); var fs = require('fs'); var combinedStream = CombinedStream.create(); combinedStream.append(fs.createReadStream('file1.txt')); combinedStream.append(fs.createReadStream('file2.txt')); combinedStream.pipe(fs.createWriteStream('combined.txt')); 

我相信你必须一次追加所有的stream。 如果队列为空, combinedStream自动结束。 见问题#5 。

Stream-Stream库是一个明确的.end替代品,但它不太受欢迎,大概没有经过很好的testing。 它使用节点0.10的streams2 API(请参阅此讨论 )。

一个简单的reduce操作在nodejs中应该没问题 !

 const {PassThrough} = require('stream') let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { s.pipe(pt, {end: false}) s.once('end', () => a.every(s => s.ended) && pt.emit('end')) return pt }, new PassThrough()) 

干杯;)

你可以使它更简洁,但是这是一个可行的方法:

 var util = require('util'); var EventEmitter = require('events').EventEmitter; function ConcatStream(streamStream) { EventEmitter.call(this); var isStreaming = false, streamsEnded = false, that = this; var streams = []; streamStream.on('stream', function(stream){ stream.pause(); streams.push(stream); ensureState(); }); streamStream.on('end', function() { streamsEnded = true; ensureState(); }); var ensureState = function() { if(isStreaming) return; if(streams.length == 0) { if(streamsEnded) that.emit('end'); return; } isStreaming = true; streams[0].on('data', onData); streams[0].on('end', onEnd); streams[0].resume(); }; var onData = function(data) { that.emit('data', data); }; var onEnd = function() { isStreaming = false; streams[0].removeAllListeners('data'); streams[0].removeAllListeners('end'); streams.shift(); ensureState(); }; } util.inherits(ConcatStream, EventEmitter); 

我们跟踪streams的状态( streams的队列; push回来,从前面shift ), isStreamingstreamsEnded 。 当我们得到一个新的stream,我们推它,当一个stream结束时,我们停止聆听和转移。 当stream的stream结束时,我们设置streamsEnded

在每一个事件中,我们检查我们所处的状态。如果我们已经在stream式传输(pipe道stream),我们什么也不做。 如果队列是空的并且streamsEnded被设置,我们发出end事件。 如果队列中有东西,我们恢复并听取它的事件。

*请注意pauseresume是build议性的,所以一些stream可能不正确,并需要缓冲。 这个练习留给读者。

完成了所有这些之后,我将通过构造一个EventEmitter ,创build一个ConcatStream并发出两个stream事件,然后是一个end事件来完成n=2情况。 我相信它可以做得更简洁,但我们也可以使用我们所拥有的。

这可以用香草nodejs完成

 import { PassThrough } from 'stream' const merge = (...streams) => { let pass = new PassThrough() let waiting = streams.length for (let stream of streams) { pass = stream.pipe(pass, {end: false}) stream.once('end', () => --waiting === 0 && pass.emit('end')) } return pass } 

https://github.com/joepie91/node-combined-stream2是组合stream模块(如上所述)的一个embedded式Streams2兼容替代品。它自动包装Streams1stream。

合并stream2的示例代码:

 var CombinedStream = require('combined-stream2'); var fs = require('fs'); var combinedStream = CombinedStream.create(); combinedStream.append(fs.createReadStream('file1.txt')); combinedStream.append(fs.createReadStream('file2.txt')); combinedStream.pipe(fs.createWriteStream('combined.txt')); 

streamee.js是基于节点1.0+stream的一组stream转换器和composer php,并且包括连接方法:

 var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);