分析node.js中的大型json日志文件

我有以下的JSON文件:

sensorlogs.json {"arr":[{"UTCTime":10000001,"s1":22,"s2":32,"s3":42,"s4":12}, {"UTCTime":10000002,"s1":23,"s2":33,"s4":13}, {"UTCTime":10000003,"s1":24,"s2":34,"s3":43,"s4":14}, {"UTCTime":10000005,"s1":26,"s2":36,"s3":44,"s4":16}, {"UTCTime":10000006,"s1":27,"s2":37,"s4":17}, {"UTCTime":10000004,"s1":25,"s2":35,"s4":15}, ... {"UTCTime":12345678,"s1":57,"s2":35,"s3":77,"s4":99} ]} 

传感器s1,s2,s3等都以不同的频率进行发送(注意s3每2秒发送一次,时间间隔可能无序)。

我怎样才能达到像 –

 Analyzing s1: s = [[10000001, 22], [10000002, 23],.. [12345678,57]] s1 had 2 missing entries Analyzing s2: s = [[10000001, 32], [10000002, 33],.. [12345678,35]] s2 had 0 missing entries Analyzing s3: s = [[10000001, 42], [10000003, 43],.. [12345678,77]] s3 had 0 missing entries Analyzing s4: s = [[10000001, 12], [10000003, 13],.. [12345678,99]] s4 had 1 missing entries 

sensorlogs.json是16 GB。

缺失的条目可以根据连续UTC时间戳的差异find。 每个传感器以已知的频率传输。

由于内存限制,我不能使用多个大型数组进行分析,所以我将不得不对同一个JSON日志文件进行多次传递,而只使用单个大型数组进行分析。

我到现在为止是以下 –

 var result = []; //1. Extract all the keys from the log file console.log("Extracting keys... \n"); var stream = fs.createReadStream(filePath); var lineReader = lr.createInterface( { input: stream }); lineReader.on('line', function (line) { getKeys(line);//extract all the keys from the JSON }); stream.on('end', function() { //obj -> arr for(var key in tmpObj) arrStrm.push(key); //2. Validate individual sensors console.log("Validating the sensor data ...\n"); //Synchronous execution of the sensors in the array async.each(arrStrm, function(key) { { currSensor = key; console.log("validating " + currSensor + "...\n"); stream = fs.createReadStream(filePath); lineReader = lr.createInterface( { input: stream }); lineReader.on('line', function (line) { processLine(line);//Create the arrays for the sensors }); stream.on('end', function() { processSensor(currSensor);//Process the data for the current sensor }); } }); }); function getKeys(line) { if(((pos = line.indexOf('[')) >= 0)||((pos = line.indexOf(']')) >= 0)) return; if (line[line.length-1] == '\r') line=line.substr(0,line.length-1); // discard CR (0x0D) if (line[line.length-1] == ',') line=line.substr(0,line.length-1); // discard , // console.log(line); if (line.length > 1) { // ignore empty lines var obj = JSON.parse(line); // parse the JSON for(var key in obj) { if(key != "debug") { if(tmpObj[key] == undefined) tmpObj[key]=[]; } }; } } 

当然这是行不通的,我也无法在网上find解释如何实现的东西。

