正确的方法来清除一个streams2pipe道并清空它(不只是刷新)

前提

我试图find正确的方法提前终止在Node.js中的一系列pipe道stream(pipe道):有时我想优雅地中止stream在它完成之前。 具体来说,我正在处理大多数objectMode: true和非本地的并行stream,但这应该不重要。

问题

问题是当我unpipe调整pipe道时,数据保留在每个stream的缓冲区中,并被drain 。 对于大多数中间stream(例如Readable / Transform )来说,这可能是可以的,但最后一个Writable仍然会stream到它的写入目标(例如文件或数据库或套接字或w / e)。 如果缓冲区包含数百或数千个需要耗费大量时间的块,这可能是有问题的。 我希望它立即停止,即不要stream失; 为什么浪费周期和内存上的数据无关紧要?

根据我走的路线,我会收到一个“写完后”的错误,或者当一个stream找不到现有pipe道的exception。

a.pipe(b).pipe(c).pipe(z)的formsa.pipe(b).pipe(c).pipe(z)streampipe道的正确方法是什么?

解?

我提出的解决scheme是3步:

  1. 以相反的顺序在pipe道中unpipe每个stream
  2. 清空实现Writable每个stream的缓冲区
  3. end实现Writable每个stream

一些说明整个过程的伪代码:

 var pipeline = [ // define the pipeline readStream, transformStream0, transformStream1, writeStream ]; // build and start the pipeline var tmpBuildStream; pipeline.forEach(function(stream) { if ( !tmpBuildStream ) { tmpBuildStream = stream; continue; } tmpBuildStream = lastStream.pipe(stream); }); // sleep, timeout, event, etc... // tear down the pipeline var tmpTearStream; pipeline.slice(0).reverse().forEach(function(stream) { if ( !tmpTearStream ) { tmpTearStream = stream; continue; } tmpTearStream = stream.unpipe(tmpTearStream); }); // empty and end the pipeline pipeline.forEach(function(stream) { if ( typeof stream._writableState === 'object' ) { // empty stream._writableState.length -= stream._writableState.buffer.length; stream._writableState.buffer = []; } if ( typeof stream.end === 'function' ) { // kill stream.end(); } }); 

我真的很担心stream._writableState的使用和修改内部bufferlength属性( _表示私有属性)。 这似乎是一个黑客。 还要注意,因为我是pipe道系统,所以pauseresume我们的问题(根据我从IRC收到的build议)。

我也放在一起可以从github抓取的可运行版本(很sl)): https : //github.com/zamnuts/multipipe-proto (git clone,npm install,view readme,npm start)

在这个特殊的情况下,我想我们应该摆脱你有4个不同的不完全定制的stream的结构。 如果我们没有实现我们自己的机制,将它们连接在一起会产生链式依赖,这将很难控制。

我想在这里专注于你的真正目标:

  INPUT >----[read] → [transform0] → [transform1] → [write]-----> OUTPUT | | | | KILL_ALL------o----------o--------------o------------o--------[nothing to drain] 

我相信上述结构可以通过定制来实现:

  1. duplex stream – 用于自己_write(chunk, encoding, cb)_read(bytes)实现

  2. transform stream – 用于自己的_transform(chunk, encoding, cb)实现。

由于您正在使用writable-stream-parallel包,所以您可能还需要查看它们的库,因为它们的duplex实现可以在这里find: https : //github.com/Clever/writable-stream-parallel/blob/master/ lib / duplex.js 。 他们的transform stream实现在这里: https : //github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js 。 在这里他们处理highWaterMark 。

可能的scheme

他们的写stream: https : //github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189有一个有趣的functionwriteOrBuffer ,我想你可能会稍微调整一下,打断从缓冲区写入数据。

注意:这3个标志正在控制缓冲区清除:

 ( !finished && !state.bufferProcessing && state.buffer.length ) 

参考文献:

  • Node.js转换stream文档
  • Node.js双工stream文档
  • 在Node.js中编写转换stream
  • 在Node.js中写入双工stream