在Node.js中暂停readline

考虑下面的代码…我想在读第5行之后暂停stream:

var fs = require('fs'); var readline = require('readline'); var stream = require('stream'); var numlines = 0; var instream = fs.createReadStream("myfile.json"); var outstream = new stream; var readStream = readline.createInterface(instream, outstream); readStream.on('line', function(line){ numlines++; console.log("Read " + numlines + " lines"); if (numlines >= 5) { console.log("Pausing stream"); readStream.pause(); } }); 

输出(下一个复制)表明它在暂停后保持读取行。 也许readline已经在缓冲区中排队了几行,并且正在将它们反馈给我…如果它继续在后台asynchronous读取,这将是有道理的,但基于文档,我不知道适当的行为应该是。 有关如何达到预期效果的任何build议?

 Read 1 lines Read 2 lines Read 3 lines Read 4 lines Read 5 lines Pausing stream Read 6 lines Pausing stream Read 7 lines 

所以,事实certificate,即使在暂停()之后,readlinestream也倾向于“滴落”(即,泄漏一些额外的行)。 文件没有说清楚,但这是事实。

如果你想暂停()切换立即出现,你将不得不创build自己的行缓冲区,并积累自己的剩余行。

有点不直观, 暂停方法不会停止排队行事件 :

调用rl.pause()不会立即暂停由readline.Interface实例发出的其他事件(包括'line' )。

然而,有一个line-by-line命名的第三方模块,其中pause 暂停line事件,直到恢复为止。

 var LineByLineReader = require('line-by-line'), lr = new LineByLineReader('big_file.txt'); lr.on('error', function (err) { // 'err' contains error object }); lr.on('line', function (line) { // pause emitting of lines... lr.pause(); // ...do your asynchronous line processing.. setTimeout(function () { // ...and continue emitting lines. lr.resume(); }, 100); }); lr.on('end', function () { // All lines are read, file is closed now. }); 

(我没有从模块的隶属关系,只是发现它对于处理这个问题很有用。)

添加一些要点:

 .on('pause', function() { console.log(numlines) }) 

你会得到5.它在node.js文件中提到:

  • inputstream没有被暂停接收到SIGCONT事件。 (请参阅事件SIGTSTP和SIGCONT)

所以,我在line事件中创build了一个tmp缓冲区。 使用一个标志来确定它是否被触发暂停。

 .on('line', function(line) { if (paused) { putLineInBulkTmp(line); } else { putLineInBulk(line); } } 

然后在暂停,并恢复:

 .on('pause', function() { paused = true; doSomething(bulk, function(resp) { // clean up bulk for the next. bulk = []; // clone tmp buffer. bulk = clone(bulktmp); bulktmp = []; lr.resume(); }); }) .on('resume', () => { paused = false; }) 

用这种方式来处理这种情况。