用Node.js和async.queue插入大的CSV到MongoDB中

我试图上传并插入大的csv文件(100K行,10-100M +)到mongo。

下面的代码是我用来从表单接受input的路由,并将logging首先插入到我的所有csv的元数据集合中,然后将csv的logging插入到它自己的集合中。 它适用于较小的文件(成千上万行),但当它达到50K +的顺序时花费太长。

下一个片段是使用csvstream来处理较大的文件(请参阅下文),但是在尝试使用stream时遇到错误。

问题:有人可以帮助将第一个示例修改为stream,以便它可以处理大型csv而不会挂起。

exports.addCSV = function(req,res){ var body = req.body; fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){ if(err){ fileSystem.unlink(req.files.myCSV.path, function(){}); throw error; } }); var myObject = { userid: body.userid, name: body.name, description: body.description }; var MongoClient = require('mongodb').MongoClient; MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){ if(err) throw err; var collection = db.collection('myCSVs'); collection.insert(myObject, function(err, insertedMyObject){ csvParser.mapFile('uploads/myFile', function(err, allRows){ if (err) throw err; var collectionId = "Rows_ForID_" + insertedMyObject[0]._id; for (r in allRows) { allRows[r].metric = parseFloat(allRows[r].metric); } var finalcollection = db.collection(collectionId); finalcollection.insert(allRows, function(err, insertedAllRows) { if (err) { res.send(404, "Error"); } else { res.send(200); } }); }); }); }); } 

编辑(让人们去除保持状态):

我试过这种方法使用stream:

 exports.addCSV = function(req,res){ var body = req.body; fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){ if(err){ fileSystem.unlink(req.files.myCSV.path, function(){}); throw error; } }); var myObject = { userid: body.userid, name: body.name, description: body.description }; var MongoClient = require('mongodb').MongoClient; MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){ if(err) throw err; var collection = db.collection('myCSVs'); collection.insert(myObject, function(err, insertedMyObject){ var collectionId = "Rows_ForID_" + insertedMyObject[0]._id; var finalcollection = db.collection(collectionId); var q = async.queue(finalcollection.insert.bind(finalcollection), 5); q.drain = function() { console.log('all items have been processed'); } csv() .from.path('uploads/myFile', {columns: true}) .transform(function(data, index, cb){ q.push(data, cb); }) .on('end', function () { res.send(200); console.log('on.end() executed'); }) .on('error', function (err) { res.end(500, err.message); console.log('on.error() executed'); }); }); }); } 

但是我得到这个错误:

 events.js:72 throw er; // Unhandled 'error' event ^ TypeError: object is not a function 

第三,我尝试了这种stream式方法:

 var q = async.queue(function (task,callback) { finalollection.insert.bind(task,function(err, row) { }); callback(); }, 5); q.drain = function() { console.log('all items have been processed'); } csv() .from.path('uploads/myFile', {columns: true}) .transform(function(data, index, cb){ q.push(data) }) .on('end', function () { res.send(200); console.log('on.end() executed'); }) .on('error', function (err) { res.end(500, err.message); console.log('on.error() executed'); }); 

这插入一些,然后中止:

 all items have been processed all items have been processed Error: Request aborted at IncomingMessage.<anonymous> 

这一个实际上试图插入数据库相同的CSV的多个集合。 最后,我尝试了q的单行定义:

 var q = async.queue(finalcollection.insert.bind(finalcollection), 5); 

随着:

 .transform(function(data, index, cb){ q.push(data,function (err) { console.log('finished processing foo'); }); }) 

它每次插入集合几次并中止(下面是每次发生的输出 – 为什么它不能正确退出并重新插入?):

 finished processing foo finished processing foo finished processing foo finished processing foo finished processing foo all items have been processed Error: Request aborted at IncomingMessage.<anonymous> (.../node_modules/express/node_modules/connect/node_modules/multiparty/index.js:93:17) at IncomingMessage.EventEmitter.emit (events.js:92:17) at abortIncoming (http.js:1892:11) at Socket.serverSocketCloseListener (http.js:1904:5) at Socket.EventEmitter.emit (events.js:117:20) at TCP.close (net.js:466:12) 

你应该用stream处理一个大文件。

这是一个可能的解决scheme:

 var queue = async.queue(collection.insert.bind(collection), 5); csv() .from.path('./input.csv', { columns: true }) .transform(function (data, index, cb) { queue.push(data, function (err, res) { if (err) return cb(err); cb(null, res[0]); }); }) .on('error', function (err) { res.send(500, err.message); }) .on('end', function () { queue.drain = function() { res.send(200); }; }); 

请注意:

  • 我们使用node-csv的streamAPI来确保在读取文件的同时处理数据:这样整个文件就不会一次读入内存。 每个logging都执行transform处理程序;
  • 我们使用async.queue ,这是一个asynchronous处理队列:最多5个处理程序( finalcollection.insert )并行执行。

这个例子应该被testing,因为我不太确定它是否真的处理了背压。 此外,队列的并发级别应根据您的特定configuration进行调整。

你也可以在这里find一个工作的要点 。