连接两个(或n个)stream
-
2stream:
给定可读stream
stream1
和stream2
, 获取包含stream1
和stream2
的stream的方式是什么是一种习惯的(简洁的)方式?我不能做
stream1.pipe(outStream); stream2.pipe(outStream)
stream1.pipe(outStream); stream2.pipe(outStream)
,因为那么stream内容混杂在一起。 -
nstream:
给定一个EventEmitter ,发出不确定数量的stream,例如
eventEmitter.emit('stream', stream1) eventEmitter.emit('stream', stream2) eventEmitter.emit('stream', stream3) ... eventEmitter.emit('end')
什么是一种习惯(简洁)的方式来获得所有stream串联在一起的stream ?
组合stream包将stream连接起来。 自述文件中的示例:
var CombinedStream = require('combined-stream'); var fs = require('fs'); var combinedStream = CombinedStream.create(); combinedStream.append(fs.createReadStream('file1.txt')); combinedStream.append(fs.createReadStream('file2.txt')); combinedStream.pipe(fs.createWriteStream('combined.txt'));
我相信你必须一次追加所有的stream。 如果队列为空, combinedStream
自动结束。 见问题#5 。
Stream-Stream库是一个明确的.end
替代品,但它不太受欢迎,大概没有经过很好的testing。 它使用节点0.10的streams2 API(请参阅此讨论 )。
一个简单的reduce操作在nodejs中应该没问题 !
const {PassThrough} = require('stream') let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { s.pipe(pt, {end: false}) s.once('end', () => a.every(s => s.ended) && pt.emit('end')) return pt }, new PassThrough())
干杯;)
你可以使它更简洁,但是这是一个可行的方法:
var util = require('util'); var EventEmitter = require('events').EventEmitter; function ConcatStream(streamStream) { EventEmitter.call(this); var isStreaming = false, streamsEnded = false, that = this; var streams = []; streamStream.on('stream', function(stream){ stream.pause(); streams.push(stream); ensureState(); }); streamStream.on('end', function() { streamsEnded = true; ensureState(); }); var ensureState = function() { if(isStreaming) return; if(streams.length == 0) { if(streamsEnded) that.emit('end'); return; } isStreaming = true; streams[0].on('data', onData); streams[0].on('end', onEnd); streams[0].resume(); }; var onData = function(data) { that.emit('data', data); }; var onEnd = function() { isStreaming = false; streams[0].removeAllListeners('data'); streams[0].removeAllListeners('end'); streams.shift(); ensureState(); }; } util.inherits(ConcatStream, EventEmitter);
我们跟踪streams
的状态( streams
的队列; push
回来,从前面shift
), isStreaming
和streamsEnded
。 当我们得到一个新的stream,我们推它,当一个stream结束时,我们停止聆听和转移。 当stream的stream结束时,我们设置streamsEnded
。
在每一个事件中,我们检查我们所处的状态。如果我们已经在stream式传输(pipe道stream),我们什么也不做。 如果队列是空的并且streamsEnded
被设置,我们发出end
事件。 如果队列中有东西,我们恢复并听取它的事件。
*请注意pause
和resume
是build议性的,所以一些stream可能不正确,并需要缓冲。 这个练习留给读者。
完成了所有这些之后,我将通过构造一个EventEmitter
,创build一个ConcatStream
并发出两个stream
事件,然后是一个end
事件来完成n=2
情况。 我相信它可以做得更简洁,但我们也可以使用我们所拥有的。
这可以用香草nodejs完成
import { PassThrough } from 'stream' const merge = (...streams) => { let pass = new PassThrough() let waiting = streams.length for (let stream of streams) { pass = stream.pipe(pass, {end: false}) stream.once('end', () => --waiting === 0 && pass.emit('end')) } return pass }
合并stream2的示例代码:
var CombinedStream = require('combined-stream2'); var fs = require('fs'); var combinedStream = CombinedStream.create(); combinedStream.append(fs.createReadStream('file1.txt')); combinedStream.append(fs.createReadStream('file2.txt')); combinedStream.pipe(fs.createWriteStream('combined.txt'));
streamee.js是基于节点1.0+stream的一组stream转换器和composer php,并且包括连接方法:
var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);