产生一个mongoinsert

我的目标是插入非常大的csv的,所以没有我使用csvstream如下所示:

var myCollection = db.collection(myCollectionId); var q = async.queue(Collection.insert.bind(myCollection), 10); csv() .from.path(myFilePath, {columns: true}) .transform(function(data, index, cb){ q.push(data, function (err, res) { if (err) return cb(err); cb(null, res[0]); }); }) .on('end', function () { q.drain = function() { //do some stufff }; }) .on('error', function (err) { res.end(500, err.message); console.log('on.error() executed'); }); }); 

但是,当文件变得非常大,比如70M +,并且它正在stream式传输时,我的服务器非常慢,并且需要永久性,而当我尝试在网站上加载页面时,这个过程中昏昏欲睡。

为什么不可能像这样使用cron-job来执行mongo插入。 我问,因为相同的插入需要从mongo命令行30秒。

PS不介意readFile和lines部分,我这样做是因为我想testing所有的行何时被插入到集合中,在这个过程开始之后(还没有实现这个)。

 var cronJob = require('cron').CronJob; var spawn = require('child_process').spawn; var fs = require('fs'); function MongoImportEdgeFile(dataID, filePath){ var scriptPath = "/local/main/db/mongodb-linux-x86_64-2.4.5/bin/mongoimport"; console.log("script path = "+scriptPath) var output = ""; fs.readFile(filePath, 'utf-8',function(err, data) { if (err){ console.log(err) throw err; } //console.log('data = '+data); var lines = data.split('\n'); console.log("total lines in file = " + lines); var job = new cronJob(new Date(), function() { // store reference to 'this', which is cronJob object. needed to stop job after script is done executing. var context = this; // execute R script asynchronously var script = spawn(scriptPath, [" -d mydb -c Data_ForID_" + dataID + " --file " + filePath + " --type csv" ]); console.log("Executing R script via node-cron: " + scriptPath); // script has finished executing, so complete cron job and fire completion callback script.on('close', function() { console.log('inside script.on(close, function() for import'); context.stop(); }); }, function() { // callback function that executes upon completion console.log("Finished executing import"); }, true); }); 

}

你不应该使用单独的insert调用。 你迫使mongo在每次调用时都要执行内部同步 – 我认为考虑到你的并行方法,情况更糟。

使用bulk insertion :就像使用array调用insert()一样简单。

您可以通过创buildsubprocess直接从节点执行mongoimport 。 这里有一篇关于使用mongoimport导入csv的文章 。 你也可以做json 。

不知何故,我错过了有关在cron中使用mongoimport的部分。 如果我理解正确,看起来你知道你想导入的csv,而你正在使用cron来检查它们。

你考虑过一个消息队列吗? 这将允许您的处理器立即接收导入作业,而不是间隔。 这也会抑制你的处理。

如果您需要更多吞吐量,则可以创build附加到同一队列的其他侦听器进程。 他们将争夺下一份工作。 这将允许您的解决scheme进行扩展。