Mongoose QueryStream.pause()不暂停?

节点v.4.2.3和mongoosev.4.3.6

我必须遍历一个大的(> 10k文档)集合,并处理每个文档。

阅读关于如何处理这种迭代的文档,我偶然发现了QueryStream,我认为这将解决我所有的问题。

function progress(total, t, current) { process.stdout.clearLine(); // clear current text process.stdout.write(Math.round(t / total * 100) + '% ' + t + ' / ' + total + ' ' + current); process.stdout.cursorTo(0); } function loadBalance(current, stream) { if(!stream.paused && current > 50) { log('DEBUG', 'loadBalance', 'pause'); stream.pause(); } else if (stream.paused && current < 10) { log('DEBUG', 'loadBalance', 'resume'); stream.resume(); } } var total = 0, error = 0, goods = 0, current = 0; stream = Raw.find().stream(); stream.on('data', function (doc) { heavyProcess(doc, function (err, refined) { current = current + 1; loadBalance(current, stream); printP(total, goods + error, current); if(err) { error = error + 1; current = current - 1; loadBalance(current, stream); } else { new Pure(refined).save(function (err) { if(err) { error = error + 1; current = current - 1; loadBalance(current, stream); } else { goods = goods + 1; current = current - 1; loadBalance(current, stream); } }); } }); }).on('error', function (err) { log('ERROR', 'stream', err); }).on('close', function () { log('INFO', 'end', goods + ' / ' + total + ' ( ' + (goods/total*100) + '%) OK_'); log('INFO', 'end', error + ' / ' + total + ' ( ' + (error/total*100) + '%) NOK'); log('INFO', 'end', (total - goods - error) + ' missing'); }); 

loadBalance不会被调用,并打印它暂停stream,但'data'事件继续被解雇,甚至认为stream.paused返回true。

我误解了pause()做了什么? 还是我误用QueryStream?

Mongoose查询stream是v1stream。 在文档中称为Node 0.8 ReadStream( http://mongoosejs.com/docs/api.html#querystream_QueryStream

这意味着暂停事件是“咨询” https://nodejs.org/api/stream.html#stream_compatibility_with_older_node_js_versions

这里的提示意味着在暂停之后,一些数据事件仍然会泄漏。
这与底层streamcaching有关,并且是正确的streamv1行为。
您将不得不使用调用暂停后生成的任何数据事件。 从开发人员的angular度来看,这种行为当然不是最佳的,这就是为什么它在streamv2中更改( https://nodejs.org/en/blog/feature/streams2/

这是一个关于v2查询stream的mongoogejs问题,我不认为有什么计划可以很快实现v2查询stream。
https://github.com/Automattic/mongoose/issues/1907

引用这个问题,这可能是一个解决你的问题的方法:

 var readStream = (new stream.Readable({ objectMode: true })).wrap(Model.find({}).stream()); 

所以,真正的问题不在我发布的代码中,而是在Model中生成。

我使用了一个新的连接,原始链接到它,也开始链接到它,但在最后一刻链接到默认mongoose连接:

 db = mongoose.createConnection('mongodb://127.0.0.1/SNCF'); //Creer la connexion a mongodb db.on('error', console.error.bind(console, 'connection error:')); db.once('open', function () { //Une fois connecte raw = new mongoose.Schema( { //... }, { strict: true, collection: 'Raw' } ); Raw = db.model('Raw', raw, 'Raw'); //<--- OK pure = new mongoose.Schema( { //... }, { strict: true, collection: 'Pure' } ); Pure = mongoose.model('Pure', pure, 'Pure'); //<-- ERROR }); 

所以没有保存纯文档,内存爆炸,而CPU正常工作。

将错误的行更改为Pure = db.model('Pure', pure, 'Pure'); 解决了这个问题,我甚至不必暂停这个stream。