在NodeJS中插入一个大型的csv文件,200,000行+到MongoDB中

我试图分析和插入一个大的csv文件到MongoDB中,但是当文件扩展到100000行时,我得到了服务器的错误响应。 我需要插入的文件通常在200000行以上。

我已经尝试了批量插入(insertMany)和Babyparse(Papaparse)stream式方法来逐行插入文件。 但效果不佳。

节点api:

router.post('/csv-upload/:id', multipartMiddleware, function(req, res) { // Post vartiables var fileId = req.params.id; var csv = req.files.files.path; // create a queue object with concurrency 5 var q = async.queue(function(row, callback) { var entry = new Entry(row); entry.save(); callback(); }, 5); baby.parseFiles(csv, { header: true, // Includes header in JSON skipEmptyLines: true, fastMode: true, step: function(results, parser) { results.data[0].id = fileId; q.push(results.data[0], function (err) { if (err) {throw err}; }); }, complete: function(results, file) { console.log("Parsing complete:", results, file); q.drain = function() { console.log('All items have been processed'); res.send("Completed!"); }; } }); }); 

这种stream式传输方式导致: POST SERVER net :: ERR_EMPTY_RESPONSE

不知道我是否正确使用async.queue。

有没有更好更有效的方法来做到这一点,或者我做错了什么?

Express服务器:

 // Dependencies var express = require('express'); var path = require('path'); var bodyParser = require('body-parser'); var routes = require('./server/routes'); var mongoose = require("mongoose"); var babel = require("babel-core/register"); var compression = require('compression'); var PORT = process.env.PORT || 3000; // Include the cluster module var cluster = require('cluster'); mongoose.connect(process.env.MONGOLAB_URI || 'mongodb://localhost/routes'); // Code to run if we're in the master process if (cluster.isMaster) { // Count the machine's CPUs var cpuCount = require('os').cpus().length; // Create a worker for each CPU for (var i = 0; i < cpuCount; i += 1) { cluster.fork(); } // Code to run if we're in a worker process } else { // Express var app = express(); app.use(bodyParser.json({limit: '50mb'})); app.use(bodyParser.urlencoded({limit: '50mb', extended: true})); // Compress responses app.use(compression()); // Used for production build app.use(express.static(path.join(__dirname, 'public'))); routes(app); // Routes app.use('/api', require('./server/routes/api')); app.all('/*', function(req, res) { res.sendFile(path.join(__dirname, 'public/index.html')); }); // Start server app.listen(PORT, function() { console.log('Server ' + cluster.worker.id + ' running on ' + PORT); }); } 

处理导入:

很好的问题,从我迄今为止最快的方式插入到mongo的csv是通过命令行:

 mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline 

我不相信mongoose有一种叫mongoimport的方法(如果我错了,有人纠正我)

但是直接调用通过节点就够简单了:

 var exec = require('child_process').exec; var cmd = 'mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline'; exec(cmd, function(error, stdout, stderr) { // do whatever you need during the callback }); 

以上将被修改为dynamic的,但它应该是不言自明的。

处理上传:

从前端客户端上传文件是另一个挑战。

如果向服务器发出请求并且在60秒内没有得到响应,大多数浏览器都会超时(可能是上面提到的)

一个解决scheme是打开套接字连接(在npm中searchsocket.io)以获取详细信息。 这将创build一个到服务器的连接,不会受到超时限制。

如果上传不是问题,超时是由于parsing/插入缓慢,那么您可能不必担心这一点,一旦你实现上述。

其他考虑:

我不确定你需要发回给用户,还是需要进行分析。 但是这可以在正常的请求/响应周期之外完成,或者可以在套接字连接期间处理,如果在一个请求/响应周期中需要的话。