NodeJS:可读对象stream,用于asynchronous生成数据的模式

我想用NodeJS通过SSH在服务器群集中抓取数据。

远程脚本输出JSON,然后parsing并分解成对象stream 。

我现在的问题是,我使用的面向callback的库(SSH2,MySQL)导致了一种callback模式,我发现很难与可读API规范相匹配。 如何实现_read(size)当推的数据是一堆callback后面?

我目前的实现利用了Streams也是EventEmitters的事实。 我开始构buildStream实例时填充我的数据。 当我所有的callback完成后,我发出一个事件。 然后我听取自定义事件,然后才开始向下推送数据。

 // Calling code var stream = new CrawlerStream(argsForTheStream); stream.on('queue_completed', function() { stream .pipe(logger) .pipe(dbWriter) .on('end', function() { // Close db connection etc... }); }); 

一个CrawlerStream的模拟将是

 // Mock of the Readable stream implementation function CrawlerStream(args) { // boilerplate // array holding the data to push this.data = []; // semi-colon separated string of commands var cmdQueue = getCommandQueue(); var self = this; db.query(sql, function(err, sitesToCrawl, fields) { var servers = groupSitesByServer(sitesToCrawl); for (var s in servers) { sshConnect(getRemoteServer(s), function(err, conn) { sshExec({ ssh: conn, cmd: cmdQueue }, function(err, stdout, stderr) { // Stdout is parsed as JSON // Finally I can populate self.data! // Check if all servers are done // If I'm the last callback to execute self.data.push(null); self.emit('queue_completed'); }) }); } }); } util.inherits(CrawlerStream, Readable); CrawlerStream.prototype._read = function(size) { while (this.data.length) { this.push(this.data.shift()); } } 

我不确定这是否是实现这一点的惯用方法,并希望得到您的build议。

请注意在你的答案中,我想保留使用callback(无承诺)的香草NodeJS风格,并坚持使用ES5。

谢谢你的时间!