将 stream Promise 化(promisify)

我正在尝试将 stream promise 化(promisify),但这似乎比我预期的更难。这是我的尝试:

'use strict';

const Promise = require('bluebird');
const Twitter = require('twitter');

/**
 * Wraps a Twitter 'statuses/filter' stream.
 *
 * @param {object} config - Configuration store; its `get(path)` method is
 *   called for the four OAuth credentials and for the `track` filter.
 */
const TwitterStream = module.exports = function TwitterStream(config) {
  // init Twitter Streaming API for OAuth
  this.stream = new Twitter({
    consumer_key: config.get('/twitter/consumerKey'),
    consumer_secret: config.get('/twitter/consumerSecret'),
    access_token_key: config.get('/twitter/accessTokenKey'),
    access_token_secret: config.get('/twitter/accessTokenSecret')
  }).stream('statuses/filter', { track: config.get('/twitter/track') });
};

/**
 * Returns a promise tied to the stream's 'data' and 'error' events.
 *
 * NOTE: a promise settles only once, so only the first 'data' (or 'error')
 * event is ever observable through the returned promise; later events still
 * invoke `resolve`/`reject`, but those calls have no effect.
 *
 * @returns {Promise} Resolved with the payload of the first 'data' event, or
 *   rejected with the payload of the first 'error' event.
 */
TwitterStream.prototype.receive = function () {
  return new Promise((resolve, reject) => {
    this.stream.on('data', resolve).on('error', reject);
  });
};

/**
 * Calls `destroy()` on the wrapped stream.
 */
TwitterStream.prototype.destroy = function () {
  this.stream.destroy();
};

主要的问题是,当我创建对象

 const stream = new TwitterStream(config);

 stream
   .receive()
   .then((data) => console.log(data))
   // receive() rejects when the stream emits 'error'; report it instead of
   // leaving the rejection unhandled.
   .catch((error) => console.error(error));

执行这段代码时,只有一个对象被读取,之后没有其他数据流入。

  /**
   * Returns a promise tied to the stream's 'data' and 'error' events.
   *
   * A promise settles only once: it is resolved with the payload of the first
   * 'data' event or rejected with the payload of the first 'error' event,
   * whichever is emitted first. Later events do not change the result.
   *
   * @returns {Promise} Promise for the first 'data' payload.
   */
  TwitterStream.prototype.receive = function () {
    return new Promise((resolve, reject) => {
      this.stream
        // Each handler is closed before the next `.on` so both listeners are
        // registered on the stream itself.
        .on('data', (data) => resolve(data))
        .on('error', (error) => reject(error));
    });
  };

你需要在 stream.on 函数的回调(callback)中返回一个 promise。目前,receive 方法被调用时只返回一个 promise,而 promise 只能被 resolve 或 reject 一次,所以它只会给出第一个值或错误。

通过使用Rx扩展,它非常简单:

 /**
  * Exposes the wrapped stream as an Rx observable.
  *
  * When the observable is subscribed to, listeners are attached to the stream:
  * every 'data' payload is forwarded to `observer.onNext` and every 'error'
  * payload to `observer.onError`.
  *
  * @returns {object} The value returned by `Rx.Observable.create`.
  */
 TwitterStream.prototype.receive = function () {
   return Rx.Observable.create((observer) => {
     const source = this.stream;
     const forwardData = (data) => observer.onNext(data);
     const forwardError = (err) => observer.onError(err);

     source.on('data', forwardData).on('error', forwardError);
   });
 };

接着

 const stream = new TwitterStream(config);

 stream.receive().subscribe(
   (data) => console.log(data),
   // Errors forwarded from the stream's 'error' event are reported here.
   (err) => console.error(err)
 );

