NodeJS:可读对象stream,用于asynchronous生成数据的模式
我想用NodeJS通过SSH在服务器群集中抓取数据。
远程脚本输出JSON,然后parsing并分解成对象stream 。
我现在的问题是,我使用的面向callback的库(SSH2,MySQL)导致了一种callback模式,我发现很难与可读API规范相匹配。 如何实现_read(size)
当推的数据是一堆callback后面?
我目前的实现利用了Streams
也是EventEmitters
的事实。 我开始构buildStream实例时填充我的数据。 当我所有的callback完成后,我发出一个事件。 然后我听取自定义事件,然后才开始向下推送数据。
// Calling code var stream = new CrawlerStream(argsForTheStream); stream.on('queue_completed', function() { stream .pipe(logger) .pipe(dbWriter) .on('end', function() { // Close db connection etc... }); });
一个CrawlerStream
的模拟将是
// Mock of the Readable stream implementation function CrawlerStream(args) { // boilerplate // array holding the data to push this.data = []; // semi-colon separated string of commands var cmdQueue = getCommandQueue(); var self = this; db.query(sql, function(err, sitesToCrawl, fields) { var servers = groupSitesByServer(sitesToCrawl); for (var s in servers) { sshConnect(getRemoteServer(s), function(err, conn) { sshExec({ ssh: conn, cmd: cmdQueue }, function(err, stdout, stderr) { // Stdout is parsed as JSON // Finally I can populate self.data! // Check if all servers are done // If I'm the last callback to execute self.data.push(null); self.emit('queue_completed'); }) }); } }); } util.inherits(CrawlerStream, Readable); CrawlerStream.prototype._read = function(size) { while (this.data.length) { this.push(this.data.shift()); } }
我不确定这是否是实现这一点的惯用方法,并希望得到您的build议。
请注意在你的答案中,我想保留使用callback(无承诺)的香草NodeJS风格,并坚持使用ES5。
谢谢你的时间!