在快速stream读取stream中平衡缓慢的I / O

在node.js中我有一个读取stream,我希望重新格式化并写入数据库。 由于读取stream速度快且写入速度慢,因此随着写入队列的build立(假定stream为gb的数据),node.js队列可能会不堪重负。 如何强制读取等待代码的写入部分,所以这不会发生阻塞?

var request = http.get({ host: 'api.geonames.org', port: 80, path: '/children?' + qs.stringify({ geonameId: geonameId, username: "demo" }) }).on('response', function(response) { response.setEncoding('utf8'); var xml = new XmlStream(response, 'utf8'); xml.on('endElement: geoname ', function(input) { console.log('geoname'); var output = new Object(); output.Name = input.name; output.lat = input.lat; output.lng = input.lng; output._key = input.geonameId; data.db.document.create(output, data.doc, function(callback){ //this is really slow. } // i do not want to return from here and receive more data until the 'create' above has completed }); }); 

我昨天晚上遇到这个问题,在我的黑客马拉松引起睡眠不足的状态,这是我如何解决它:

每当我发出一个工作要处理的时候,我会增加一个计数器,并且在操作完成时递减计数器。 为了防止出站stream量压倒其他服务,当有一定数量的待处理出站请求时,我会暂停stream。 代码非常类似于以下内容。

 var instream = fs.createReadStream('./combined.csv'); var outstream = new stream; var inProcess = 0; var paused = false; var rl = readline.createInterface(instream, outstream); rl.on('line', function(line) { inProcess++; if(inProcess > 100) { console.log('pausing input to clear queue'); rl.pause(); paused = true; } someService.doSomethingSlow(line, function() { inProcess--; if(paused && inProcess < 10) { console.log('resuming stream'); paused = false; rl.resume(); } if (err) throw err; }); }); rl.on('end', function() { rl.close(); }); 

不是最优雅的解决scheme,但它的工作,并允许我处理数百万行,而不会耗尽内存或限制其他服务。

我的解决scheme只是扩展一个空的stream.Writable和基本相同的@ Timothy的,但使用事件,不依赖于Streams1 .pause().resume() (这似乎没有任何影响我的数据pipe道,反正)。

 var stream = require("stream"); var liveRequests = 0; var maxLiveRequests = 100; var streamPaused = false; var requestClient = new stream.Writable(); function requestCompleted(){ liveRequests--; if(streamPaused && liveRequests < maxLiveRequests){ streamPaused = false; requestClient.emit("resumeStream"); } } requestClient._write = function (data, enc, next){ makeRequest(data, requestCompleted); liveRequests++; if(liveRequests >= maxLiveRequests){ streamPaused = true; requestClient.once("resumeStream", function resume(){ next(); }); } else { next(); } }; 

一个计数器liveRequests跟踪并发请求的数量,并且每当makeRequest()被调用时递增,当它完成时递减(即,当requestCompleted() )被调用时)。 如果一个请求刚刚完成, liveRequests超过maxLiveRequests ,我们用maxLiveRequests暂停stream。 如果请求完成,stream暂停,并且liveRequests现在小于maxLiveRequests ,我们可以恢复stream。 由于随后的数据项在调用next()callback函数时被_write()读取,我们可以简单地将后者与事件侦听器推迟到我们自定义的"resumeStream"事件上,该事件模仿暂停/恢复。 现在,只需readStream.pipe(requestClient)


编辑:我把这个解决scheme和自动批量input数据一起抽象到一个包中 。