在NodeJS中使用stream和asynchronous读取和处理大小文件

我在逐行处理文件列表时遇到问题。 这是我正在使用的代码:

var LineReader = require("line-by-line"); var async = require("async"); var files = [ "small.txt", "medium.txt", "large.txt" ]; var queue = async.queue(function(task, next){ console.log(task); next(); }, 10); async.eachSeries( files, function (file, callback) { var lineReader = new LineReader(file, { encoding: "utf8", skipEmptyLines: true }); lineReader.on("error", function (err) { callback(err); }); lineReader.on("line", function (line) { lineReader.pause(); queue.push(line); }); queue.drain = function () { lineReader.resume(); // I need to resume the stream ! callback(); // When all lines have been processed, I need to read the next file }; }, function (err) { if (err) return console.log(err); console.log("Job done."); } ); 

我使用asynchronous “同步”处理每个文件,并处理队列中的每一行,并逐行逐行读取每个文件。

我的问题是:

  • 如果我暂停了stream,请将该行推送到队列,并在出现此错误后继续stream

RangeError:超出最大调用堆栈大小

  • 如果我暂停了这个stream,将这一行推送到队列并等待队列为空,我不能恢复这个stream并执行callback

q.drain = function(){lineReader.resume(); 回电话(); };

我怎么能等到所有的行已经被处理,并执行callback来处理下一个文件?

谢谢。

更新:

我用“逐行”模块发现了一个奇怪的事情。 “结束”事件发出两次。 所以我决定重构代码,我发现问题来了。 另一个问题是:模块一年没有更新,一个月前发送了两个pull请求。

这是我的解决scheme(如果一行一行地工作):

 var LineReader = require("line-by-line"); var async = require("async"); var files = [ "small.txt", "medium.txt", "large.txt" ]; var queue = async.queue(function(task, next){ console.log(task); next(); }, 10); async.eachSeries( files, function (file, callback) { var lineReader = new LineReader(file, { encoding: "utf8", skipEmptyLines: true }); lineReader.on("error", function (err) { callback(err); }); lineReader.on("end", function () { callback(); }); lineReader.on("line", function (line) { lineReader.pause(); queue.push(line); }); queue.drain = function () { lineReader.resume(); }; }, function (err) { if (err) return console.log(err); console.log("Job done."); } ); 

有了这个解决scheme,我们在队列中只有一行。 如果有人有一个想法推动多于一行,然后暂停stream。

我将尝试find没有这个问题的另一个模块,因为我不想重写一个新的模块。

我会解决这个问题完全不同。

不需要监听事件或使用新的stream API暂停。
我会使用through2through2

 var gulp = require('gulp') , thr = require('through2').obj ; function fixLine (line) { // do stuff with a single line of a file. // just return it back for no reason :) return line } files = [ "small.txt", "medium.txt", "large.txt" ] gulp.src(files).pipe(thr(function(vfs, enc, next){ // vfs - vinyl filesystem. var str = vfs.contents.toString().split('\n').map(fixLine).join('\n') vfs.contents = new Buffer(str) next(null, vfs) })) 

然而这是asynchronous的。 不能保证文件的顺序是数组中的顺序。 但显然,这条线是按顺序处理的。

我希望这有帮助。

我喜欢使用这个function:

 function emitLines(stream, re) { re = re || /\n/; var buffer = ''; stream.on('data', stream_data); stream.on('end', stream_end); function stream_data(data) { buffer += data; flush(); } function stream_end() { if (buffer) stream.emmit('line', buffer); } function flush() { var match; while ((match = re.exec(buffer))) { var index = match.index + match[0].length; stream.emit('line', buffer.substring(0, index)); buffer = buffer.substring(index); re.lastIndex = 0; } } } 

在stream上调用此函数时,您的stream将开始广播“行”事件\ o /