What is the correct way to handle backpressure in a node.js Transform stream?

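Introduction

These are my first adventures in writing node.js server-side code. It has been fun so far, but I'm having some difficulty understanding the proper way to implement things that involve node.js streams.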

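Problem

For testing and learning purposes I'm working with large files whose content is zlib-compressed. The compressed content is binary data, and each packet is 38 bytes long. I'm trying to create a resulting file that looks almost identical to the original file, except that every 1024 38-byte packets are preceded by an uncompressed 31-byte header.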

Original file content (decompressed)

```
+----------+----------+----------+----------+
| packet 1 | packet 2 |  ......  | packet N |
| 38 bytes | 38 bytes |  ......  | 38 bytes |
+----------+----------+----------+----------+
```

Resulting file content

```
+----------+--------------------------------+----------+--------------------------------+
| header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
| 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |
+----------+--------------------------------+----------+--------------------------------+
```

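As you can see, this is something of a translation problem: I'm taking a source stream as input and slightly transforming it into an output stream. Implementing a Transform stream therefore felt natural.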

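The class simply attempts to accomplish the following:

  1. Take a stream as input.
  2. zlib-inflate the chunks of data to count the packets, gather 1024 of them, zlib-deflate them, and prepend a header.
  3. Pass the new resulting chunk on through the pipe via this.push(chunk).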
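To make the byte layout of step 2 concrete, here is a synchronous sketch for one in-memory Buffer of already-inflated packets. It is an illustration under assumptions, not the class described above: frameGroups and makeHeader are hypothetical names, and since the question doesn't say what the 31 header bytes contain, the header is zero-filled apart from a packet count in its first 4 bytes.

```javascript
const zlib = require('zlib');

const PACKET_SIZE = 38;
const PACKETS_PER_GROUP = 1024;
const GROUP_SIZE = PACKET_SIZE * PACKETS_PER_GROUP;
const HEADER_SIZE = 31;

// Hypothetical header: the real 31-byte layout is not given, so this
// zero-fills it and stores the group's packet count in the first 4 bytes.
function makeHeader(packetCount) {
  const header = Buffer.alloc(HEADER_SIZE);
  header.writeUInt32BE(packetCount, 0);
  return header;
}

// Split inflated data into groups of up to 1024 packets, deflate each group,
// and put an uncompressed header in front of it.
function frameGroups(inflated) {
  const records = [];
  for (let offset = 0; offset < inflated.length; offset += GROUP_SIZE) {
    const group = inflated.subarray(offset, offset + GROUP_SIZE);
    const packetCount = Math.floor(group.length / PACKET_SIZE);
    records.push(makeHeader(packetCount), zlib.deflateSync(group));
  }
  return records; // [header 1, group 1, header 2, group 2, ...]
}

// Usage: 1500 dummy packets give one full group and one partial group.
const records = frameGroups(Buffer.alloc(PACKET_SIZE * 1500, 1));
const output = Buffer.concat(records); // bytes that would go to the output file
```

A stream version does the same work, but has to carry partial groups over between chunks, which is what the Transform in the answer below does.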

A use case would look like this:

```js
var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits

var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');

inp.pipe(me.createMyTranslate()).pipe(out);
```

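Question(s)

Assuming a Transform is a good choice for this use case, I seem to be running into a possible backpressure issue. My call to this.push(chunk) inside _transform keeps returning false. Why would this be, and how should I handle it?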

I think a Transform is suitable for this, but I would perform the inflate as a separate step in the pipeline.

Here's a quick and largely untested example:

```js
var zlib = require('zlib');
var stream = require('stream');

var transformer = new stream.Transform();

// Properties used to keep internal state of transformer.
transformer._buffers = [];
transformer._inputSize = 0;
transformer._targetSize = 1024 * 38;

// Dump one 'output packet'.
transformer._dump = function(done) {
  // Concatenate the buffered chunks into a single Buffer
  // (kept as binary; no string conversion).
  var buffer = Buffer.concat(this._buffers);

  // Take first 1024 packets.
  var packetBuffer = buffer.subarray(0, this._targetSize);

  // Keep the rest and reset counter.
  this._buffers = [ buffer.subarray(this._targetSize) ];
  this._inputSize = this._buffers[0].length;

  // Output header (placeholder; the real header is 31 bytes).
  this.push('HELLO WORLD');

  // Output compressed packet buffer.
  zlib.deflate(packetBuffer, function(err, compressed) {
    if (err) {
      return done(err);
    }
    this.push(compressed);
    done();
  }.bind(this));
};

// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform = function(chunk, encoding, done) {
  this._buffers.push(chunk);
  this._inputSize += chunk.length;
  if (this._inputSize >= this._targetSize) {
    this._dump(done);
  } else {
    done();
  }
};

// Flush any remaining buffered data, then signal completion.
transformer._flush = function(done) {
  if (this._inputSize > 0) {
    this._dump(done);
  } else {
    done();
  }
};

// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
  .pipe(zlib.createInflate())
  .pipe(transformer)
  .pipe(fs.createWriteStream('depth_1000000.out'));
```
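One thing a .pipe() chain doesn't do is forward errors from one stage to the next, so a corrupt input only surfaces as an 'error' on the inflate stream. A hedged variation, assuming Node 12 or later (for Readable.from), is to connect the stages with stream.pipeline, which destroys every stage and reports the first error to a single callback. To keep the sketch self-contained, an in-memory buffer stands in for depth_1000000 and a PassThrough stands in for transformer; runPipeline is a hypothetical helper.

```javascript
const { pipeline, PassThrough, Readable, Writable } = require('stream');
const zlib = require('zlib');

// Run `input` through inflate -> stand-in transformer -> in-memory sink.
// Resolves with the collected output, or rejects with the first error
// raised by any stage.
function runPipeline(input) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    pipeline(
      Readable.from([input]),
      zlib.createInflate(),
      new PassThrough(), // placeholder for the packet-framing transformer
      new Writable({
        write(chunk, encoding, callback) {
          chunks.push(chunk);
          callback();
        }
      }),
      err => (err ? reject(err) : resolve(Buffer.concat(chunks)))
    );
  });
}

const original = Buffer.alloc(3000 * 38, 7);
const good = runPipeline(zlib.deflateSync(original));
const bad = runPipeline(Buffer.from('this is not zlib data'));

good.then(out => console.log('round trip ok:', out.equals(original)),
          err => console.error(err));
bad.catch(err => console.log('pipeline failed:', err.message));
```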

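push returns false once the transform's outgoing (readable-side) buffer has reached its highWaterMark, which happens when the stream it is piped into (in this case, a file output stream) has too much data buffered and has stopped reading. Since you're writing to disk, this makes sense: you're processing data faster than it can be written out.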
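This can be seen in isolation with a pass-through transform that nobody reads from and whose readable buffer holds 16 bytes (a sketch assuming Node 10 or later for the readableHighWaterMark option). Calling push() directly shows the return value flipping to false once the buffered data reaches the high-water mark. Nothing is dropped: push() still buffers the chunk and only signals that the producer should stop.

```javascript
const { Transform } = require('stream');

// Pass-through transform whose readable side holds 16 bytes before
// push() starts returning false. Nothing is reading from it.
const t = new Transform({
  readableHighWaterMark: 16,
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  }
});

// Push four 8-byte chunks and record what push() returned each time.
const returned = [];
for (let i = 0; i < 4; i++) {
  returned.push(t.push(Buffer.alloc(8)));
}
console.log(returned); // [ true, false, false, false ]
```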

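When that buffer is full, the transform stream stops being read from, so data written into it starts to buffer up on its writable side. If that buffer fills up as well, inp will start to fill up. This is how things are supposed to work: piped streams only process data as fast as the slowest link in the chain can handle it (once the buffers are full).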
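What .pipe() does on your behalf can be written out by hand: keep writing while write() returns true, and when it returns false, stop and wait for the destination's 'drain' event. A minimal sketch; slowSink and writeWithBackpressure are hypothetical names, and the 8-byte highWaterMark plus the setImmediate delay only exist to make the sink slow enough to fill up.

```javascript
const { Writable } = require('stream');

// A deliberately slow sink: each write is acknowledged on a later turn of
// the event loop, and write() reports "full" once 8 bytes are queued.
const slowSink = new Writable({
  highWaterMark: 8,
  write(chunk, encoding, callback) {
    setImmediate(callback);
  }
});

// Write `count` 4-byte chunks, waiting for 'drain' whenever write()
// returns false. Resolves with the number of times it had to wait.
function writeWithBackpressure(dest, count) {
  return new Promise((resolve, reject) => {
    let written = 0;
    let waits = 0;
    dest.on('error', reject);
    function writeMore() {
      while (written < count) {
        written++;
        if (!dest.write(Buffer.alloc(4))) {
          waits++;
          dest.once('drain', writeMore);
          return;
        }
      }
      dest.end(() => resolve(waits));
    }
    writeMore();
  });
}

const waited = writeWithBackpressure(slowSink, 10);
waited.then(n => console.log(`waited for 'drain' ${n} time(s)`),
            err => console.error(err));
```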

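This question from 2013 was all I was able to find on how to deal with "back pressure" when creating node Transform streams.

From the node 7.10.0 Transform stream and Readable stream documentation, what I gathered is that once push returns false, nothing else should be pushed until _read is called.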
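That contract is easiest to see in a plain Readable. In this sketch (Counter is a hypothetical class), _read() pushes until push() returns false and then simply returns, relying on the stream to call _read() again once the consumer has drained some of the buffer.

```javascript
const { Readable } = require('stream');

// Hypothetical source that emits the numbers 1..limit, one per line.
class Counter extends Readable {
  constructor(limit, options) {
    super(options);
    this.current = 0;
    this.limit = limit;
    this.readCalls = 0; // how many times the stream asked for more data
  }

  _read() {
    this.readCalls++;
    while (this.current < this.limit) {
      this.current++;
      // push() returned false: stop until the next _read() call.
      if (!this.push(`${this.current}\n`)) return;
    }
    this.push(null); // all numbers emitted
  }
}

const counter = new Counter(100, { highWaterMark: 16 });
let text = '';
counter.setEncoding('utf8');
counter.on('data', chunk => { text += chunk; });
const done = new Promise(resolve => counter.on('end', () => resolve(text)));
done.then(() => console.log(`_read was called ${counter.readCalls} times`));
```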

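The Transform documentation doesn't mention _read, except to say that the base Transform class implements it (and _write). I found the information about push returning false and _read being called in the Readable stream documentation.

The only other authoritative comment I found on Transform back pressure only mentioned it as an issue, and that was in the comment at the top of the node source file _stream_transform.js.

Here's the section about back pressure from that comment: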

```js
// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk. However,
// a pathological inflate type of transform can cause excessive buffering
// here. For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output. In this case, you could write a very small
// amount of input, and end up with a very large amount of output. In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform. A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
```
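The pathological case from that comment is easy to build as a sketch: every input byte b becomes b bytes of output. This transform deliberately ignores push()'s return value, as the comment describes; the name expand is hypothetical.

```javascript
const { Transform } = require('stream');

// Each input byte b is expanded into b copies of itself, as in the
// comment's example. push()'s return value is deliberately ignored.
const expand = new Transform({
  transform(chunk, encoding, callback) {
    for (const b of chunk) {
      if (b > 0) this.push(Buffer.alloc(b, b));
    }
    callback();
  }
});

let outputBytes = 0;
expand.on('data', chunk => { outputBytes += chunk.length; });
const finished = new Promise(resolve => {
  expand.on('end', () => resolve(outputBytes));
});

// Writing 4 bytes of input yields 4 * 255 = 1020 bytes of output.
expand.end(Buffer.from([0xff, 0xff, 0xff, 0xff]));
finished.then(n => console.log(`${n} bytes out for 4 bytes in`));
```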

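Solution example

Here's the solution I've pieced together to handle back pressure in a Transform stream, and I'm pretty sure it works. (I haven't written any real tests; that would require writing a Writable stream to control the back pressure.)

This is a rudimentary line transform. It still needs work as a line transform, but it does demonstrate handling the "back pressure".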

```js
const stream = require('stream');

class LineTransform extends stream.Transform {
  constructor(options) {
    super(options);

    this._lastLine = "";
    this._continueTransform = null;
    this._transforming = false;
    this._debugTransformCallCount = 0;
  }

  _transform(chunk, encoding, callback) {
    if (encoding === "buffer")
      return callback(new Error("Buffer chunks not supported"));

    if (this._continueTransform !== null)
      return callback(new Error("_transform called before previous transform has completed."));

    // DEBUG: Uncomment for debugging help to see what's going on
    //console.error(`${++this._debugTransformCallCount} _transform called:`);

    // Guard (so we don't call _continueTransform from _read while it is being
    // invoked from _transform)
    this._transforming = true;

    // Do our transforming (in this case splitting the big chunk into lines)
    let lines = (this._lastLine + chunk).split(/\r\n|\n/);
    this._lastLine = lines.pop();

    // In order to respond to "back pressure" create a function
    // that will push all of the lines stopping when push returns false,
    // and then resume where it left off when called again, only calling
    // the "callback" once all lines from this transform have been pushed.
    // Resuming (until done) will be done by _read().
    let nextLine = 0;
    this._continueTransform = () => {
      while (nextLine < lines.length) {
        const ok = this.push(lines[nextLine++] + "\n");
        // More lines remain but we got back pressure, so they have to wait
        // for the next _read() call.
        if (!ok && nextLine < lines.length) return;
      }

      // DEBUG: Uncomment for debugging help to see what's going on
      //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

      // All lines are pushed, remove this function from the LineTransform instance
      this._continueTransform = null;
      return callback();
    };

    // Start pushing the lines
    this._continueTransform();

    // Turn off guard allowing _read to continue the transform pushes if needed.
    this._transforming = false;
  }

  _flush(callback) {
    if (this._lastLine.length > 0) {
      this.push(this._lastLine);
      this._lastLine = "";
    }
    return callback();
  }

  _read(size) {
    // DEBUG: Uncomment for debugging help to see what's going on
    //if (this._transforming)
    //  console.error(`_read called during _transform ${this._debugTransformCallCount}`);

    // If a transform has not pushed every line yet, continue that transform
    // otherwise just let the base class implementation do its thing.
    if (!this._transforming && this._continueTransform !== null)
      this._continueTransform();
    else
      super._read(size);
  }
}
```
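For comparison, newer Node versions allow a different technique that avoids overriding _read altogether. Assuming Node 14 or later, stream.pipeline accepts an async generator function as a middle stage, and it only pulls the next value from the generator once the destination's write() has returned true or it has emitted 'drain'. Back pressure is then handled without the _continueTransform bookkeeping. splitLines is a hypothetical name; its splitting logic mirrors LineTransform.

```javascript
const { pipeline, Readable, Writable } = require('stream');

// Split incoming text into lines. pipeline() stops pulling from this
// generator while the destination is full, so no manual _read() is needed.
async function* splitLines(source) {
  let lastLine = '';
  for await (const chunk of source) {
    const lines = (lastLine + chunk).split(/\r\n|\n/);
    lastLine = lines.pop();
    for (const line of lines) {
      yield line + '\n';
    }
  }
  if (lastLine.length > 0) {
    yield lastLine;
  }
}

const received = [];
const done = new Promise((resolve, reject) => {
  pipeline(
    Readable.from(['first\nsec', 'ond\nthird']),
    splitLines,
    new Writable({
      write(chunk, encoding, callback) {
        received.push(chunk.toString());
        callback();
      }
    }),
    err => (err ? reject(err) : resolve(received))
  );
});
done.then(lines => console.log(lines), err => console.error(err));
```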

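I tested the above code with the DEBUG lines uncommented, on a file of ~10000 lines (~200KB). Redirect stdout or stderr (or both) to a file to separate the debugging statements from the expected output (node test.js > out.log 2> err.log).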

```js
const fs = require('fs');

let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });

inStrm.pipe(lineStrm).pipe(process.stdout);
```
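As noted above, a real test needs a Writable that controls the back pressure. One way to get that, sketched here assuming Node 14 or later: a Writable with a 1-byte highWaterMark that completes each write on a later tick, so the downstream is always "full". To keep the block self-contained it drives a minimal stand-in splitter (CountingLineSplit, hypothetical) that counts how often push() returned false; in a real test LineTransform would take its place and the output would be checked the same way.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Minimal stand-in line splitter (hypothetical). It records how often
// push() reported back pressure.
class CountingLineSplit extends Transform {
  constructor(options) {
    super(options);
    this.lastLine = '';
    this.falsePushes = 0;
  }
  _transform(chunk, encoding, callback) {
    const lines = (this.lastLine + chunk.toString()).split('\n');
    this.lastLine = lines.pop();
    for (const line of lines) {
      if (!this.push(line + '\n')) this.falsePushes++;
    }
    callback();
  }
  _flush(callback) {
    if (this.lastLine.length > 0) this.push(this.lastLine);
    callback();
  }
}

// Slow consumer: tiny buffer, and each write completes on a later tick.
const received = [];
const slowWriter = new Writable({
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback);
  }
});

const input = Array.from({ length: 200 }, (_, i) => `line ${i}\n`).join('');
const splitter = new CountingLineSplit({ readableHighWaterMark: 16 });
const finished = new Promise((resolve, reject) => {
  pipeline(Readable.from([input]), splitter, slowWriter,
    err => (err ? reject(err) : resolve()));
});
finished.then(
  () => console.log(`push() returned false ${splitter.falsePushes} times`),
  err => console.error(err));
```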

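Helpful debugging tip

When I first wrote this, I didn't realize that _read could be called before _transform returned, so I hadn't implemented the this._transforming guard, and I was getting the following error: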

```
Error: no writecb in Transform class
    at afterTransform (_stream_transform.js:71:33)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
    at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
    at LineTransform.Transform._read (_stream_transform.js:167:10)
    at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
    at LineTransform.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:331:12)
    at writeOrBuffer (_stream_writable.js:317:5)
    at LineTransform.Writable.write (_stream_writable.js:243:11)
```

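Looking at the node implementation, I realized that this error means the callback given to _transform was called more than once. There wasn't much information about this error to be found, so I thought I'd include what I figured out here.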
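The same mistake can be reproduced on purpose with a transform that calls its callback twice. This sketch assumes a newer Node version (12 or later), where the problem is reported as an 'error' event with code ERR_MULTIPLE_CALLBACK rather than the older "no writecb in Transform class" message; buggy is a hypothetical name.

```javascript
const { Transform } = require('stream');

// Deliberately buggy transform: it calls its callback twice per chunk.
const buggy = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk);
    callback(); // second call: the mistake behind the error above
  }
});

const reported = new Promise(resolve => {
  buggy.on('error', err => resolve(err.code));
});
buggy.resume(); // consume output so the stream can make progress
buggy.write('hello');
reported.then(code => console.log('stream error:', code));
```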