为数组的每个元素发出查询

我目前正在查询我的mondo数据库在一个集合中返回一个数组的URL的数组。 然后,我想使用该数组来通过另一个集合,并find前一个查询的返回数组中的每个元素的匹配元素。 在数组上使用forEach并进行个别查询是否正确? 我的代码看起来像这样,第一个函数getUrls很好。 目前我得到的错误是:

(节点:10754)UnhandledPromiseRejectionWarning:未处理的承诺拒绝(拒绝ID:1):TypeError:无法读取未定义的属性“限制”(节点:10754)[DEP0018]弃用警告:弃用未处理的承诺拒绝。 将来,未处理的承诺拒绝将使用非零退出代码来终止Node.js进程。

async function useUrls () { let domains = await getUrls() let db = await mongo.connect("mongodb://35.185.206.31:80/lc_data") let results = [] domains.forEach( domain =>{ let query = {"$match": {"email_domain": domain} } let cursor = db.collection('circleback') .aggregate([query], (err, data) =>{ if(err) throw err; console.log("cb", data) }).limit(1100) }) 

如上所述,问题中的代码存在一些问题,其中大部分问题可以通过查看本答复末尾提供的完整样本列表来解决。 你在本质上要求的是“Top-N结果”问题的一个变种,对此,有几种方法可以“实际”处理这个问题。

所以排名从“最差”到“最好”:

聚合$切片

因此,而不是“循环”你的函数的结果,你可以交替使用$in提供所有的结果到查询。 这减轻了“循环投入”的需要,但这里需要的另一件事是“最高N的产出”。

在MongoDB中确实没有一个“稳定”的机制,但是如果它对于给定的集合的大小是合理的,那么事实上你可以简单地在你的“distinct”键上匹配所提供的$in参数$group ,然后$push所有文档$push送到一个数组中,然后$slice结果:

 let results = await db.collection('circleback').aggregate([ { "$match": { "email_domain": { "$in": domains } } }, { "$group": { "_id": "$email_domain", "docs": { "$push": "$$ROOT" } }}, { "$sort": { "_id": 1 } }, { "$addFields": { "docs": { "$slice": [ "$docs", 0, 1100 ] } } } ]).toArray(); 

这里的“更广泛的”问题是,MongoDB无法“限制”初始$push的数组内容。 而这实际上正在等待一个长期悬而未决的问题。 SERVER-9377 。

所以虽然我们可以在理论上做这样的操作,但是由于16MB的BSON限制通常限制了“初始”数组的大小,即使$slice结果确实会低于这个限制,也经常是不实际的。

串行循环执行asynchronous/等待

你的代码显示你在这个环境下运行,所以我build议你实际使用它。 只需await来自源的每个循环迭代:

 let results = []; for ( let domain of domains ) { results = results.concat( await db.collection('circleback').find({ "email_domain": domain }) .limit(1100).toArray() ); } 

简单的函数允许您执行此操作,例如通过.toArray().concat()的标准游标结果作为数组返回,然后使用.concat()与之前的结果数组进行连接。

这很简单有效,但我们可以做得更好一点

并发执行asynchronous方法

因此,不是使用“循环”,而是await每个被调用的asynchronous函数,而是可以同时执行它们全部(或至less“最”)。 这实际上就是问题中提出的问题的一部分,因为没有任何内容实际上“等待”循环迭代。

我们可以使用Promise.all()来有效地做到这一点,但是如果它实际上是一个“非常大”的并发运行的承诺数量,这将遇到与超过调用堆栈的经验相同的问题。

为了避免这种情况,我们仍然可以使用Bluebird承诺的Promise.map() 。 这有一个“并发限制器”选项,只允许指定数量的操作同时进行:

 let results = [].concat.apply([], await Promise.map(domains, domain => db.collection('circleback').find({ "email_domain": domain }) .limit(1100).toArray() ,{ concurrency: 10 }) ); 

事实上,你甚至应该能够使用像Bluebird这样的库来承诺将.map()函数“插入”其他任何返回Promise东西,例如返回"domains"列表的"domains" source”函数。 那么你可以像后面的例子中所示的那样“链接”。

未来的MongoDB

未来的MongoDB版本(从MongoDB 3.6开始)实际上有一个新的“非相关”forms的$lookup ,这里允许一个特殊情况。 因此,回到原始聚合的例子,我们可以得到每个匹配键的“独特”值,然后用"pipeline"参数进行$lookup ,然后允许将$limit应用于结果。

 let results = await db.collection('circleback').aggregate([ { "$match": { "email_domain": { "$in": domains } } }, { "$group": { "_id": "$email_domain" }}, { "$sort": { "_id": 1 } }, { "$lookup": { "from": "circleback", "let": { "domain": "$_id" }, "pipeline": [ { "$redact": { "$cond": { "if": { "$eq": [ "$email_domain", "$$domain" ] }, "then": "$$KEEP", "else": "$$PRUNE" } }}, { "$limit": 1100 } ], "as": "docs" }} ]).toArray(); 

这将始终保持在16MB的BSON限制之下,当然假设$in的参数允许成为这种情况。

示例列表

作为完整的示例列表,您可以运行,并且通常会随着默认数据集的创build而故意相当大。 它演示了上面描述的所有技术以及遵循的一些常规使用模式。

 const mongodb = require('mongodb'), Promise = require('bluebird'), MongoClient = mongodb.MongoClient, Logger = mongodb.Logger; const uri = 'mongodb://localhost/bigpara'; function log(data) { console.log(JSON.stringify(data,undefined,2)) } (async function() { let db; try { db = await MongoClient.connect(uri,{ promiseLibrary: Promise }); Logger.setLevel('info'); let source = db.collection('source'); let data = db.collection('data'); // Clean collections await Promise.all( [source,data].map( coll => coll.remove({}) ) ); // Create some data to work with await source.insertMany( Array.apply([],Array(500)).map((e,i) => ({ item: i+1 })) ); let ops = []; for (let i=1; i <= 10000; i++) { ops.push({ item: Math.floor(Math.random() * 500) + 1, index: i, amount: Math.floor(Math.random() * (200 - 100 + 1)) + 100 }); if ( i % 1000 === 0 ) { await data.insertMany(ops,{ ordered: false }); ops = []; } } /* Fetch 5 and 5 example * * Note that the async method to supply to $in is a simulation * of any real source that is returning an array * * Not the best since it means ALL documents go into the array * for the selection. Then you $slice off only what you need. */ console.log('\nAggregate $in Example'); await (async function(source,data) { let results = await data.aggregate([ { "$match": { "item": { "$in": (await source.find().limit(5).toArray()).map(d => d.item) } }}, { "$group": { "_id": "$item", "docs": { "$push": "$$ROOT" } }}, { "$addFields": { "docs": { "$slice": [ "$docs", 0, 5 ] } }}, { "$sort": { "_id": 1 } } ]).toArray(); log(results); })(source,data); /* * Fetch 10 by 2 example * * Much better usage of concurrent processes and only get's * what is needed. But it is actually 1 request per item */ console.log('\nPromise.map concurrency example'); await (async function(source,data) { let results = [].concat.apply([], await source.find().limit(10).toArray().map(d => data.find({ item: d.item }).limit(2).toArray() ,{ concurrency: 5 }) ); log(results); })(source,data); /* * Plain loop async/await serial example * * Still one request per item, requests are serial * and therefore take longer to complete than concurrent */ console.log('\nasync/await serial loop'); await (async function(source,data) { let items = (await source.find().limit(10).toArray()); let results = []; for ( item of items ) { results = results.concat( await data.find({ item: item.item }).limit(2).toArray() ); } log(results); })(source,data); /* * Non-Correlated $lookup example * * Uses aggregate to get the "distinct" matching results and then does * a $lookup operation to retrive the matching documents to the * specified $limit * * Typically not as efficient as the concurrent example, but does * actually run completely on the server, and does not require * additional connections. * */ let version = (await db.db('admin').command({'buildinfo': 1})).version; if ( version >= "3.5" ) { console.log('\nNon-Correlated $lookup example $limit') await (async function(source,data) { let items = (await source.find().limit(5).toArray()).map(d => d.item); let results = await data.aggregate([ { "$match": { "item": { "$in": items } } }, { "$group": { "_id": "$item" } }, { "$sort": { "_id": 1 } }, { "$lookup": { "from": "data", "let": { "itemId": "$_id", }, "pipeline": [ { "$redact": { "$cond": { "if": { "$eq": [ "$item", "$$itemId" ] }, "then": "$$KEEP", "else": "$$PRUNE" } }}, { "$limit": 5 } ], "as": "docs" }} ]).toArray(); log(results); })(source,data); } else { console.log('\nSkipped Non-Correlated $lookup demo'); } } catch(e) { console.error(e); } finally { db.close(); } })();