无法使用Node.js将大块数据填充到mongodb

我被要求input从全市许多地点收集的一大块天气数据。 每个站点有一台计算机有一个文件夹,每5分钟同步到一台中央服务器。 每天都会创build一个新文件。 所以基本上就是这样的结构 一个txt文件格式为csv文件,第一行为字段,其余为数字。

folder_on_server
| __ site1 __ date1.txt
| | __ date2.txt
|
| __ site2 __ date1.txt
| __ date2.txt
我写了一个小的node.js应用程序来将这些数据填充到mongoDB上。 但目前我们只有3个站点,但每个站点有近900个txt文件,每个文件包含24 * 20 = 288行(每5分钟logging一次数据)。 我尝试运行节点应用程序,但读取第一个文件夹的大约100个文件后,程序崩溃,出现内存分配失败的错误。

我已经尝试了很多方法来改善这一点:

  1. 将nodejs的内存大小增加到8GB =>更好,更多的文件读入,但仍然不能移动到下一个文件夹。
  2. 在_.forEach循环(我使用下划线)=>结束时将一些variables设置为null并且未定义。
  3. 移动文件数组(使用fs.readdir),以便第一个元素将被删除=>也没有帮助。

有没有办法强制js清理内存每次完成读取文件? 谢谢

更新1:我最终一次在每个文件夹中添加100个文件。 这似乎是乏味的,但它的工作,这就像一次工作。 不过,我仍然想为此find一个解决scheme。

正如罗比所说,stream是这条路走的路。 应该使用.readFileSync()而不是.readFileSync() 。 我将开始创build一个线路阅读器,它需要一个path和任何你想要分割的string/正则expression式:

linereader.js

 var fs = require("fs"); var util = require("util"); var EventEmitter = require("events").EventEmitter; function LineReader(path, splitOn) { var readStream = fs.createReadStream(path); var self = this; var lineNum = 0; var buff = "" var chunk; readStream.on("readable", function() { while( (chunk = readStream.read(100)) !== null) { buff += chunk.toString(); var lines = buff.split(splitOn); for (var i = 0; i < lines.length - 1; i++) { self.emit("line",lines[i]); lineNum += 1; } buff = lines[lines.length - 1]; } }); readStream.on("close", function() { self.emit("line", buff); self.emit("close") }); readStream.on("error", function(err) { self.emit("error", err); }) } util.inherits(LineReader, EventEmitter); module.exports = LineReader; 

这将读取一个文本文件,并为每行读取发出“行”事件,所以你不会一次在内存中的所有。 然后,使用asynchronous包(或者您想要使用的任何asynchronous循环),循环插入每个文档的文件:

app.js

 var LineReader = require("./linereader.js"); var async = require("async"); var paths = ["./text1.txt", "./text2.txt", "./path1/text3.txt"]; var reader; async.eachSeries(paths, function(path, callback) { reader = new LineReader(path, /\n/g); reader.on("line", function(line) { var doc = turnTextIntoObject(line); db.collection("mycollection").insert(doc); }) reader.on("close", callback); reader.on("error", callback); }, function(err) { // handle error and finish; }) 

尝试使用stream而不是将每个文件加载到内存中。

我已经使用stream和asynchronousI / O向您发送了一个实现请求 。

这是最重要的:

 var Async = require('async'); var Csv = require('csv-streamify'); var Es = require('event-stream'); var Fs = require('fs'); var Mapping = require('./folder2siteRef.json'); var MongoClient = require('mongodb').MongoClient; var sourcePath = '/hnet/incoming/' + new Date().getFullYear(); Async.auto({ db: function (callback) { console.log('opening db connection'); MongoClient.connect('mongodb://localhost:27017/test3', callback); }, subDirectory: function (callback) { // read the list of subfolder, which are sites Fs.readdir(sourcePath, callback); }, loadData: ['db', 'subDirectory', function (callback, results) { Async.each(results.subDirectory, load(results.db), callback); }], cleanUp: ['db', 'loadData', function (callback, results) { console.log('closing db connection'); results.db.close(callback); }] }, function (err) { console.log(err || 'Done'); }); var load = function (db) { return function (directory, callback) { var basePath = sourcePath + '/' + directory; Async.waterfall([ function (callback) { Fs.readdir(basePath, callback); // array of files in a directory }, function (files, callback) { console.log('loading ' + files.length + ' files from ' + directory); Async.each(files, function (file, callback) { Fs.createReadStream(basePath + '/' + file) .pipe(Csv({objectMode: true, columns: true})) .pipe(transform(directory)) .pipe(batch(200)) .pipe(insert(db).on('end', callback)); }, callback); } ], callback); }; }; var transform = function (directory) { return Es.map(function (data, callback) { data.siteRef = Mapping[directory]; data.epoch = parseInt((data.TheTime - 25569) * 86400) + 6 * 3600; callback(null, data); }); }; var insert = function (db) { return Es.map( function (data, callback) { if (data.length) { var bulk = db.collection('hnet').initializeUnorderedBulkOp(); data.forEach(function (doc) { bulk.insert(doc); }); bulk.execute(callback); } else { callback(); } } ); }; var batch = function (batchSize) { batchSize = batchSize || 1000; var batch = []; return Es.through( function write (data) { batch.push(data); if (batch.length === batchSize) { this.emit('data', batch); batch = []; } }, function end () { if (batch.length) { this.emit('data', batch); batch = []; } this.emit('end'); } ); }; 

我已经使用stream更新了tomongo.js脚本。 我也改变了它的文件I / O使用asynchronous而不是同步。

我根据代码中定义的结构对小数据集进行了testing,结果非常好。 我用900xfiles和288xlines对3xdirs做了一些有限的testing。 我不确定你的每行数据有多大,所以我扔了一些随机的属性。它相当快。 看看它是如何与您的数据。 如果导致问题,那么在执行批量插入操作时,可以尝试用不同的写入关注来限制它。

还请查看其中一些链接,了解有关node.js中的stream的更多信息:

http://nodestreams.com – John Resig编写的一个工具,带有许多stream示例。

事件stream是一个非常有用的stream模块。