多次调用cursor.next()会使驱动程序崩溃
动机:
我有一个架构,涉及到许多“消耗”像这样的文档的工作人员:
worker.on('readyForAnotherDoc', () => worker.consume( await cursor.next() ));
这是一种伪代码 – 我正在检查cursor.hasNext()
在真正的代码。 有数百名工作人员, cursor.next()
可能会突然爆发200个请求一次。
我试图解决在mongodb node.js驱动程序中的错误/怪癖,如果太多的请求cursor.next()
重叠“重叠”,导致错误。
背景:
看起来像MongoDB Node.js驱动程序没有正确处理cursor.next
发出的突发请求的情况。 尝试运行这个代码:
(async function() { // create a collection for testing: let db = await require('mongodb').MongoClient.connect('mongodb://localhost:27017/tester-db-478364'); await db.collection("test").drop(); for(let i = 0; i < 1000; i++) { await db.collection("test").insertOne({num:i, foo:'bar'}); } let cursor = await db.collection("test").find({}); async function go() { let doc = await cursor.next(); console.log(doc.num); } // start 100 simulataneous requests to `cursor.next()` for(let i = 0; i < 1000; i++) { go(); } })();
这是我输出的内容:
0 1 2 3 4 5 6 7 8 9 /home/joe/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410 if(workItem.socketTimeout) { ^ TypeError: Cannot read property 'socketTimeout' of null at Connection.messageHandler (/home/me/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410:16) at Socket.<anonymous> (/home/me/Downloads/testtt/node_modules/mongodb-core/lib/connection/connection.js:361:20) at emitOne (events.js:115:13) at Socket.emit (events.js:210:7) at addChunk (_stream_readable.js:252:12) at readableAddChunk (_stream_readable.js:239:11) at Socket.Readable.push (_stream_readable.js:197:10) at TCP.onread (net.js:589:20)
所以它看起来很好,直到当前的批量耗尽。 但奇怪的是,如果在.find({})
之后添加.batchSize(100)
.find({})
,那么它不会修复它。 但有趣的是,如果你添加.batchSize(5)
,你会得到这个:
0 1 2 3 4 0 1 2 3 /home/joe/Downloads/testtt/node_modules/mongodb-core/lib/connection/pool.js:410 if(workItem.socketTimeout) { ^ TypeError: Cannot read property 'socketTimeout' of null etc...
不知道那里发生了什么…
试图解决它:
但是,让我们说,我们刚刚在此之后的解决方法。 比方说,我们稍微改变我们的function:
let cursorBusy = false; async function go() { if(cursorBusy) await waitForCursor(); cursorBusy = true; let doc = await cursor.next(); cursorBusy = false; console.log(doc.num); } function waitForCursor() { return new Promise(resolve => { let si = setInterval(() => { if(!cursorBusy) { resolve(); clearInterval(si); } }, 50); }); }
这导致了一个新的错误,似乎出现在console.log(doc.num)
之间:
... 359 415 466 (node:16259) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 12): MongoError: clientcursor already in use? driver problem? 427 433 459 ...
我认为这并不能避免因为一种“竞争条件”的事情与setInterval的错误。 有趣的是,这是一个不同的错误信息。
问题:有什么方法可以testing游标当前是否“忙”? 任何其他潜在的解决方法,直到这个错误被修复(如果它甚至是一个错误)?
这个问题有一些相似的(但绝对不一样)的行为, 类似的问题似乎已经出现在第三方node.js库。
您的列表中有一些错误。 所以真的只是把它清理一下:
const MongoClient = require('mongodb').MongoClient; (async function() { let db; try { db = await MongoClient.connect('mongodb://localhost/test'); await db.collection('test').drop(); await db.collection('test').insertMany( Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' })) ); // This is not async. It returns immediately let cursor = db.collection('test').find(); async function go() { let doc = await cursor.next(); // This awaits before continuing. Not concurrent. console.log(doc.num); } for ( let i = 0; i < 100; i++ ) { go(); // Note that these "await" internally } } catch(e) { console.error(e); } finally { db.close(); } })();
要真正防弹 ,那么你真的应该await
每一个操作。 因此,在返回时添加一个Promise.resolve()
并await go()
以获得较好的度量,并通过减less批处理大小来强制破坏条件:
const MongoClient = require('mongodb').MongoClient; (async function() { let db; try { db = await MongoClient.connect('mongodb://localhost/test'); await db.collection('test').drop(); await db.collection('test').insertMany( Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' })) ); let cursor = db.collection('test').find().batchSize(1); async function go() { let doc = await cursor.next(); console.log(doc.num); return Promise.resolve(); } for ( let i = 0; i < 100; i++ ) { await go(); } console.log('done'); } catch(e) { console.error(e); } finally { db.close(); } })();
正确地按顺序打印出来。 缩短了,但实际上如预期的那样达到了99
:
0 1 2 3 4 5 6 7 8 9 10 (etc..)
解释主要是在代码的评论,你似乎错过了哪些东西是async
,哪些东西不是 。
所以从.find()
返回一个Cursor
不是一个async
方法,并立即返回。 这是因为它只是一个操作的句柄,在这一点上什么都不做。 MongoDB驱动程序(所有这些驱动程序)都不会联系服务器,也不会在该端build立游标,直到发出“获取”数据的实际请求。
当你调用.next()
的时候,是有实际的通信到服务器,并返回一批“结果”。 “批”实际上只影响后续调用是否实际返回到服务器或不检索数据,即“批”可能已经具有“更多”结果,可以在另一个“批”请求之前“清空” “是做成的。 无论如何,每个对.next()
调用都被认为是async
,无论是否有外部I / O。
通常你用.hasNext()
包装每个迭代(也是async
)调用,因为在Cursor
上调用.next()
而没有更多的结果是错误的。 这也是一般的“循环控制”方式,如下所示:
(async function() { let db; try { db = await MongoClient.connect('mongodb://localhost/test'); await db.collection('test').drop(); await db.collection('test').insertMany( Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' })) ); let cursor = db.collection('test').find(); async function go() { let doc = await cursor.next(); console.log(doc.num); } //for ( let i = 0; i < 100; i++ ) { while( await cursor.hasNext() ) { // Check the cursor still has results go(); } } catch(e) { console.error(e); } finally { db.close(); } })();
然后循环直到游标结束。
关于“并发”的注意事项也不是你所期待的。 如果你确实想要并行发出多个请求,那么你仍然需要等待一个当前的游标获取。 如果你不这样做,那么你要求服务器在所有请求中返回相同的数据,而不是在“迭代”游标中的顺序数据。
这似乎是混淆了这一点是什么一些实用function(特别是asyncEach()
在实现并行“fetch”)。在代码(从内存)基本上人为插入一个setTimeout()
为了等待“下一个打勾“ ,这基本上意味着每个.next()
必须实际上仍然是开火。
如上所述,这是“人为的”,因为批处理只是将.map()
(在底层代码中的某个地方)合并为一个更大的批处理。
但是如所certificate的。 实际上基本的预期用法确实如预期的那样工作,因为实际上“等待”每个.next()
。 就像你应该的。
编辑:虽然这个答案确实工作, 我的新答案是一个更好的解决这个问题。 离开这个繁荣的答案。 编辑2:其他答案错了:(
好吧,所以我整理了waitForCursor
函数,所以它没有竞争条件的东西,因此似乎正常工作:
let cursorBusy = false; async function go() { await waitForCursorLock(); let doc = await cursor.next(); cursorBusy = false; console.log(doc.num); } function waitForCursorLock() { return new Promise(resolve => { let si = setInterval(() => { if(!cursorBusy) { cursorBusy = true; resolve(); clearInterval(si); } }, 50); }); }
尽pipe如此,我可能不会接受这个答案。 如果你能想出一个更好的,请张贴它!
编辑:这不起作用。 (看评论)
受@ NeilLunn解释的启发,我们需要做的就是修改原来的代码,添加一个await cursor.hasNext();
在我们创build游标之后:
(async function() { // create a collection for testing: let db = await require('mongodb').MongoClient.connect('mongodb://localhost:27017/tester-db-478364'); await db.collection("test").drop(); await db.collection('test').insertMany( Array(1000).fill(1).map((e,num) => ({ num, foo: 'bar' })) ); let cursor = db.collection("test").find({}); await cursor.hasNext(); // <-- add this line to "pre-instantiate" cursor async function go() { let doc = await cursor.next(); console.log(doc.num); } // start 100 simulataneous requests to `cursor.next()` for(let i = 0; i < 100; i++) { go(); } })();
这是因为游标在被使用之前 (例如.next
, .hasNext
等)没有被实例化,并且驱动似乎有一个怪异的地方,它不检查游标是否已经被实例化,所以它最终会发送很多请求到服务器,这可能会导致错误(也许连接太多)。 我们通过调用一个“无用的” await cursor.hasNext()
来解决这个问题,所以在实例化游标的时候,我们可以像cursor.next
一样对cursor.next
多次“并发”调用。
因此,尽pipe在这个线程中进行了其他讨论,但似乎可以按照您喜欢的方式(并行)尽可能快地调用cursor.next
,而不会出现错误或exception行为,只要您首先触发(并等待)游标实例。
理想情况下,驱动程序只是检查游标初始化是否已经发生,并等待完成,但也许有一些奇怪的技术原因,为什么不能/不会完成。
- 环回的跳过和限制filter不与filter一起使用
- 我如何使用Elastic Beanstalk创build一个“tmp”目录?
- 可以编写道具来覆盖react-redux中的connect()吗?
- 自动将index.html的引用复制到karma.conf.js中
- 将async.eachLimit转换为promise
- 使用Dalekjstesting工具,如何在Option标签中没有“value”属性时selectDropdown(Select element)中的Option?
- $ npm脚本中的PWD
- 如何在testing过程中存储node.js内置的fs?
- Node.js:对象在响应对象中转换为undefined