mongodbstream查询与through2.spy结束不被调用

我有一个来自mongo的stream式查询,我正在将它传输到一个通过“间谍”可写入stream。 它完全包含了5个文档的小集合的“结束”callback。 但是,如果收集了大量的344个文件,只有前15个文件才能通过,那么这个文件永远挂起来,而“结束”事件永远不会发生。 这是一个MCVE:

var spy = require("through2-spy").obj; var MongoClient = require("mongodb").MongoClient; function getStream() { var stream = spy(function() { console.log("@bug counting", stream.total++); }); stream.total = 0; return stream; } function onEnd() { console.log("ended"); } MongoClient.connect(process.argv[2], function(error, db) { if (error) { console.error(error); return; } var stream = db.collection(process.argv[3]).find().stream(); stream // behavior is the same with the follow line commented out or not .on("end", db.close.bind(db)) .on("error", console.error) .on("end", onEnd) .pipe(getStream()); }); 

问题在于, through2-spy默认使用16的highWaterMark 。为了处理stream量控制,stream保持内部缓冲区,当数据从中被消耗时被清除。 由于getStream返回的变换stream中没有可读stream消耗数据,因此内部缓冲区将被填充并到达highWaterMark 。 增加highWaterMark应该解决它:

 var stream = spy({highWaterMark: 350}, function() { console.log("@bug counting", stream.total++); }); 

另一个非标准的select是重置变换stream的可读状态:

 var stream = spy(function() { console.log("@bug counting", stream.total++); this._readableState.length = 0; }); 

解决这个问题的另一个方法是确保下游有一些东西能够完整地读取上游来源。 我结束了添加一个额外的.pipe(terminus.devnull({objectMode: true});到我的stream的末尾,这也做了伎俩。

 var MongoClient = require("mongodb").MongoClient; var spy = require("through2-spy").obj; var terminus = require("terminus"); function getStream() { var stream = spy(function() { console.log("@bug counting", stream.total++); }); stream.total = 0; return stream; } function onEnd() { console.log("ended"); } MongoClient.connect(process.argv[2], function(error, db) { if (error) { console.error(error); return; } var stream = db.collection(process.argv[3]).find().stream(); stream // behavior is the same with the follow line commented out or not .on("end", db.close.bind(db)) .on("error", console.error) .on("end", onEnd) .pipe(getStream()) .pipe(terminus.devnull({objectMode: true})); });