Nodejs读取非常大的文件(〜10GB),逐行处理然后写入其他文件

我有一个特殊格式的10 GB日志文件,我想逐行处理这个文件 ,然后在输出一些转换之后把输出写到其他文件中 。 我正在使用节点进行此操作。

虽然这个方法很好,但是这样做需要很多时间。 我可以在JAVA 30-45分钟内做到这一点,但是在节点上做同样的工作需要160分钟以上。 以下是代码:

以下是从input读取每行的启动代码。

var path = '../10GB_input_file.txt'; var output_file = '../output.txt'; function fileopsmain(){ fs.exists(output_file, function(exists){ if(exists) { fs.unlink(output_file, function (err) { if (err) throw err; console.log('successfully deleted ' + output_file); }); } }); new lazy(fs.createReadStream(path, {bufferSize: 128 * 4096})) .lines .forEach(function(line){ var line_arr = line.toString().split(';'); perform_line_ops(line_arr, line_arr[6], line_arr[7], line_arr[10]); } ); } 

这是对该行执行某些操作并将input传递给写入方法以将其写入输出文件的方法。

 function perform_line_ops(line_arr, range_start, range_end, daynums){ var _new_lines = ''; for(var i=0; i<days; i++){ //perform some operation to modify line pass it to print } write_line_ops(_new_lines); } 

以下方法用于将数据写入新文件。

 function write_line_ops(line) { if(line != null && line != ''){ fs.appendFileSync(output_file, line); } } 

我想把这个时间降低到15-20分钟。 有没有可能这样做。

同样的logging,我正在尝试使用8 GB内存的英特尔i7处理器

没有模块就可以轻松完成 例如:

 var fs = require('fs'); var inspect = require('util').inspect; var buffer = ''; var rs = fs.createReadStream('foo.log'); rs.on('data', function(chunk) { var lines = (buffer + chunk).split(/\r?\n/g); buffer = lines.pop(); for (var i = 0; i < lines.length; ++i) { // do something with `lines[i]` console.log('found line: ' + inspect(lines[i])); } }); rs.on('end', function() { // optionally process `buffer` here if you want to treat leftover data without // a newline as a "line" console.log('ended on non-empty buffer: ' + inspect(buffer)); }); 

我无法猜测代码中可能的瓶颈。

  • 你可以添加库或lazy函数的源代码?
  • 你的perform_line_ops有多less个操作? (if / else,开关/大小写,函数调用)

我已经创build了一个基于你给定的代码的例子,我知道这不能回答你的问题,但也许可以帮助你了解节点如何处理这种情况。

 const fs = require('fs') const path = require('path') const inputFile = path.resolve(__dirname, '../input_file.txt') const outputFile = path.resolve(__dirname, '../output_file.txt') function bootstrap() { // fs.exists is deprecated // check if output file exists // https://nodejs.org/api/fs.html#fs_fs_exists_path_callback fs.exists(outputFile, (exists) => { if (exists) { // output file exists, delete it // https://nodejs.org/api/fs.html#fs_fs_unlink_path_callback fs.unlink(outputFile, (err) => { if (err) { throw err } console.info(`successfully deleted: ${outputFile}`) checkInputFile() }) } else { // output file doesn't exist, move on checkInputFile() } }) } function checkInputFile() { // check if input file can be read // https://nodejs.org/api/fs.html#fs_fs_access_path_mode_callback fs.access(inputFile, fs.constants.R_OK, (err) => { if (err) { // file can't be read, throw error throw err } // file can be read, move on loadInputFile() }) } function saveToOutput() { // create write stream // https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options const stream = fs.createWriteStream(outputFile, { flags: 'w' }) // return wrapper function which simply writes data into the stream return (data) => { // check if the stream is writable if (stream.writable) { if (data === null) { stream.end() } else if (data instanceof Array) { stream.write(data.join('\n')) } else { stream.write(data) } } } } function parseLine(line, respond) { respond([line]) } function loadInputFile() { // create write stream const saveOutput = saveToOutput() // create read stream // https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options const stream = fs.createReadStream(inputFile, { autoClose: true, encoding: 'utf8', flags: 'r' }) let buffer = null stream.on('data', (chunk) => { // append the buffer to the current chunk const lines = (buffer !== null) ? (buffer + chunk).split('\n') : chunk.split('\n') const lineLength = lines.length let lineIndex = -1 // save last line for later (last line can be incomplete) buffer = lines[lineLength - 1] // loop trough all lines // but don't include the last line while (++lineIndex < lineLength - 1) { parseLine(lines[lineIndex], saveOutput) } }) stream.on('end', () => { if (buffer !== null && buffer.length > 0) { // parse the last line parseLine(buffer, saveOutput) } // Passing null signals the end of the stream (EOF) saveOutput(null) }) } // kick off the parsing process bootstrap() 

我知道这是旧的,但…

猜测appendFileSync() _ write() _s到文件系统并等待响应。 许多小写操作通常很贵,假设你在Java中使用了BufferedWriter ,你可以通过跳过一些write()来获得更快的结果。

使用其中一个asynchronous写入操作,查看节点缓冲区是否合理caching,或者将行写入大节点缓冲区直到缓冲区满,并且始终写入完整(或接近满)的缓冲区。 通过调整缓冲区大小,您可以validation写入次数是否影响perf。 我怀疑会这样。

执行速度很慢,因为您没有使用节点的asynchronous操作。 本质上,你正在执行这样的代码:

 > read some lines > transform > write some lines > repeat 

虽然你可以一次做所有事情,或者至less读写。 答案中的一些例子是这样做的,但是语法至less是复杂的。 使用超燃冲压发动机,你可以用简单的几行来完成:

 const {StringStream} = require('scramjet'); fs.createReadStream(path, {bufferSize: 128 * 4096}) .pipe(new StringStream({maxParallel: 128}) // I assume this is an utf-8 file .split("\n") // split per line .parse((line) => line.split(';')) // parse line .map([line_arr, range_start, range_end, daynums] => { return simplyReturnYourResultForTheOtherFileHere( line_arr, range_start, range_end, daynums ); // run your code, return promise if you're doing some async work }) .stringify((result) => result.toString()) .pipe(fs.createWriteStream) .on("finish", () => console.log("done")) .on("error", (e) => console.log("error")) 

这可能会运行得更快。