实现缓冲变换stream

我正在试图用新的Node.jsstreamAPI实现一个stream缓冲一定数量的数据。 当这个数据stream被传送到另一个数据stream,或者某些事件消耗了readable事件时,这个数据stream应该刷新它的缓冲区,然后直接传递。 捕获的是,这个stream将被传送到许多其他stream中,并且当每个目标stream被附加时, 即使缓冲区已经被刷新到另一个stream ,缓冲区也必须被刷新。

例如:

  1. BufferStream实现stream.Transform ,并保留512KB的内部环形缓冲区
  2. ReadableStreamA被传递给BufferStream一个实例
  3. BufferStream写入其环形缓冲区,从ReadableStreamA读取数据(数据丢失并不重要,因为缓冲区会覆盖旧数据)。
  4. BufferStream传送到WritableStreamB
  5. WritableStreamB接收整个512KB缓冲区,并继续获取数据,因为它是通过BufferStreamReadableStreamA写入的。
  6. BufferStream传送到WritableStreamC
  7. WritableStreamC也接收整个512KB的缓冲区,但是这个缓冲区现在不同于WritableStreamB接收的缓冲区,因为更多的数据已经被写入BufferStream

这是可能的streamAPI? 我能想到的唯一方法是用一个方法创build一个对象,为每个目标创build一个新的PassThroughstream,这意味着我不能简单地通过它。

对于什么是值得的,我只是在data事件上监听新的处理程序,而使用旧的“stream动”API来完成这项工作。 使用.on('data')附加一个新函数时,我会直接用一个环形缓冲区的副本来调用它。

这是我对你的问题的看法。

基本的想法是创build一个Transformstream,这将允许我们在stream的输出上发送数据之前执行您的自定义缓冲逻辑:

 var util = require('util') var stream = require('stream') var BufferStream = function (streamOptions) { stream.Transform.call(this, streamOptions) this.buffer = new Buffer('') } util.inherits(BufferStream, stream.Transform) BufferStream.prototype._transform = function (chunk, encoding, done) { // custom buffering logic // ie. add chunk to this.buffer, check buffer size, etc. this.buffer = new Buffer(chunk) this.push(chunk) done() } 

然后,我们需要覆盖.pipe()方法,这样当BufferStream传送到一个stream中的时候,我们会得到通知,这允许我们自动向它写入数据:

 BufferStream.prototype.pipe = function (destination, options) { var res = BufferStream.super_.prototype.pipe.call(this, destination, options) res.write(this.buffer) return res } 

这样,当我们写buffer.pipe(someStream) ,我们按照想要的执行pipe道,并且把内部缓冲区写入输出stream。 之后, Transform课程负责处理所有事情,同时logging背压和其他情况。

这是一个工作的要点 。 请注意,我没有打扰写一个正确的缓冲逻辑(即我不关心内部缓冲区的大小),但这应该很容易解决。

保罗的答案是好的,但我不认为它符合确切的要求。 这听起来像是需要发生的一个事情是,每当在这个变换stream上调用pipe()时,它需要首先清空表示在变换stream被创build/(连接到源stream)之间的所有数据累积和它连接到当前可写/目标stream的时间。

像这样的东西可能会更正确:

  var BufferStream = function () { stream.Transform.apply(this, arguments); this.buffer = []; //I guess an array will do }; util.inherits(BufferStream, stream.Transform); BufferStream.prototype._transform = function (chunk, encoding, done) { this.push(chunk ? String(chunk) : null); this.buffer.push(chunk ? String(chunk) : null); done() }; BufferStream.prototype.pipe = function (destination, options) { var res = BufferStream.super_.prototype.pipe.apply(this, arguments); this.buffer.forEach(function (b) { res.write(String(b)); }); return res; }; return new BufferStream(); 

我想这个:

 BufferStream.super_.prototype.pipe.apply(this, arguments); 

相当于这个:

 stream.Transform.prototype.pipe.apply(this, arguments); 

你可能可以优化这个,并使用一些标志时,调用pipe / unpipe。