有没有办法改变mongoDB结构(从嵌套/embedded文档到对象引用列表),同时保持数据?

我有一个mongoDB数据库,由nodejs通过mongoose使用,它涉及嵌套/embedded式文档,如下所示:

"people" : [ {"name" : "james", "_id": ObjectId("randomrandom1")}, {"name" : "arianna","_id": ObjectId("randomrandom2")}, {"name" : "kyle","_id": ObjectId("randomrandom3")} ] 

我需要改变结构,所以我有单独的“人”文件,人们将包含一个人的ObjectId的数组:

 "people" : [{type:mongoose.Schema.Types.ObjectId, ref: 'Person'}] 

每个人的文件都会包含詹姆斯,阿里安娜和凯尔的信息 – 这样我就可以在他们需要的时候填充他们。

我需要在维护已经input的文档的同时更改数据库结构。 有什么办法可以做到这一点?

假设我的文档在这个叫coll集合里

 { "_id" : ObjectId("56b47c7a088d9fa3e1aa77a0"), "people" : [ { "name" : "james", "_id" : ObjectId("56b47c7a088d9fa3e1aa779d") }, { "name" : "arianna", "_id" : ObjectId("56b47c7a088d9fa3e1aa779e") }, { "name" : "kyle", "_id" : ObjectId("56b47c7a088d9fa3e1aa779f") } ] } 

现在我可以做聚合来存储所有的_id在另一个集合使用像这样的aggregate

 db.coll.aggregate([ { $project: { _id : 0, 'people._id' : 1 } }, { $out : 'somecoll' } ]) 

这会将所有ID存储在另一个名为somecoll集合中,如下所示:

 { "_id" : ObjectId("56b47de8b47e47b58b64f312"), "people" : [ { "_id" : ObjectId("56b47c7a088d9fa3e1aa779d") }, { "_id" : ObjectId("56b47c7a088d9fa3e1aa779e") }, { "_id" : ObjectId("56b47c7a088d9fa3e1aa779f") } ] } 

为了提高性能,特别是在处理大型集合时,利用Bulk() API批量高效地更新集合,因为您将批量发送操作(例如批量大小为1000)。 由于您不会将每个请求发送到服务器,而是每1000次请求只发送一次,所以这样可以提供更好的性能,从而使您的更新更加高效快捷。

为了改变数据库结构,这里的一般algorithm是在访问当前文档信息的同时,“循环”集合的find()结果并处理更新。 通常情况下,您需要更改批量结构,更新将基于已经包含在字段中的信息(在您的案例中为people数组)。

要将新文档插入到person集合中,您希望通过在旧集合上运行聚合操作来获取文档,该操作使用_id键对非规范化people数组进行分组,并为每个分组文档返回_idname字段在结果。 使用这个results数组将文档插入到新的集合中,将Bulk API写入操作的insert()方法作为这样做的“最安全”forms,而无需在服务器上运行代码。

由于aggregate()方法返回一个cursor ,所以可以使用它的forEach()方法遍历它并访问每个文档,从而批量设置批量更新操作,然后通过API有效地通过服务器发送。

以下示例演示了这种方法,可以在服务器上和应用程序中执行。 第一个使用MongoDB版本中可用的Bulk() API >= 2.6 and < 3.2

服务器端(mongo shell):


 // Bulk insert new documents to person collection var bulkInsertOp = db.person.initializeUnorderedBulkOp(), // initialise the bulk operations on the new person collection pipeline = [ {"$unwind": "$people"}, { "$group": { "_id": "$people._id", "name": { "$first": "$people.name" } } } ], counter = 0, // counter to keep track of the batch insert size cursor = db.collection.aggregate(pipeline); // Get person documents using aggregation framework on old collection cursor.forEach(function(doc){ bulkInsertOp.insert(doc); // insert the aggregated document to the new person collection counter++; // increment counter if (counter % 1000 == 0) { // execute the bulk insert operation in batches of 1000 bulkInsertOp.execute(); bulkInsertOp = db.person.initializeUnorderedBulkOp(); } }); if (counter % 1000 != 0) { bulkInsertOp.execute(); } // Bulk update old collection to denormalize the people array var bulkUpdateOp = db.collection.initializeUnorderedBulkOp(), // initialise the bulk operations on the new person collection count = 0, // counter to keep track of the batch insert size cur = db.collection.find({}); // Get all documents from collection cur.forEach(function(doc){ var peopleIds = doc.people.map(function(p){ return p._id; }); // Create an array of person ids for referencing bulkUpdateOp.find({ "_id": doc._id }).updateOne({ "$set": { "people": peopleIds } }); if (count % 1000 == 0) { bulkUpdateOp.execute(); bulkUpdateOp = db.collection.initializeUnorderedBulkOp(); } }); if (count % 1000 != 0) { bulkUpdateOp.execute(); } 

