实现缓冲变换stream
我正在试图用新的Node.jsstreamAPI实现一个stream缓冲一定数量的数据。 当这个数据stream被传送到另一个数据stream,或者某些事件消耗了readable
事件时,这个数据stream应该刷新它的缓冲区,然后直接传递。 捕获的是,这个stream将被传送到许多其他stream中,并且当每个目标stream被附加时, 即使缓冲区已经被刷新到另一个stream ,缓冲区也必须被刷新。
例如:
-
BufferStream
实现stream.Transform
,并保留512KB的内部环形缓冲区 -
ReadableStreamA
被传递给BufferStream
一个实例 -
BufferStream
写入其环形缓冲区,从ReadableStreamA
读取数据(数据丢失并不重要,因为缓冲区会覆盖旧数据)。 -
BufferStream
传送到WritableStreamB
-
WritableStreamB
接收整个512KB缓冲区,并继续获取数据,因为它是通过BufferStream
从ReadableStreamA
写入的。 -
BufferStream
传送到WritableStreamC
-
WritableStreamC
也接收整个512KB的缓冲区,但是这个缓冲区现在不同于WritableStreamB
接收的缓冲区,因为更多的数据已经被写入BufferStream
。
这是可能的streamAPI? 我能想到的唯一方法是用一个方法创build一个对象,为每个目标创build一个新的PassThroughstream,这意味着我不能简单地通过它。
对于什么是值得的,我只是在data
事件上监听新的处理程序,而使用旧的“stream动”API来完成这项工作。 使用.on('data')
附加一个新函数时,我会直接用一个环形缓冲区的副本来调用它。
这是我对你的问题的看法。
基本的想法是创build一个Transform
stream,这将允许我们在stream的输出上发送数据之前执行您的自定义缓冲逻辑:
var util = require('util') var stream = require('stream') var BufferStream = function (streamOptions) { stream.Transform.call(this, streamOptions) this.buffer = new Buffer('') } util.inherits(BufferStream, stream.Transform) BufferStream.prototype._transform = function (chunk, encoding, done) { // custom buffering logic // ie. add chunk to this.buffer, check buffer size, etc. this.buffer = new Buffer(chunk) this.push(chunk) done() }
然后,我们需要覆盖.pipe()
方法,这样当BufferStream
传送到一个stream中的时候,我们会得到通知,这允许我们自动向它写入数据:
BufferStream.prototype.pipe = function (destination, options) { var res = BufferStream.super_.prototype.pipe.call(this, destination, options) res.write(this.buffer) return res }
这样,当我们写buffer.pipe(someStream)
,我们按照想要的执行pipe道,并且把内部缓冲区写入输出stream。 之后, Transform
课程负责处理所有事情,同时logging背压和其他情况。
这是一个工作的要点 。 请注意,我没有打扰写一个正确的缓冲逻辑(即我不关心内部缓冲区的大小),但这应该很容易解决。
保罗的答案是好的,但我不认为它符合确切的要求。 这听起来像是需要发生的一个事情是,每当在这个变换stream上调用pipe()时,它需要首先清空表示在变换stream被创build/(连接到源stream)之间的所有数据累积和它连接到当前可写/目标stream的时间。
像这样的东西可能会更正确:
var BufferStream = function () { stream.Transform.apply(this, arguments); this.buffer = []; //I guess an array will do }; util.inherits(BufferStream, stream.Transform); BufferStream.prototype._transform = function (chunk, encoding, done) { this.push(chunk ? String(chunk) : null); this.buffer.push(chunk ? String(chunk) : null); done() }; BufferStream.prototype.pipe = function (destination, options) { var res = BufferStream.super_.prototype.pipe.apply(this, arguments); this.buffer.forEach(function (b) { res.write(String(b)); }); return res; }; return new BufferStream();
我想这个:
BufferStream.super_.prototype.pipe.apply(this, arguments);
相当于这个:
stream.Transform.prototype.pipe.apply(this, arguments);
你可能可以优化这个,并使用一些标志时,调用pipe / unpipe。