如何在Node.js中为非常大(> 1GB)的文件的每一行运行一个asynchronous函数

假设你有一个巨大的(> 1GB)loggingID的CSV:

655453 4930285 493029 4930301 493031 ... 

对于每个id您都希望进行REST API调用以获取logging数据,将其转换为本地数据,然后将其插入到本地数据库中。

你如何做到这一点与Node.js的可读Stream

我的问题基本上是这样的:你如何逐行阅读一个非常大的文件,为每一行运行一个asynchronous函数,并且[可选地]能够从特定行开始读取文件?

从下面的Quora问题我开始学习使用fs.createReadStream

http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js

 var fs = require('fs'); var lazy = require('lazy'); var stream = fs.createReadStream(path, { flags: 'r', encoding: 'utf-8' }); new lazy(stream).lines.forEach(function(line) { var id = line.toString(); // pause stream stream.pause(); // make async API call... makeAPICall(id, function() { // then resume to process next id stream.resume(); }); }); 

但是,这个伪码不起作用,因为lazy模块强迫你读整个文件(作为一个stream,但是没有暂停)。 所以这种方法似乎不会工作。

另一件事是,我想能够开始处理这个文件从一个特定的行。 原因是,处理每个id (使api调用,清理数据等)每个logging可能需要半秒钟,所以我不想每次都从文件的开始处开始。 我想使用的天真的方法是捕获最后处理的id的行号,并保存。 然后,当您再次parsing文件时,您将逐行浏览所有标识符,直到find您留下的行号,然后执行makeAPICall业务。 另一种天真的方法是编写小文件(比如说100个ID),并且一次处理每个文件(足够小的数据集以在没有IOstream的情况下完成所有内容)。 有一个更好的方法吗?

我可以看到这是如何变得棘手(和节点懒惰进来),因为stream.on('data', function(chunk) {}); 可能只包含一行的一部分 (如果bufferSize很小,每个块可能是10行,但是因为id是可变长度,所以可能只有9.5行或其他)。 这就是为什么我想知道什么是最好的办法是上述问题。

我想你不需要使用node-lazy 。 以下是我在Node文档中find的内容:

事件data

 function (data) { } 

如果使用setEncoding()data事件将发出Buffer (默认)或string

所以这意味着你在你的stream上调用setEncoding() ,那么你的data事件callback将接受一个string参数。 然后在这个callback中,你可以调用使用.pause().resume()方法。

伪代码应该是这样的:

 stream.setEncoding('utf8'); stream.addListener('data', function (line) { // pause stream stream.pause(); // make async API call... makeAPICall(line, function() { // then resume to process next line stream.resume(); }); }) 

尽pipe文档没有明确指定该stream是逐行读取的,但我认为这是文件stream的情况。 至less在其他语言和平台上,文本stream是以这种方式工作的,我没有看到节点stream不同的理由。

相关的安德鲁АндрейЛисточкин的回答:

您可以使用像byline这样的模块为每一行获取单独的data事件。 它是原始文件stream的转换stream,它为每个块产生一个data事件。 这可以让你在每一行之后暂停。

byline不会将整个文件看成像lazy这样的内存显然。

 var fs = require('fs'); var byline = require('byline'); var stream = fs.createReadStream('bigFile.txt'); stream.setEncoding('utf8'); // Comment out this line to see what the transform stream changes. stream = byline.createStream(stream); // Write each line to the console with a delay. stream.on('data', function(line) { // Pause until we're done processing this line. stream.pause(); setTimeout(() => { console.log(line); // Resume processing. stream.resume(); }, 200); });