下一个例子适用于新的MongoDB版本3.2 ,它已经弃用了Bulk API,并使用bulkWrite()提供了一组更新的apis。

它使用与上面相同的游标,但不是迭代结果,而是使用map()方法使用批量操作创build数组:


  var pipeline = [ {"$unwind": "$people"}, { "$group": { "_id": "$people._id", "name": { "$first": "$people.name" } } } ], cursor = db.collection.aggregate(pipeline), bulkInsertOps = cursor.map(function (doc) { return { "insertOne": { "document": doc } }; }), cur = db.collection.find({}), bulkUpdateOps = cur.map(function (doc) { var peopleIds = doc.people.map(function(p){ return p._id; }); return { "updateOne": { "filter": { "_id": doc._id } , "update": { "$set": { "people": peopleIds } } } }; }); db.person.bulkWrite(bulkInsertOps, { "ordered": true }); db.collection.bulkWrite(bulkUpdateOps, { "ordered": true }); 

mongoose实施

在客户端实现这一点,有这样做的各种方法。 您可以使用查询stream来“插入”其他节点stream,例如http响应和写入stream,以便与“批量api”一起开箱即用。

在Mongoose中,可以通过从基本驱动程序访问底层集合对象来完成循环,但要确保在尝试访问Bulk() api方法之前数据库连接已打开。 这确保存在一个Node.js Db实例,并且可以获取一个Collection()对象。 一旦在模型上使用.collection访问器,就可以使用For Mongoose版本〜3.8.8,〜3.8.22,4.x中提供的Bulk() API来支持MongoDB Server版本>= 2.6 and < 3.2

客户端:


 // Get the results using a find stream var pipeline = [ {"$unwind": "$people"}, { "$group": { "_id": "$people._id", "name": { "$first": "$people.name" } } } ], stream = Model.aggregate(pipeline).stream(); mongoose.connection.on("open", function (err, conn) { var bulkInsertOp = Person.collection.initializeUnorderedBulkOp(), counter = 0; stream.on("error", function(err) { // handle err }); stream.on("data", function (doc) { async.series( [ function(callback) { bulkInsertOp.insert(doc); counter++; if (counter % 1000 == 0) { bulkInsertOp.execute(function(err, result) { if (err) throw err; // handle err appropriately bulkInsertOp = Person.collection.initializeOrderedBulkOp(); // re-initialise bulk operations callback(); // do something with result }); } else { callback(); } } ], // When all is done function(err) { if ( counter % 1000 != 0 ) // // Clean up remaining operations in queue bulkInsertOp.execute(function(err,result) { console.log("Inserted some more docs which had remained in the batch queue." ); }); console.log("I'm done now!") ; } ); }); 

在上面, Stream api打破了聚合结果,以便一次处理一个文档,因为这样可以批量生成插入文件,然后批量发送到服务器,而不是一次加载所有文件。

在实际发送到服务器之前, Bulk()会一次排队尽可能多的操作。 所以在上面这种情况下,写入操作只能发送到服务器进行批量处理。 你真的可以select任何高达16MB的BSON限制,但保持可pipe理性。

在批量处理的操作之上, asynchronous库作为附加的限制器,确保在任何时候基本上不超过文档的限制。 通过确保操作等待而不是排队等待更多的事情,限制防止昂贵的“执行”呼叫。