节点使用mongoose插入大数据

我正在尝试用mongoose将大数据集插入到mongodb中。 但在此之前,我需要确保我的for循环正常工作。

// basic schema settings var mongoose = require('mongoose'); var Schema = mongoose.Schema; var TempcolSchema = new Schema({ cid: { type: Number, required: true }, loc:[] }); TempcolSchema.index({ 'loc': "sphere2d" }); // we can easily see from the output that the forloop runs correctly mongoose.connect('mongodb://localhost/mean-dev', function(err){ for (var i = 0; i < 10000000; i++) { var c = i; console.log(c); } }); 

输出是1,2,3,4,….等

现在我想添加一个mongoose保存语句到for循环中。

 mongoose.connect('mongodb://localhost/mean-dev', function(err){ var Tempcol = mongoose.model('Tempcol', TempcolSchema); for (var i = 0; i < 10000000; i++) { var c = i; console.log(c); var lon = parseInt(c/100000); var lat = c%100000; new Tempcol({cid: Math.random(), loc: [lon, lat]}).save(function(err){}); } }); 

输出仍然是1,2,3,4,…..然而,for循环在一段时间后停止,并且说达到了最大的堆栈并且有某种内存问题。 另外当我检查集合时,我意识到根本没有插入任何数据点。

那么有谁知道可能会发生什么? 谢谢。

这里的问题是你正在运行的循环没有等待每个操作完成。 所以实际上你只是排队.save()请求,并试图同时运行它们。 你不能在合理的限制内做到这一点,所以你得到错误的回应。

asynchronous模块有多种迭代器处理callback函数的迭代方法,其中最简单的方法就是while。 Mongoose还为您处理连接pipe理,无需embeddedcallback,因为模型是连接感知:

 var tempColSchema = new Schema({ cid: { type: Number, required: true }, loc:[] }); var TempCol = mongoose.model( "TempCol", tempColSchema ); mongoose.connect( 'mongodb://localhost/mean-dev' ); var i = 0; async.whilst( function() { return i < 10000000; }, function(callback) { i++; var c = i; console.log(c); var lon = parseInt(c/100000); var lat = c%100000; new Tempcol({cid: Math.random(), loc: [lon, lat]}).save(function(err){ callback(err); }); }, function(err) { // When the loop is complete or on error } ); 

不是最奇妙的方式,它仍然是一个写一次,你可以使用其他方法来“治理”并发操作,但至less不会炸掉调用堆栈。

在MongoDB 2.6及更高版本中,您可以使用批量操作API来在服务器上一次处理多个写操作。 所以这个过程是类似的,但是这次你可以一次发送1000个到服务器的单个写入和响应,这个速度要快得多:

 var tempColSchema = new Schema({ cid: { type: Number, required: true }, loc:[] }); var TempCol = mongoose.model( "TempCol", tempColSchema ); mongoose.connect( 'mongodb://localhost/mean-dev' ); mongoose.on("open",function(err,conn) { var i = 0; var bulk = TempCol.collection.initializeOrderedBulkOp(); async.whilst( function() { return i < 10000000; }, function(callback) { i++; var c = i; console.log(c); var lon = parseInt(c/100000); var lat = c%100000; bulk.insert({ "cid": Math.random(), "loc": [ lon, lat ] }); if ( i % 1000 == 0 ) { bulk.execute(function(err,result) { bulk = TempCol.collection.initializeOrderedBulkOp(); callback(err); }); } else { process.nextTick(callback); } }, function(err) { // When the loop is complete or on error // If you had a number not plainly divisible by 1000 if ( i % 1000 != 0 ) bulk.execute(function(err,result) { // possibly check for errors here }); } ); }); 

这实际上是使用本身的驱动程序方法,这是尚未暴露在mongoose,所以额外的照顾正在采取,以确保连接可用。 这是一个例子,但不是唯一的方法,但重点是mongoose的“魔法”连接不是在这里build立的,所以你应该确定它是成立的。

你有一个要处理的项目的数量,但不是这样的情况下,你应该在最后一个块中调用bulk.execute()以及显示,但是它取决于响应模数的数量。

重点是不要将一堆操作增加到不合理的大小,并保持处理的有限性。 这里的stream程控制允许在进行下一次迭代之前需要一些时间才能真正完成的操作。 所以无论是批量更新还是一些额外的并行排队是你想要的最佳性能。

如果你不想写错误是致命的,也可以用.initializeUnorderedBulkOp()forms来代替。 主要看批量API的官方文档,以及如何解释给出的响应。