多次调用cursor.next()会使驱动程序崩溃

动机:

我有一个架构,涉及到许多“消耗”像这样的文档的工作人员:

worker.on('readyForAnotherDoc', () => worker.consume( await cursor.next() )); 

这是一种伪代码 – 我正在检查cursor.hasNext()在真正的代码。 有数百名工作人员, cursor.next()可能会突然爆发200个请求一次。

我试图解决在mongodb node.js驱动程序中的错误/怪癖,如果太多的请求cursor.next()重叠“重叠”,导致错误。

背景:

看起来像MongoDB Node.js驱动程序没有正确处理cursor.next发出的突发请求的情况。 尝试运行这个代码:

 (async function() { // create a collection for testing: let db = await require('mongodb').MongoClient.connect('mongodb://localhost:27017/tester-db-478364'); await db.collection("test").drop(); for(let i = 0; i < 1000; i++) { await db.collection("test").insertOne({num:i, foo:'bar'}); } let cursor = await db.collection("test").find({}); async function go() { let doc = await cursor.next(); console.log(doc.num); } // start 100 simulataneous requests to `cursor.next()` for(let i = 0; i < 1000; i++) { go(); } })(); 

这是我输出的内容:

 0 1 2 3 4 5 6 7 8 9 /home/joe/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410 if(workItem.socketTimeout) { ^ TypeError: Cannot read property 'socketTimeout' of null at Connection.messageHandler (/home/me/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410:16) at Socket.<anonymous> (/home/me/Downloads/testtt/node_modules/mongodb-core/lib/connection/connection.js:361:20) at emitOne (events.js:115:13) at Socket.emit (events.js:210:7) at addChunk (_stream_readable.js:252:12) at readableAddChunk (_stream_readable.js:239:11) at Socket.Readable.push (_stream_readable.js:197:10) at TCP.onread (net.js:589:20) 

所以它看起来很好,直到当前的批量耗尽。 但奇怪的是,如果在.find({})之后添加.batchSize(100) .find({}) ,那么它不会修复它。 但有趣的是,如果你添加.batchSize(5) ,你会得到这个:

 0 1 2 3 4 0 1 2 3 /home/joe/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410 if(workItem.socketTimeout) { ^ TypeError: Cannot read property 'socketTimeout' of null etc... 

不知道那里发生了什么…

试图解决它:

但是,让我们说,我们刚刚在此之后的解决方法。 比方说,我们稍微改变我们的function:

 let cursorBusy = false; async function go() { if(cursorBusy) await waitForCursor(); cursorBusy = true; let doc = await cursor.next(); cursorBusy = false; console.log(doc.num); } function waitForCursor() { return new Promise(resolve => { let si = setInterval(() => { if(!cursorBusy) { resolve(); clearInterval(si); } }, 50); }); } 

这导致了一个新的错误,似乎出现在console.log(doc.num)之间:

 ... 359 415 466 (node:16259) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 12): MongoError: clientcursor already in use? driver problem? 427 433 459 ... 

我认为这并不能避免因为一种“竞争条件”的事情与setInterval的错误。 有趣的是,这是一个不同的错误信息。

问题:有什么方法可以testing游标当前是否“忙”? 任何其他潜在的解决方法,直到这个错误被修复(如果它甚至是一个错误)?

这个问题有一些相似的(但绝对不一样)的行为, 类似的问题似乎已经出现在第三方node.js库。

您的列表中有一些错误。 所以真的只是把它清理一下:

 const MongoClient = require('mongodb').MongoClient; (async function() { let db; try { db = await MongoClient.connect('mongodb://localhost/test'); await db.collection('test').drop(); await db.collection('test').insertMany( Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' })) ); // This is not async. It returns immediately let cursor = db.collection('test').find(); async function go() { let doc = await cursor.next(); // This awaits before continuing. Not concurrent. console.log(doc.num); } for ( let i = 0; i < 100; i++ ) { go(); // Note that these "await" internally } } catch(e) { console.error(e); } finally { db.close(); } })(); 

真正防弹 ,那么你真的应该await每一个操作。 因此,在返回时添加一个Promise.resolve()await go()以获得较好的度量,并通过减less批处理大小来强制破坏条件:

 const MongoClient = require('mongodb').MongoClient; (async function() { let db; try { db = await MongoClient.connect('mongodb://localhost/test'); await db.collection('test').drop(); await db.collection('test').insertMany( Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' })) ); let cursor = db.collection('test').find().batchSize(1); async function go() { let doc = await cursor.next(); console.log(doc.num); return Promise.resolve(); } for ( let i = 0; i < 100; i++ ) { await go(); } console.log('done'); } catch(e) { console.error(e); } finally { db.close(); } })(); 

正确地按顺序打印出来。 缩短了,但实际上如预期的那样达到了99

 0 1 2 3 4 5 6 7 8 9 10 (etc..) 

解释主要是在代码的评论,你似乎错过了哪些东西是async ,哪些东西不是

所以从.find()返回一个Cursor 不是一个async方法,并立即返回。 这是因为它只是一个操作的句柄,在这一点上什么都不做。 MongoDB驱动程序(所有这些驱动程序)都不会联系服务器,也不会在该端build立游标,直到发出“获取”数据的实际请求。

当你调用.next()的时候,是有实际的通信到服务器,并返回一批“结果”。 “批”实际上只影响后续调用是否实际返回到服务器或不检索数据,即“批”可能已经具有“更多”结果,可以在另一个“批”请求之前“清空” “是做成的。 无论如何,每个对.next()调用都认为是async ,无论是否有外部I / O。

通常你用.hasNext()包装每个迭代(也是async )调用,因为在Cursor上调用.next()而没有更多的结果是错误的。 这也是一般的“循环控制”方式,如下所示:

 (async function() { let db; try { db = await MongoClient.connect('mongodb://localhost/test'); await db.collection('test').drop(); await db.collection('test').insertMany( Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' })) ); let cursor = db.collection('test').find(); async function go() { let doc = await cursor.next(); console.log(doc.num); } //for ( let i = 0; i < 100; i++ ) { while( await cursor.hasNext() ) { // Check the cursor still has results go(); } } catch(e) { console.error(e); } finally { db.close(); } })(); 

然后循环直到游标结束。

关于“并发”的注意事项也不是你所期待的。 如果你确实想要并行发出多个请求,那么你仍然需要等待一个当前的游标获取。 如果你不这样做,那么你要求服务器在所有请求中返回相同的数据,而不是在“迭代”游标中的顺序数据。

这似乎是混淆了这一点是什么一些实用function(特别是asyncEach()在实现并行“fetch”)。在代码(从内存)基本上人为插入一个setTimeout()为了等待“下一个打勾“ ,这基本上意味着每个.next()必须实际上仍然是开火。

如上所述,这是“人为的”,因为批处理只是将.map() (在底层代码中的某个地方)合并为一个更大的批处理。

但是如所certificate的。 实际上基本的预期用法确实如预期的那样工作,因为实际上“等待”每个.next() 。 就像你应该的。

编辑:虽然这个答案确实工作, 我的新答案是一个更好的解决这个问题。 离开这个繁荣的答案。 编辑2:其他答案错了:(


好吧,所以我整理了waitForCursor函数,所以它没有竞争条件的东西,因此似乎正常工作:

 let cursorBusy = false; async function go() { await waitForCursorLock(); let doc = await cursor.next(); cursorBusy = false; console.log(doc.num); } function waitForCursorLock() { return new Promise(resolve => { let si = setInterval(() => { if(!cursorBusy) { cursorBusy = true; resolve(); clearInterval(si); } }, 50); }); } 

尽pipe如此,我可能不会接受这个答案。 如果你能想出一个更好的,请张贴它!

编辑:这不起作用。 (看评论)

受@ NeilLunn解释的启发,我们需要做的就是修改原来的代码,添加一个await cursor.hasNext(); 在我们创build游标之后:

 (async function() { // create a collection for testing: let db = await require('mongodb').MongoClient.connect('mongodb://localhost:27017/tester-db-478364'); await db.collection("test").drop(); await db.collection('test').insertMany( Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' })) ); let cursor = db.collection("test").find({}); await cursor.hasNext(); // <-- add this line to "pre-instantiate" cursor async function go() { let doc = await cursor.next(); console.log(doc.num); } // start 100 simulataneous requests to `cursor.next()` for(let i = 0; i < 100; i++) { go(); } })(); 

这是因为游标在被使用之前 (例如.next.hasNext等)没有被实例化,并且驱动似乎有一个怪异的地方,它不检查游标是否已经被实例化,所以它最终会发送很多请求到服务器,这可能会导致错误(也许连接太多)。 我们通过调用一个“无用的” await cursor.hasNext()来解决这个问题,所以在实例化游标的时候,我们可以像cursor.next一样对cursor.next多次“并发”调用。

因此,尽pipe在这个线程中进行了其他讨论,但似乎可以按照您喜欢的方式(并行)尽可能快地调用cursor.next而不会出现错误或exception行为,只要您首先触发(并等待)游标实例。

理想情况下,驱动程序只是检查游标初始化是否已经发生,并等待完成,但也许有一些奇怪的技术原因,为什么不能/不会完成。