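How to pause and unpause a Node object stream while processing its output

I'm currently processing a file stream line by line by running it through a transform stream that emits 'line' events. When the current line matches certain criteria, I'd like to pause the input file stream, start processing a new stream, and, once that one has finished, resume processing the original stream line by line. I've boiled it down to the simple example below: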

test.coffee:

    fs = require 'fs'

    TestTransform = require './test-transform'

    inStream = new TestTransform

    fs.createReadStream("./test.coffee").pipe(inStream)

    inStream.on 'line', (line) ->
        process.stdout.write "-->"
        if line.match /line\.match/g
            process.stdout.write line
            console.error "PAUSE"
            inStream.pause()
            fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
                console.error "UNPAUSE"
                inStream.resume()
        else
            process.stdout.write line

test-transform.coffee:

    Transform = require('stream').Transform

    module.exports = class TestTransform extends Transform
        constructor: ->
            Transform.call @, readableObjectMode: true
            @buffer = ""

        pushLines: ->
            newlineIndex = @buffer.indexOf "\n"
            while newlineIndex isnt -1
                @push @buffer.substr(0, newlineIndex + 1)
                @emit 'line', @buffer.substr(0, newlineIndex + 1)
                @buffer = @buffer.substr(newlineIndex + 1)
                newlineIndex = @buffer.indexOf "\n"

        _transform: (chunk, enc, cb) ->
            @buffer = @buffer + chunk.toString()
            @pushLines()
            cb?()

        _flush: (cb) ->
            @pushLines()
            @buffer += "\n" # ending newline
            @push @buffer
            @emit 'line', @buffer # push last line
            @buffer = ""
            cb?()

(Don't worry too much about the transform stream; it's only an example.) Anyway, the output of coffee test.coffee looks like this:

    -->fs = require 'fs'
    -->
    -->TestTransform = require './test-transform'
    -->
    -->inStream = new TestTransform
    -->
    -->fs.createReadStream("./test.coffee").pipe(inStream)
    -->
    -->inStream.on 'line', (line) ->
    -->    process.stdout.write "-->"
    -->    if line.match /line\.match/g
    PAUSE
    -->        process.stdout.write line
    -->        console.error "PAUSE"
    -->        inStream.pause()
    -->        fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
    -->            console.error "UNPAUSE"
    -->            inStream.unpause()
    -->    else
    -->        process.stdout.write line
    -->
    fs = require 'fs'

    TestTransform = require './test-transform'

    inStream = new TestTransform

    fs.createReadStream("./test.coffee").pipe(inStream)

    inStream.on 'line', (line) ->
        process.stdout.write "-->"
        if line.match /line\.match/g
            process.stdout.write line
            console.error "PAUSE"
            inStream.pause()
            fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
                console.error "UNPAUSE"
                inStream.unpause()
        else
            process.stdout.write line

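So clearly the pipe is not being paused; it just runs to completion (even though the PAUSE branch is reached as expected), and since "UNPAUSE" is never written out either, the 'end' callback is never fired. Moving the pause/unpause calls from the transform stream to the read stream doesn't seem to work either. From this behavior I'm assuming that Node streams don't honor pause/unpause calls made from inside an event callback.

There may also be another way to accomplish this without calling pause/unpause at all; if there is some way to wait for a stream to end and suspend the current thread of execution in the meantime, that would effectively do what I'm trying to do.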

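If I've understood the question correctly, here is a simple Node application that solves the problem using Dust.js.

Dust is a templating engine, but one of its best features is its native understanding of Node streams. This example uses Dust 2.7.0.

I'm using node-byline as a stand-in for your transform stream, but it does the same thing: it reads a stream line by line.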

    var fs = require('fs'),
        byline = require('byline'),
        dust = require('dustjs-linkedin');

    var stream = byline(fs.createReadStream('./test.txt', { encoding: 'utf8' }));

    var template = dust.loadSource(dust.compile('{#byline}--> {.|s}{~n}{match}{/byline}'));

    dust.stream(template, {
      byline: stream,
      match: function(chunk, context) {
        var currentLine = context.current();
        if(currentLine.match(/line\.match/g)) {
          return fs.createReadStream('./test.txt', 'utf8');
        }
        return chunk;
      }
    }).pipe(process.stdout);

Here is the output of my program:

    $ node index.js
    --> fs = require 'fs'
    --> TestTransform = require './test-transform'
    --> inStream = new TestTransform
    --> fs.createReadStream("./test.coffee").pipe(inStream)
    --> inStream.on 'line', (line) ->
    -->     process.stdout.write "-->"
    -->     if line.match /line\.match/g
    fs = require 'fs'
    TestTransform = require './test-transform'
    inStream = new TestTransform
    fs.createReadStream("./test.coffee").pipe(inStream)
    inStream.on 'line', (line) ->
        process.stdout.write "-->"
        if line.match /line\.match/g
            process.stdout.write line
            console.error "PAUSE"
            inStream.pause()
            fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
                console.error "UNPAUSE"
                inStream.resume()
        else
            process.stdout.write line
    -->         process.stdout.write line
    -->         console.error "PAUSE"
    -->         inStream.pause()
    -->         fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
    -->             console.error "UNPAUSE"
    -->             inStream.resume()
    -->     else
    -->         process.stdout.write line

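As you can see, it interleaves the output correctly. Let me know if I can elaborate further on how the Dust part works.

Edit: here is an explanation of the Dust template specifically.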

    {#byline}     {! look for the context variable named `byline` !}
                  {! okay, it's a stream. For each `data` event, output this stuff once !}
      --> {.|s}   {! output the current `data`. Use |s to turn off HTML escaping !}
      {~n}        {! a newline !}
      {match}     {! look up the variable called `match` !}
                  {! okay, it's a function. Run it and insert the result !}
                  {! if the result is a stream, stream it in. !}
    {/byline}     {! done looping !}

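I've actually found a separate answer myself as well. It's not as pretty, but it works.

Essentially, pause() only pauses the output of a piped stream (one in "flowing" mode); since I was only listening for the 'line' event, nothing was flowing, so pause() of course did nothing. So the first solution was to use removeListener instead of pause, which effectively stops the stream. The file now looks like this: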

    fs = require 'fs'
    TestTransform = require './test-transform'
    inStream = new TestTransform
    fs.createReadStream("./test.coffee").pipe(inStream)
    c = (line) ->
        process.stdout.write "-->"
        if line.match /line\.match/g
            process.stdout.write line
            console.error "PAUSE"
            inStream.removeListener 'line', c
            f = fs.createReadStream("./test.coffee")
            f.on 'end', ->
                console.error "UNPAUSE"
                inStream.on 'line', c
            f.pipe(process.stdout)
        else
            process.stdout.write line
    inStream.on 'line', c

This produces output that almost works:

    -->fs = require 'fs'
    -->TestTransform = require './test-transform'
    -->inStream = new TestTransform
    -->fs.createReadStream("./test.coffee").pipe(inStream)
    -->c = (line) ->
    -->    process.stdout.write "-->"
    -->    if line.match /line\.match/g
    PAUSE
    fs = require 'fs'
    TestTransform = require './test-transform'
    inStream = new TestTransform
    fs.createReadStream("./test.coffee").pipe(inStream)
    c = (line) ->
        process.stdout.write "-->"
        if line.match /line\.match/g
            process.stdout.write line
            console.error "PAUSE"
            inStream.removeListener 'line', c
            f = fs.createReadStream("./test.coffee")
            f.on 'end', ->
                console.error "UNPAUSE"
                inStream.on 'line', c
            f.pipe(process.stdout)
        else
            process.stdout.write line
    inStream.on 'line', c
    UNPAUSE
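As a side note, two details of the stream API seem to account for what has been observed up to this point. The sketch below is plain JavaScript rather than CoffeeScript, and the streams and names in it (`lines`, `source`, `sink`) are made up for illustration; it is not the code from the question. It shows that events emitted by hand with `emit()` are not held back by `pause()`, and that `pipe()` returns its destination, so a chained `.on('end', ...)` ends up on the writable side rather than on the file being read.

```javascript
const { PassThrough, Readable, Writable } = require('stream');

// 1. pause() does not gate events that are emitted by hand with emit().
const lines = new PassThrough({ objectMode: true });
const seenLines = [];
lines.on('line', (line) => {
  seenLines.push(line);
  lines.pause(); // no effect on the remaining 'line' events
});
// Mimic pushLines(): emit every line synchronously in one loop.
for (const line of ['a\n', 'b\n', 'c\n']) {
  lines.emit('line', line);
}
console.log(seenLines.length); // 3

// 2. pipe() returns the destination stream, so a chained .on('end', ...)
//    is registered on the writable side, not on the readable source.
const source = Readable.from(['hello\n']);
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback(); // discard the data; only the return value of pipe() matters here
  },
});
const returned = source.pipe(sink);
console.log(returned === sink); // true
```

This is consistent with the rewritten handler attaching 'end' to `f` itself before calling `f.pipe(process.stdout)`.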

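However, it looks like the original readable stream simply stopped once I removed the listener; this makes some twisted sense (my guess is that Node garbage-collects its readable streams once all their listeners have been removed). So the final working solution I found relies on piping. Since the Transform stream shown above also pushes its output, line by line, to any 'data' listeners, pause() can be used effectively here for its original purpose, rather than just killing the stream. The final version: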

    fs = require 'fs'
    TestTransform = require './test-transform'
    inStream = new TestTransform
    fs.createReadStream("./test.coffee").pipe(inStream)
    inStream.on 'data', (chunk) ->
        line = chunk.toString()
        process.stdout.write "-->#{line}"
        if line.match /line\.match/g
            inStream.pause()
            f = fs.createReadStream("./test.coffee")
            f.on 'end', ->
                inStream.resume()
            f.pipe(process.stdout)

Output:

    -->fs = require 'fs'
    -->TestTransform = require './test-transform'
    -->inStream = new TestTransform
    -->fs.createReadStream("./test.coffee").pipe(inStream)
    -->inStream.on 'data', (chunk) ->
    -->    line = chunk.toString()
    -->    process.stdout.write "-->#{line}"
    -->    if line.match /line\.match/g
    fs = require 'fs'
    TestTransform = require './test-transform'
    inStream = new TestTransform
    fs.createReadStream("./test.coffee").pipe(inStream)
    inStream.on 'data', (chunk) ->
        line = chunk.toString()
        process.stdout.write "-->#{line}"
        if line.match /line\.match/g
            inStream.pause()
            f = fs.createReadStream("./test.coffee")
            f.on 'end', ->
                inStream.resume()
            f.pipe(process.stdout)
    -->        inStream.pause()
    -->        f = fs.createReadStream("./test.coffee")
    -->        f.on 'end', ->
    -->            inStream.resume()
    -->        f.pipe(process.stdout)
    -->

This is the expected result.
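The question also asked whether it is possible to simply wait for the nested stream to end instead of calling pause/unpause. On a reasonably recent Node.js release, where readable streams and `readline` interfaces can be consumed with `for await`, something along these lines might work. This is only a sketch in plain JavaScript: the helper name `interleave` and its parameters are hypothetical, it uses the built-in `readline` module in place of the custom transform stream, and it has not been run against the files from the question.

```javascript
const fs = require('fs');
const readline = require('readline');

// Hypothetical helper (not part of the original code): writes every line of
// `path` to `out` with a "-->" prefix and, after each line that matches
// `pattern`, copies the whole of `insertPath` to `out` before continuing.
async function interleave(path, insertPath, pattern, out) {
  const lines = readline.createInterface({
    input: fs.createReadStream(path),
    crlfDelay: Infinity, // treat \r\n as a single line break
  });

  for await (const line of lines) {
    out.write(`-->${line}\n`);
    if (pattern.test(line)) {
      // The loop does not advance to the next line until the nested
      // stream has been read to its end.
      for await (const chunk of fs.createReadStream(insertPath)) {
        out.write(chunk);
      }
    }
  }
}
```

It could be called as `interleave('./test.coffee', './test.coffee', /line\.match/, process.stdout)`. The pattern is deliberately written without the `g` flag, because `test()` on a global regular expression keeps state between calls.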