如何使用聚合在mongo中根据子字段查找父母?

这里是我有一个代码:

const _ = require('lodash') const Box = require('./models/Box') const boxesToBePicked = await Box.find({ status: 'ready', client: 27 }) const boxesOriginalIds = _(boxesToBePicked).map('original').compact().uniq().value() const boxesOriginal = boxesOriginalIds.length ? await Box.find({ _id: { $in: boxesOriginalIds } }) : [] const attributes = ['name'] const boxes = [ ...boxesOriginal, ...boxesToBePicked.filter(box => !box.original) ].map(box => _.pick(box, attributes)) 

比方说,我们在“箱子”集合中有以下数据:

 [ { _id: 1, name: 'Original Box #1', status: 'pending' }, { _id: 2, name: 'Nested box', status: 'ready', original: 1 }, { _id: 3, name: 'Nested box', status: 'ready', original: 1 }, { _id: 4, name: 'Nested box', status: 'pending', original: 1 }, { _id: 5, name: 'Original Box #2', status: 'ready' }, { _id: 6, name: 'Original Box #3', status: 'pending' }, { _id: 7, name: 'Nested box', status: 'ready', original: 6 }, { _id: 8, name: 'Original Box #4', status: 'pending' } ] 

工作stream程

find所有准备好被选中的框:

 const boxesToBePicked = await Box.find({ status: 'ready' }) // Returns: [ { _id: 2, name: 'Nested box', status: 'ready', original: 1 }, { _id: 3, name: 'Nested box', status: 'ready', original: 1 }, { _id: 5, name: 'Original Box #2', status: 'ready' }, { _id: 7, name: 'Nested box', status: 'ready', original: 6 } ] 

获得这些原始(父)框的所有ID:

 const boxesOriginalIds = _(boxesToBePicked).map('original').compact().uniq().value() // Returns: [1, 6] 

通过他们的ID获取这些框:

 const boxesOriginal = boxesOriginalIds.length ? await Box.find({ _id: { $in: boxesOriginalIds } }) : [] // Returns [ { _id: 1, name: 'Original Box #1', status: 'pending' }, { _id: 6, name: 'Original Box #3', status: 'pending' } ] 

join那些没有嵌套框的框被挑选:

 const boxes = [ ...boxesOriginal, ...boxesToBePicked.filter(box => !box.original) ].map(box => _.pick(box, attributes)) // Returns [ { name: 'Original Box #1' }, { name: 'Original Box #3' }, { name: 'Original Box #2' } ] 

所以基本上我们在这里做的是获得所有的原始框,如果他们至less有一个状态为“就绪”的嵌套框,并且所有没有嵌套状态为“就绪”的框。

我认为可以通过使用聚合pipe道和投影来简化。 但是,如何?

你可以尝试下面的东西。 使用$ lookUp将自我join到collection中,$ match阶段与$或者与$结合使用,对于第二个条件和$的下一个部分,或者对于第一个条件和$ group阶段,删除重复项和$ project阶段以格式化响应。

 db.boxes.aggregate([{ $lookup: { from: "boxes", localField: "original", foreignField: "_id", as: "nested_orders" } }, { $unwind: { path: "$nested_orders", preserveNullAndEmptyArrays: true } }, { $match: { $or: [{ $and: [{ "status": "ready" }, { "nested_orders": { $exists: false, } }] }, { "nested_orders.status": "pending" }] } }, { $group: { "_id": null, "names": { $addToSet: { name: "$name", nested_name: "$nested_orders.name" } } } }, { $unwind: "$names" }, { $project: { "_id": 0, "name": { $ifNull: ['$names.nested_name', '$names.name'] } } }]).pretty(); 

示例响应

 { "name" : "Original Box #1" } { "name" : "Original Box #2" } { "name" : "Original Box #3" } 

几个答案是接近,但这是最有效的方法。 它累积要拾取的盒子的“_id”值,然后使用$lookup来“补充”每个(顶级)盒子的全部细节。

 db.boxes.aggregate( {$group: { _id:null, boxes:{$addToSet:{$cond:{ if:{$eq:["$status","ready"]}, then:{$ifNull:["$original","$_id"]}, else:null }}} }}, {$lookup: { from:"boxes", localField:"boxes", foreignField:"_id", as:"boxes" }} ) 

您的结果基于样本数据:

 { "_id" : null, "boxIdsToPickUp" : [ { "_id" : 1, "name" : "Original Box #1", "status" : "pending" }, { "_id" : 5, "name" : "Original Box #2", "status" : "ready" }, { "_id" : 6, "name" : "Original Box #3", "status" : "pending" } ] } 

请注意, $lookup仅仅是为了拾取盒子的_id值而完成的,这比所有盒子的效率要高得多。

如果你想要高效的pipe道,你需要在嵌套的盒子文档中存储更多关于原始盒子的细节(比如它的名字)。

