正确的方法来清除一个streams2pipe道并清空它(不只是刷新)
前提
我试图find正确的方法提前终止在Node.js中的一系列pipe道stream(pipe道):有时我想优雅地中止stream在它完成之前。 具体来说,我正在处理大多数objectMode: true
和非本地的并行stream,但这应该不重要。
问题
问题是当我unpipe
调整pipe道时,数据保留在每个stream的缓冲区中,并被drain
。 对于大多数中间stream(例如Readable
/ Transform
)来说,这可能是可以的,但最后一个Writable
仍然会stream到它的写入目标(例如文件或数据库或套接字或w / e)。 如果缓冲区包含数百或数千个需要耗费大量时间的块,这可能是有问题的。 我希望它立即停止,即不要stream失; 为什么浪费周期和内存上的数据无关紧要?
根据我走的路线,我会收到一个“写完后”的错误,或者当一个stream找不到现有pipe道的exception。
题
a.pipe(b).pipe(c).pipe(z)
的formsa.pipe(b).pipe(c).pipe(z)
streampipe道的正确方法是什么?
解?
我提出的解决scheme是3步:
- 以相反的顺序在pipe道中
unpipe
每个stream - 清空实现
Writable
每个stream的缓冲区 -
end
实现Writable
每个stream
一些说明整个过程的伪代码:
var pipeline = [ // define the pipeline readStream, transformStream0, transformStream1, writeStream ]; // build and start the pipeline var tmpBuildStream; pipeline.forEach(function(stream) { if ( !tmpBuildStream ) { tmpBuildStream = stream; continue; } tmpBuildStream = lastStream.pipe(stream); }); // sleep, timeout, event, etc... // tear down the pipeline var tmpTearStream; pipeline.slice(0).reverse().forEach(function(stream) { if ( !tmpTearStream ) { tmpTearStream = stream; continue; } tmpTearStream = stream.unpipe(tmpTearStream); }); // empty and end the pipeline pipeline.forEach(function(stream) { if ( typeof stream._writableState === 'object' ) { // empty stream._writableState.length -= stream._writableState.buffer.length; stream._writableState.buffer = []; } if ( typeof stream.end === 'function' ) { // kill stream.end(); } });
我真的很担心stream._writableState
的使用和修改内部buffer
和length
属性( _
表示私有属性)。 这似乎是一个黑客。 还要注意,因为我是pipe道系统,所以pause
和resume
我们的问题(根据我从IRC收到的build议)。
我也放在一起可以从github抓取的可运行版本(很sl)): https : //github.com/zamnuts/multipipe-proto (git clone,npm install,view readme,npm start)
在这个特殊的情况下,我想我们应该摆脱你有4个不同的不完全定制的stream的结构。 如果我们没有实现我们自己的机制,将它们连接在一起会产生链式依赖,这将很难控制。
我想在这里专注于你的真正目标:
INPUT >----[read] → [transform0] → [transform1] → [write]-----> OUTPUT | | | | KILL_ALL------o----------o--------------o------------o--------[nothing to drain]
我相信上述结构可以通过定制来实现:
-
duplex stream
– 用于自己_write(chunk, encoding, cb)
和_read(bytes)
实现 -
transform stream
– 用于自己的_transform(chunk, encoding, cb)
实现。
由于您正在使用
writable-stream-parallel
包,所以您可能还需要查看它们的库,因为它们的duplex
实现可以在这里find: https : //github.com/Clever/writable-stream-parallel/blob/master/ lib / duplex.js 。 他们的transform stream
实现在这里: https : //github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js 。 在这里他们处理highWaterMark 。
可能的scheme
他们的写stream: https : //github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189有一个有趣的functionwriteOrBuffer
,我想你可能会稍微调整一下,打断从缓冲区写入数据。
注意:这3个标志正在控制缓冲区清除:
( !finished && !state.bufferProcessing && state.buffer.length )
参考文献:
- Node.js转换stream文档
- Node.js双工stream文档
- 在Node.js中编写转换stream
- 在Node.js中写入双工stream