下面是一段未经测试、很可能仍有错误的代码,用来说明如何用 promise 做到这一点:

 /**
  * Creates a deferred: a pending promise bundled with the functions that
  * settle it.
  *
  * @returns {{resolve: Function, reject: Function, promise: Promise}}
  */
 function defer() {
   let resolve;
   let reject;
   // The executor runs synchronously, so both functions are captured before
   // the object below is built.
   const promise = new Promise((res, rej) => {
     resolve = res;
     reject = rej;
   });
   return { resolve, reject, promise };
 }

 /**
  * Starts buffering the stream's events so they can be pulled one at a time
  * with getNextData().
  *
  * @returns {TwitterStream} This instance, to allow chaining.
  */
 TwitterStream.prototype.receive = function () {
   // Create the buffer up front so getNextData() can be called before the
   // first event has arrived.
   this.dataCache = this.dataCache || [];
   this.stream
     .on('data', (data) => {
       this.dataCache.push(data);
       this.tryToSendData();
     })
     .on('end', () => {
       this.finished = true;
       this.tryToSendData();
     })
     .on('error', (err) => {
       this.lastError = err;
       // Let a waiting consumer see the failure instead of waiting forever.
       this.tryToSendData();
     });
   return this;
 };

 /**
  * Settles the pending deferred, if there is one: resolves it with the first
  * buffered element, rejects it with the recorded error when the buffer is
  * empty, or resolves it with undefined otherwise.
  */
 TwitterStream.prototype.tryToSendData = function () {
   const defered = this.defered;
   if (!defered) {
     return;
   }
   this.defered = null;
   this.dataCache = this.dataCache || [];
   if (this.dataCache.length === 0 && this.lastError) {
     defered.reject(this.lastError);
     return;
   }
   // Pass the first element of the buffer (or undefined when it is empty).
   defered.resolve(this.dataCache.shift());
 };

 /**
  * Returns a promise for the next element of the stream.
  *
  * Buffered elements are delivered first. With an empty buffer the promise is
  * rejected if an error was recorded, resolved with undefined if the stream
  * has ended, and otherwise stays pending until the next event arrives.
  * Calls made while a promise is still pending share that promise.
  *
  * @returns {Promise} Promise for the next element, or for undefined after
  *   the stream has ended.
  */
 TwitterStream.prototype.getNextData = function () {
   this.dataCache = this.dataCache || [];
   if (this.dataCache.length > 0) {
     return Promise.resolve(this.dataCache.shift());
   }
   if (this.lastError) {
     return Promise.reject(this.lastError);
   }
   if (this.finished) {
     return Promise.resolve(undefined);
   }
   // Nothing available yet: hand out the promise of a deferred that
   // tryToSendData() settles on the next event.
   this.defered = this.defered || defer();
   return this.defered.promise;
 };

用法可能如下所示:

 stream.receive().getNextData()
   .then(function processData(data) {
     if (data) {
       console.dir(data);
       // if data is available then continue requesting the data
       return stream.getNextData().then(processData);
     }
     // A falsy value ends the loop: nothing is returned, so the chain settles.
   })
   .catch((err) => console.error(err));

这是少数几种可以使用 deferred(延迟对象)的情况之一。

我想你可能想看看 scramjet——我在这个库里已经实现了 promise 化的 stream。

对于你的Twitter例子,这段代码应该可以正常工作

 // Open the 'statuses/filter' stream with the configured credentials and pipe
 // it into a scramjet DataStream.
 const stream = new Twitter({
   consumer_key: config.get('/twitter/consumerKey'),
   consumer_secret: config.get('/twitter/consumerSecret'),
   access_token_key: config.get('/twitter/accessTokenKey'),
   access_token_secret: config.get('/twitter/accessTokenSecret'),
 })
   .stream('statuses/filter', { track: config.get('/twitter/track') })
   .pipe(new scramjet.DataStream());

然后执行你喜欢的任何转换…例如,以某种方式映射stream,并在完成后将stream累积到一个数组中。

 stream
   .map(function (tweet) {
     // a Promise can be returned here
     return modifyTheTweetSomehow(tweet);
   })
   .accumulate(function (collected, item) {
     // Append each mapped item to the array passed as the second argument.
     collected.push(item);
   }, []); // this returns a Promise that will be resolved on stream end.

我希望你喜欢它。 🙂