NodeJSstreamparsing并写入json行以确保Promise结果

我有一个大的json文件,看起来像这样:

[ {"name": "item1"}, {"name": "item2"}, {"name": "item3"} ] 

我想stream这个文件(目前为止非常简单),每一行在parsing/拒绝调用时都会运行一个asynchronous函数(返回一个promise)来编辑这一行。

input文件的结果可能是:

 [ {"name": "item1", "response": 200}, {"name": "item2", "response": 404}, {"name": "item3"} // not processed yet ] 

我不想创build另一个文件,我想在飞行中编辑相同的文件(如果可能!)。

谢谢 :)

我没有真正回答这个问题,但是不要认为这个问题可以用满意的方式回答,所以这里是我的2分钱。

我假设你知道如何一行一行地进行stream式处理,然后运行这个函数,并且唯一的问题就是编辑你正在读取的文件。

插入的后果

不可能数据本地插入到任何文件中(这是通过更改JSON直播来实现的)。 一个文件只能在最后成长。

因此,在1GB文件的开始处插入10个字节的数据意味着您需要向磁盘写入1GB(将所有数据进一步移动10个字节)。

你的文件系统不能理解JSON,只是看到你在一个大文件的中间插入字节,所以这会慢。

所以,是的,这是可能的。 使用insert()方法在NodeJS的文件API中编写一个包装器。

然后再编写一些代码,以便能够知道将字节插入JSON文件的位置,而不会加载整个文件,并且不会在最后生成无效的JSON。

现在我不会推荐它:)

=>阅读这个问题: 是否有可能在没有重写的情况下将数据前置到文件中?

那为什么呢?

我假设要么

  • 能够随时杀死您的进程,并通过重新读取文件轻松恢复工作。
  • 重试部分处理的文件以仅填充缺less的位。

第一个解决scheme:使用数据库

抽象出随机存放编辑文件需要做的工作是数据库存在的唯一目的。

它们都存在只是为了抽象隐藏在UPDATE mytable SET name = 'a_longer_name_that_the_name_that_was_there_before' where name = 'short_name'后面的魔法。

看看LevelUP / Down ,sqlite等…

他们将抽象所有需要在您的JSON文件中完成的魔法!

第二种解决scheme:使用多个文件

当你stream文件时,写两个新文件!

  • 一个包含input文件中的当前位置和需要重试的行
  • 另一个是预期的结果。

你也可以随时杀死你的进程并重新启动

根据这个答案写在同一个文件,而阅读是不可靠的。 作为一个评论者说,最好写入一个临时文件,然后删除原来的,并重新命名临时文件。

要创build线条stream,您可以使用byline 。 然后,对于每一行,应用一些操作并将其输出到输出文件。

像这样的东西:

 var fs = require('fs'); var stream = require('stream'); var util = require('util'); var LineStream = require('byline').LineStream; function Modify(options) { stream.Transform.call(this, options); } util.inherits(Modify, stream.Transform); Modify.prototype._transform = function(chunk, encoding, done) { var self = this; setTimeout(function() { // your modifications here, note that the exact regex depends on // your json format and is probably the most brittle part of this var modifiedChunk = chunk.toString(); if (modifiedChunk.search('response:[^,}]+') === -1) { modifiedChunk = modifiedChunk .replace('}', ', response: ' + new Date().getTime() + '}') + '\n'; } self.push(modifiedChunk); done(); }, Math.random() * 2000 + 1000); // to simulate an async modification }; var inPath = './data.json'; var outPath = './out.txt'; fs.createReadStream(inPath) .pipe(new LineStream()) .pipe(new Modify()) .pipe(fs.createWriteStream(outPath)) .on('close', function() { // replace input with output fs.unlink(inPath, function() { fs.rename(outPath, inPath); }); }); 

请注意,以上结果一次只发生一次asynchronous操作。 您也可以将修改保存到一个数组,一旦完成所有这些修改,将数组中的行写入一个文件,如下所示:

 var fs = require('fs'); var stream = require('stream'); var LineStream = require('byline').LineStream; var modifiedLines = []; var modifiedCount = 0; var inPath = './data.json'; var allModified = new Promise(function(resolve, reject) { fs.createReadStream(inPath).pipe(new LineStream()).on('data', function(chunk) { modifiedLines.length++; var index = modifiedLines.length - 1; setTimeout(function() { // your modifications here var modifiedChunk = chunk.toString(); if (modifiedChunk.search('response:[^,}]+') === -1) { modifiedChunk = modifiedChunk .replace('}', ', response: ' + new Date().getTime() + '}'); } modifiedLines[index] = modifiedChunk; modifiedCount++; if (modifiedCount === modifiedLines.length) { resolve(); } }, Math.random() * 2000 + 1000); }); }).then(function() { fs.writeFile(inPath, modifiedLines.join('\n')); }).catch(function(reason) { console.error(reason); }); 

如果不是想要stream式传输有效的json块,这将是一个更强大的方法,请看看JSONStream 。

正如在评论中提到的,你拥有的文件不是正确的JSON,虽然在Javascript中是有效的。 为了生成正确的JSON,可以使用JSON.stringify() 。 我认为这会让别人难以parsing非标准的JSON,所以我会build议提供一个新的输出文件,而不是保留原来的。

但是,仍然可以将原始文件parsing为JSON。 这可能通过eval('(' + procline + ')'); 但是,像这样将外部数据引入node.js是不安全的。

 const fs = require('fs'); const readline = require('readline'); const fr = fs.createReadStream('file1'); const rl = readline.createInterface({ input: fr }); rl.on('line', function (line) { if (line.match(new RegExp("\{name"))) { var procline = ""; if (line.trim().split('').pop() === ','){ procline = line.trim().substring(0,line.trim().length-1); } else{ procline = line.trim(); } var lineObj = eval('(' + procline + ')'); lineObj.response = 200; console.log(JSON.stringify(lineObj)); } }); 

输出将是这样的:

 {"name":"item1","response":200} {"name":"item2","response":200} {"name":"item3","response":200} 

这是行分隔的JSON (LDJSON),可以用于stream的东西,而不需要前导和尾随[], 。 还有一个ldjson-stream包。