Nodejs createReadStream只读取大型JSON文件的一个数据块

我正在使用Nodejs从一个非常大的JSON文件(1GB +)中读取JSON对象。 JSON文件格式为[{field1:x,field2:x,field3:x},{…},…,{…}]。 每个对象都没有行分隔。 为了避免内存问题,我正在使用fs.createReadStream并按顺序处理每个数据块。 这工作,我得到有效的JSON对象,但读者只读一个数据块后停止。 为什么不读取文件的其余部分?

我的解决scheme受到了这个问题中被接受的答案的启发: 在Nodejs中parsing大的JSON文件

这里是代码:

// Get the JSON file var fs = require('fs'); var stream = fs.createReadStream('Results.json', {flags: 'r', encoding: 'utf-8'}); var buf = ''; var count = 0; stream.on('data', function(chunk) { console.log("Stream on data!"); // ONLY EXECUTED ONCE buf += chunk.toString(); // when data is read, stash it in a string buffer process(); // process the buffer }); stream.on('error', function(err) { // NEVER EXECUTED console.log(err); }); stream.on('end', function() { // NEVER EXECUTED console.log("Count: " + count); }); function process() { var posStart = buf.indexOf('{'); var posEnd = buf.indexOf('}'); while (posStart >= 0 || posEnd >= 0) { // keep going until the start or end of the json object in the string // IF the start bracket is before the end, skip to the start if((posStart < posEnd || posEnd < 0) && posStart >= 0){ buf = buf.slice(posStart); } if(posStart == 0 && posEnd >= 0){ // IF the end bracket is next processObjectString(buf.slice(0, posEnd+1)); // Process the complete object string buf = buf.slice(posEnd+1); // Remove the processed string from the buffer } // Update the positions posStart = buf.indexOf('{'); posEnd = buf.indexOf('}'); } } function processObjectString(objectString) { count++; var obj = JSON.parse(objectString); // parse the JSON console.log(obj.id); // Print object ID (works) } 

编辑:修复导致无限while循环的错误之后,以下是遍历JSON文件中所有对象的工作解决scheme。 它可能不是很优雅,但至less可以工作(对于任何可能有类似问题的人)。

 // Get the JSON file var fs = require('fs'); var stream = fs.createReadStream('Results.json', {flags: 'r', encoding: 'utf-8'}); var buf = ''; var count = 0; stream.on('data', function(chunk) { buf += chunk.toString(); // when data is read, stash it in a string buffer process(buf); // process the buffer }); stream.on('error', function(err) { console.log(err); }); stream.on('end', function() { console.log("Count: " + count); }); function process() { var posStart = buf.indexOf('{'); var posEnd = buf.indexOf('}'); while (posStart >= 0 || posEnd >= 0) { // keep going until the start or end of the json object in the string // IF the start bracket is before the end, skip to the start if((posStart < posEnd || posEnd < 0) && posStart >= 0){ buf = buf.slice(posStart); } if(posStart == 0 && posEnd >= 0){ // IF the end bracket is next processObjectString(buf.slice(0, posEnd+1)); // Process the complete object string buf = buf.slice(posEnd+1); // Remove the processed string from the buffer }else if(posStart < 0 || posEnd < 0){ // Return to get a new chunk return; } // Update the positions posStart = buf.indexOf('{'); posEnd = buf.indexOf('}'); } } function processObjectString(objectString) { count++; var obj = JSON.parse(objectString); // parse the JSON console.log(obj.id); // Print object ID (works) } 

一些理论

Node.js是asynchronous的,但实际上是单线程的。 如果进程在处理接收到的数据时停滞不前,则它将永远不会收到第二个块,因为发送者等待被释放的线程被释放,然后才能执行任何操作。

含义

如果行process();'data', function(chunk)里面'data', function(chunk)在无限循环中,那么你将永远不会收到第二个chunk,因此它可能看起来像发送者是懒惰的。


为了将来:尽量隔离问题,以确保你在正确的位置。

PS在处理文本的过程中,你很容易陷入无限循环,在这里我感觉到你的痛苦。