将数据放回可读stream

TL; DR如何从stream中读取一些数据,然后将其放回,以允许其他消费者获得相同的data事件?

这是一个可读的stream,stream1 … Infinity:

 var Readable = require('stream').Readable; var readable = new Readable(); var c = 0; readable._read = function () { var self = this; setTimeout(function () { self.push((++c).toString()); }, 500); }; 

我想读取第一个data事件,查看数据,然后“重置”stream到其原始状态,并允许其他data侦听器消耗第一个事件,就好像它从未发生过一样。 我认为unshift()将是正确的方法,因为它在文档中说:

readable.unshift(块)#

块缓冲区| string数据的大块不转移到读取队列这在分析器正在使用某个stream的某些情况下非常有用,该分析器需要“消耗”乐观地从源中抽取的一些数据,以便stream可以传递给其他一方。

这听起来对我的需求是完美的,但它不工作,我期望:

 ... readable.once('data', function (d) { console.log(d.toString()); // Outputs 1 readable.unshift(d); // Put the 1 back on the stream readable.on('data', function (d) { console.log(d.toString()); // Heh?! Outputs 2, how about 1? }); }); 

所以我想出了答案:

当您调用stream.unshift()如果stream将处于stream动模式,将立即发送数据事件。 所以在我join听众的时候,这艘船已经航行了。

 readable.unshift(d); // emits 'data' event readable.on('data', function (d) { // missed `data` event console.log(d.toString()); }); 

有几种方法可以使我的预期工作:

1)在取消转移之前添加新的监听器:

 readable.once('data', function (d) { console.log(d.toString()); // Outputs 1 readable.on('data', function (d) { console.log(d.toString()); // Outputs 1,1,2,3... }); readable.unshift(d); // Put the 1 back on the stream }); 

2)暂停和恢复stream:

 readable.once('data', function (d) { console.log(d.toString()); // Outputs 1 readable.pause(); // Stops the stream from flowing readable.unshift(d); // Put the 1 back on the stream readable.on('data', function (d) { console.log(d.toString()); // Outputs 1,1,2,3... }); readable.resume(); // Start the stream flowing again });