如何在Node.js中为非常大(> 1GB)的文件的每一行运行一个asynchronous函数
假设你有一个巨大的(> 1GB)loggingID的CSV:
655453 4930285 493029 4930301 493031 ...
对于每个id
您都希望进行REST API调用以获取logging数据,将其转换为本地数据,然后将其插入到本地数据库中。
你如何做到这一点与Node.js的可读Stream
?
我的问题基本上是这样的:你如何逐行阅读一个非常大的文件,为每一行运行一个asynchronous函数,并且[可选地]能够从特定行开始读取文件?
从下面的Quora问题我开始学习使用fs.createReadStream
:
http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js
var fs = require('fs'); var lazy = require('lazy'); var stream = fs.createReadStream(path, { flags: 'r', encoding: 'utf-8' }); new lazy(stream).lines.forEach(function(line) { var id = line.toString(); // pause stream stream.pause(); // make async API call... makeAPICall(id, function() { // then resume to process next id stream.resume(); }); });
但是,这个伪码不起作用,因为lazy
模块强迫你读整个文件(作为一个stream,但是没有暂停)。 所以这种方法似乎不会工作。
另一件事是,我想能够开始处理这个文件从一个特定的行。 原因是,处理每个id
(使api调用,清理数据等)每个logging可能需要半秒钟,所以我不想每次都从文件的开始处开始。 我想使用的天真的方法是捕获最后处理的id的行号,并保存。 然后,当您再次parsing文件时,您将逐行浏览所有标识符,直到find您留下的行号,然后执行makeAPICall
业务。 另一种天真的方法是编写小文件(比如说100个ID),并且一次处理每个文件(足够小的数据集以在没有IOstream的情况下完成所有内容)。 有一个更好的方法吗?
我可以看到这是如何变得棘手(和节点懒惰进来),因为stream.on('data', function(chunk) {});
可能只包含一行的一部分 (如果bufferSize很小,每个块可能是10行,但是因为id
是可变长度,所以可能只有9.5行或其他)。 这就是为什么我想知道什么是最好的办法是上述问题。
我想你不需要使用node-lazy
。 以下是我在Node文档中find的内容:
事件 :
data
function (data) { }
如果使用
setEncoding()
则data
事件将发出Buffer
(默认)或string
。
所以这意味着你在你的stream上调用setEncoding()
,那么你的data
事件callback将接受一个string参数。 然后在这个callback中,你可以调用使用.pause()
和.resume()
方法。
伪代码应该是这样的:
stream.setEncoding('utf8'); stream.addListener('data', function (line) { // pause stream stream.pause(); // make async API call... makeAPICall(line, function() { // then resume to process next line stream.resume(); }); })
尽pipe文档没有明确指定该stream是逐行读取的,但我认为这是文件stream的情况。 至less在其他语言和平台上,文本stream是以这种方式工作的,我没有看到节点stream不同的理由。
相关的安德鲁АндрейЛисточкин的回答:
您可以使用像byline这样的模块为每一行获取单独的data
事件。 它是原始文件stream的转换stream,它为每个块产生一个data
事件。 这可以让你在每一行之后暂停。
byline
不会将整个文件看成像lazy
这样的内存显然。
var fs = require('fs'); var byline = require('byline'); var stream = fs.createReadStream('bigFile.txt'); stream.setEncoding('utf8'); // Comment out this line to see what the transform stream changes. stream = byline.createStream(stream); // Write each line to the console with a delay. stream.on('data', function(line) { // Pause until we're done processing this line. stream.pause(); setTimeout(() => { console.log(line); // Resume processing. stream.resume(); }, 200); });