用Node.js编写大文件

我正在用node.js写一个使用可写入stream的大文件:

var fs = require('fs'); var stream = fs.createWriteStream('someFile.txt', { flags : 'w' }); var lines; while (lines = getLines()) { for (var i = 0; i < lines.length; i++) { stream.write( lines[i] ); } } 

我想知道如果这个scheme是安全的,而不使用drain事件? 如果不是的话(我认为是这种情况),将任意大数据写入文件的模式是什么?

这就是我最终做到的 背后的想法是创build实现ReadStream接口的可读stream,然后使用pipe()方法将数据传输到可写stream。

 var fs = require('fs'); var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' }); var readStream = new MyReadStream(); readStream.pipe(writeStream); writeStream.on('close', function () { console.log('All done!'); }); 

MyReadStream类的示例可以从MyReadStream QueryStream中获取 。

背后的想法是,你会用它来testing在这里:

 var fs = require('fs'); var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); var lines; while (lines = getLines()) { for (var i = 0; i < lines.length; i++) { stream.write(lines[i]); //<-- the place to test } } 

你不是。 所以你需要重新devise来使其“可重入”。

 var fs = require('fs'); var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); var lines; while (lines = getLines()) { for (var i = 0; i < lines.length; i++) { var written = stream.write(lines[i]); //<-- the place to test if (!written){ //do something here to wait till you can safely write again //this means prepare a buffer and wait till you can come back to finish // lines[i] -> remainder } } } 

但是,这是否意味着在等待的时候还需要保持缓冲getLine?

 var fs = require('fs'); var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); var lines, buffer = { remainingLines = [] }; while (lines = getLines()) { for (var i = 0; i < lines.length; i++) { var written = stream.write(lines[i]); //<-- the place to test if (!written){ //do something here to wait till you can safely write again //this means prepare a buffer and wait till you can come back to finish // lines[i] -> remainder buffer.remainingLines = lines.slice(i); break; //notice there's no way to re-run this once we leave here. } } } stream.on('drain',function(){ if (buffer.remainingLines.length){ for (var i = 0; i < buffer.remainingLines.length; i++) { var written = stream.write(buffer.remainingLines[i]); //<-- the place to test if (!written){ //do something here to wait till you can safely write again //this means prepare a buffer and wait till you can come back to finish // lines[i] -> remainder buffer.remainingLines = lines.slice(i); } } } }); 

我发现stream是处理大文件的糟糕performance方式 – 这是因为您无法设置足够的input缓冲区大小(至less我没有意识到这是一个好办法)。 这就是我所做的:

 var fs = require('fs'); var i = fs.openSync('input.txt', 'r'); var o = fs.openSync('output.txt', 'w'); var buf = new Buffer(1024 * 1024), len, prev = ''; while(len = fs.readSync(i, buf, 0, buf.length)) { var a = (prev + buf.toString('ascii', 0, len)).split('\n'); prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : ''; var out = ''; a.forEach(function(line) { if(!line) return; // do something with your line here out += line + '\n'; }); var bout = new Buffer(out, 'ascii'); fs.writeSync(o, bout, 0, bout.length); } fs.closeSync(o); fs.closeSync(i); 

[编辑]更新的Node.js writable.write(...) API文档说:

返回值是严格的咨询。 即使返回false,你也可以继续写。 但是,写入操作将被caching在内存中,因此最好不要过多地执行此操作。 而是在写入更多数据之前等待排水事件。

[原创]stream.write(...)文档 (重点是我的):

如果string已经刷新到内核缓冲区,则返回true 。 返回false表示内核缓冲区已满,数据将在未来发送出去

我解释这意味着,如果给定的string立即写入底层的操作系统缓冲区,“写入”函数返回true ,如果它尚未写入, 则返回 false ,但将由写入函数写入 (例如,大概为您缓冲WriteStream),以便您不必再次调用“写入”。

这个问题的几个build议的答案完全错过了关于stream的观点。

这个模块可以帮助https://www.npmjs.org/package/JSONStream

不过,让我们假设自己所描述的情况并编写代码。 您正在从一个MongoDB读取stream,默认情况下ObjectMode = true。

这会导致问题,如果你试图直接stream到文件 – 就像“无效的非string/缓冲区块”错误。

这种types的问题的解决scheme非常简单。

只需在可读和可写之间插入另一个Transform,以便将Object可读为适合可写的String。

示例代码解决scheme

 var fs = require('fs'), writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }), stream = require('stream'), stringifier = new stream.Transform(); stringifier._writableState.objectMode = true; stringifier._transform = function (data, encoding, done) { this.push(JSON.stringify(data)); this.push('\n'); done(); } rowFeedDao.getRowFeedsStream(merchantId, jobId) .pipe(stringifier) .pipe(writeStream).on('error', function (err) { // handle error condition } 

处理这个最简单的方法是让你的行生成器成为一个可读的stream – 我们称之为lineReader 。 然后,下面的代码会自动处理缓冲区并为你排水。

 lineReader.pipe(fs.createWriteStream('someFile.txt')); 

如果你不想做一个可读的stream,你可以听写的输出缓冲区充满,并像这样回应:

 var i = 0, n = lines.length; function write () { if (i === n) return; // A callback could go here to know when it's done. while (stream.write(lines[i++]) && i < n); stream.once('drain', write); } write(); // Initial call. 

这种情况的更长时间的例子可以在这里find。