要达到您的目标,您可以按照以下步骤操作:

  1. 首先select状态logging已准备就绪 (因为你想获得没有嵌套框的父母,但是状态已经准备好 ,谁的嵌套框至less有一个已经准备好了

  2. 使用$lookup查找父框

  3. 然后$group获得唯一的父框

  4. 然后$project框名称

所以可以试试这个查询:

 db.getCollection('boxes').aggregate( {$match:{"status":'ready'}}, {$lookup: {from: "boxes", localField: "original", foreignField: "_id", as: "parent"}}, {$unwind: {path: "$parent",preserveNullAndEmptyArrays: true}}, {$group:{ _id:null, list:{$addToSet:{"$cond": [ { "$ifNull": ["$parent.name", false] }, {name:"$parent.name"}, {name:"$name"} ]}} } }, {$project:{name:"$list.name", _id:0}}, {$unwind: "$name"} ) 

要么

  1. 获得状态准备就绪
  2. 获得所需的recordID
  3. 根据recordID获取名称
 db.getCollection('boxes').aggregate( {$match:{"status":'ready'}}, {$group:{ _id:null, parent:{$addToSet:{"$cond": [ { "$ifNull": ["$original", false] }, "$original", "$_id" ]}} } }, {$unwind:"$parent"}, {$lookup: {from: "boxes", localField: "parent", foreignField: "_id", as: "parent"}}, {$project: {"name" : { $arrayElemAt: [ "$parent.name", 0 ] }, _id:0}} ) 

分解聚合:

  • $group创build

    • 匹配ready status的数组ids ,它将添加*original
    • 一个与ready status匹配的数组box_ready ,并保持其他字段原样(稍后将使用)
    • 包含整个原始文档( $$ROOT )的数组document

       { $group: { _id: null, ids: { $addToSet: { $cond: [ { $eq: ["$status", "ready"] }, "$original", null ] } }, box_ready: { $addToSet: { $cond: [ { $eq: ["$status", "ready"] }, { _id: "$_id", name: "$name", original: "$original", status: "$status" }, null ] } }, document: { $push: "$$ROOT" } } } 
  • $unwind文档字段以删除数组

     { $unwind: "$document" } 
  • 使用$redact聚合来保留或删除基于前面创build的数组ids$document._id (包含匹配的originalstatus

     { $redact: { "$cond": { "if": { "$setIsSubset": [{ "$map": { "input": { "$literal": ["A"] }, "as": "a", "in": "$document._id" } }, "$ids" ] }, "then": "$$KEEP", "else": "$$PRUNE" } } } 
  • $group将所有匹配前一个$redact文档推送到另一个名为filtered数组(我们现在有两个数组可以联合使用)

     { $group: { _id: null, box_ready: { $first: "$box_ready" }, filtered: { $push: "$document" } } } 
  • 使用setUnion $project来联合数组box_readyfiltered

     { $project: { union: { $setUnion: ["$box_ready", "$filtered"] }, _id: 0 } } 
  • $unwind你已经获得的数组以获得不同的logging

     { $unwind: "$union" } 
  • $match$match那些original缺less的和非空的(因为最初的状态:就绪状态必须在第一个$group上得到一个空值

     { $match: { "union.original": { "$exists": false }, "union": { $nin: [null] } } } 

整个聚合查询是:

 db.collection.aggregate( [{ $group: { _id: null, ids: { $addToSet: { $cond: [ { $eq: ["$status", "ready"] }, "$original", null ] } }, box_ready: { $addToSet: { $cond: [ { $eq: ["$status", "ready"] }, { _id: "$_id", name: "$name", original: "$original", status: "$status" }, null ] } }, document: { $push: "$$ROOT" } } }, { $unwind: "$document" }, { $redact: { "$cond": { "if": { "$setIsSubset": [{ "$map": { "input": { "$literal": ["A"] }, "as": "a", "in": "$document._id" } }, "$ids" ] }, "then": "$$KEEP", "else": "$$PRUNE" } } }, { $group: { _id: null, box_ready: { $first: "$box_ready" }, filtered: { $push: "$document" } } }, { $project: { union: { $setUnion: ["$box_ready", "$filtered"] }, _id: 0 } }, { $unwind: "$union" }, { $match: { "union.original": { "$exists": false }, "union": { $nin: [null] } } }] ) 

它给你:

 { "union" : { "_id" : 1, "name" : "Original Box #1", "status" : "pending" } } { "union" : { "_id" : 5, "name" : "Original Box #2", "status" : "ready" } } { "union" : { "_id" : 6, "name" : "Original Box #3", "status" : "pending" } } 

如果您想select特定的字段,请使用额外的$project

对于mongoose ,你应该可以这样做来执行聚合:

 Box.aggregate([ //the whole aggregation here ], function(err, result) { }); 

使用mongoose(4.x)

架构:

 var schema = mongoose.Schema({ _id: Number, .... status: String, original: { type: Number, ref: 'Box'} }); var Box = mongoose.model('Box', schema); 

实际查询:

 Box .find({ status: 'ready' }) .populate('original') .exec((err, boxes) => { if (err) return; boxes = boxes.map((b) => b.original ? b.original : b); boxes = _.uniqBy(boxes, '_id'); console.log(boxes); }); 

Docs on Mongoose#populate: http : //mongoosejs.com/docs/populate.html