注意:我可以select任何我select的语言来开发这个工具(C / C ++,C#/ Java / Python),但是我正在使用JavaScript,因为它能够轻松地parsingJSON数组(我也有兴趣改进JS以及)。 如果JavaScript不是最好的语言使用这样的工具,是否有人喜欢build议替代语言来做到这一点?

编辑:一些重要的信息,要么是不是很清楚,或者我没有包括在内,但看起来像包含在问题中是重要的 –

  1. JSON日志中的数据不是实时stream式传输,而是存储在硬盘中的JSON文件
  2. 存储的数据不是按时间顺序排列的,这意味着时间戳可能没有正确的顺序。 因此,每个传感器数据需要根据时间戳存储在一个数组中之后进行sorting
  3. 我不能为每个传感器使用单独的数组(这将与在RAM中存储整个16 GB JSON相同),并且为了节省内存,一次只能使用一个数组。 是的,在我的日志中有超过4个传感器,这只是一个样本(大概20个给出一个想法)

我修改了我的JSON和预期的输出

一种解决scheme可能是对JSON文件进行多次传递,一次将具有时间戳的传感器数据存储在数组中,然后对数组进行sorting,最后分析数据的损坏和间隙。 这就是我在上面的代码中所要做的

所以你有json中包含的16GB大胖传感器日志。

首先,一个16GB的整个json文件是不现实的,只是因为开始和结束的括号打破规则,变成只是恼人的字符在数组中。 我们知道该文件有一个开始和结束,此外,没有它们,你的程序可以处理文件的块,甚至直接插入到设备上的stream。 那么让我们假设我们将要处理的是这样的:

 {"UTCTime":10000001,"s1":22,"s2":32,"s3":42,"s4":12}, {"UTCTime":10000002,"s1":23,"s2":33,"s4":13}, {"UTCTime":10000003,"s1":24,"s2":34,"s3":43,"s4":14}, ... {"UTCTime":12345678,"s1":57,"s2":35,"s3":77,"s4":99}, 

甚至添加或检测到最后缺less的逗号不应太难。

现在每一行的格式都是一样的,可以解释为json。 问题是:传感器是否在预期时输出数据? 如果我们确定他们在正确的时间和正确的频率发言(案例1),但有时他们可能会错过一个写作,一切都很好。 但是,如果他们开始在时间范围内轻微滑动(情况2),则需要某种启发来恢复适当的线路频率,分析将会更长。

如果我们没有处理这个实时的话,那么对这个文件进行一个简单的validation检查就是告诉每个freq线是否find了预期的传感器数据,对不对?

无论如何,因为它是一个非常大的文件,所以只要有可能,就必须逐行处理。

在下面的程序中,我只考虑了案例1,而且我们可以处理连续的stream。

 #!/usr/bin/python import json sensors={} sensors['s1']=[1] # frequencies sensors['s2']=[1] sensors['s3']=[2] sensors['s4']=[1] # append data array and error counter at sensors[i] # it holds [freq,err,data] for k,v in sensors.iteritems(): sensors[k].extend([0,[]]) FRQ=0;ERR=1;DAT=2 print list(sorted(sensors.items())) S=list(sorted(sensors.keys())) with open('./sensors.json', "r") as stream: i=0 for line in stream: if not line.rstrip(): continue # skip blank lines j=json.loads(line[:-2]) # skip comma and \n t=j["UTCTime"] for k in S: sensor=sensors[k] if i%sensor[FRQ]==0 : # every Nth iteration v=j.get(k) if v is None: sensor[ERR]+=1 print k,"has",sensor[ERR],"missing entries" sensor[DAT].append([t,v]) # append that sensor data # filling up the memory... i+=1 for k,v in sorted(sensors.iteritems()): print k,sensors[k][DAT] for k,v in sorted(sensors.iteritems()): print k,'had',sensors[k][ERR],"missing entries" 

为了处理第二种情况,我们将使用模数检查来反转“ None ”检查,validation传感器是否在不应该的情况下写了一些内容,然后尝试检测偏移。

最后一点:你的程序在内存上可能会缩短,所以也许把整个数据保存在内存中不是一个好主意。 如果打算为每个传感器使用单独的arrays进行进一步处理,将它们写入文件可能更为明智。

再次编辑考虑您的编辑:

 var fs = require('fs'); var stream = fs.createReadStream('sensorlogs.json', {flags: 'r', encoding: 'utf-8'}); var buffer = ''; var sensor = process.argv[2]; var readings = []; var missingCont = 0; console.log('Analizying ' + sensor + ':'); stream.on('data', function(d) { buffer += d.toString(); processBuffer(); console.log(readings); console.log(sensor + ' had ' + missingCont + ' missing entries'); }); function processBuffer() { buffer = buffer.slice(buffer.indexOf('[{')); while(buffer.indexOf('{') != -1) { buffer = buffer.slice(buffer.indexOf('{"')); processLine(buffer.slice(0, buffer.indexOf('}') + 1)); buffer = buffer.slice(buffer.indexOf('}') + 2); } }; function processLine(line) { if(line != ""){ var obj = JSON.parse(line); if(!obj[sensor]){ missingCont++; }else{ var pos; for(pos = 0; pos < readings.length; pos++){ if(obj.UTCTime < readings[pos][0]){ var reading = [obj.UTCTime, obj[sensor]] readings.splice(pos, 0, reading); break; } } if(pos == readings.length){ readings.push([obj.UTCTime, obj[sensor]]); } } } }; 

你必须用你想要分析的传感器的参数来调用它:

 node.exe scripts\processJson.js <param> 

为了testing,我拿了这个样本:

 {"arr":[{"UTCTime":10000001,"s1":22,"s2":32,"s3":42,"s4":12}, {"UTCTime":10000005,"s1":20,"s2":30,"s3":40,"s4":10}, {"UTCTime":10000002,"s1":23,"s2":33,"s4":13}, {"UTCTime":10000003,"s1":24,"s2":34,"s3":43,"s4":14}, {"UTCTime":12345678,"s1":57,"s2":35,"s3":77,"s4":99} ]} 

产出是:

 > node.exe scripts\processJson.js s1 Analizying s1: [[10000001, 22], [10000002, 23], [10000003, 24], [10000005, 20], [12345678, 57]] s1 had 0 missing entries > node.exe scripts\processJson.js s2 Analizying s2: [[10000001, 32], [10000002, 33], [10000003, 34], [10000005, 30], [12345678, 35]] s2 had 0 missing entries > node.exe scripts\processJson.js s3 Analizying s3: [[10000001, 42], [10000003, 43], [10000005, 40], [12345678, 77]] s3 had 1 missing entries > node.exe scripts\processJson.js s4 Analizying s4: [[10000001, 12], [10000002, 13], [10000003, 14], [10000005, 10], [12345678, 99]] s4 had 0 missing entries