听stream言承诺

我一直在阅读有关Observables 。 有一件事我读到,就是它能够像stream一样听。 我试着用下面的代码。

 const Rx = require('rxjs/Rx'); Promise = require('bluebird'); let i = 0; function calculate() { return new Promise((resolve, reject) => { setInterval(() => { console.log(++i); resolve(i); }, 5000); }) } let source = Rx.Observable.fromPromise(calculate()); source.subscribe( next => { console.log(next, ' next '); }, err => { console.error(err) }, () => { console.log('done!'); } ) 

但它不像一个stream。 所以输出看起来像这样:

 1 1 ' next ' done! 2 3 

我如何继续听取承诺? 这是Observable真正解决的问题之一吗?

Observable.fromPromise被认为是将Promise转换为Observable。 由于承诺只能解决一次,这个stream将始终只输出1项。

您不能通过在其上包装stream来增强承诺function。 如果承诺只能产生1个值,则只有1个值会在结果的可观察值中产生。

在你的具体例子中,有Observable.interval(n)n毫秒发射一个项目。 否则,为了更自定义的可观察性:

 Rx.Observable.create(obs => { let i=0; setInterval(() => { obs.next(++i); }, 1000); }); 

当你创build一个observable时,你会得到一个Observer实例,它代表刚刚订阅了你的stream的观察者。 注意每个新的观察者都会创build一个新的上下文(在我的例子中,每个观察者都有他的计数器)。

你可以使用switchMap和interval:

 let i = 0; function calculate() { return Promise.resolve(++i) } //I reduce to the strictly necessary for the answer, the promise's resolve. //It's possible to have more things let source = Rx.Observable .interval(1000) .switchMap(() => calculate()) source.subscribe( next => { console.log(next, ' next '); }, err => { console.error(err) }, () => { console.log('done!'); } ) 

你可以在这里看到switchMap和interval的文档:

演示: https : //jsbin.com/nutukiyagi/1/edit?js,console

在这种情况下,你永远不会完成stream,console.log('done!')永远不会出现在控制台中

从文档Rx.Observable.fromPromise

如果Promise使用值进行parsing,则输出Observable会将parsing后的值作为下一个值发送,然后完成。

这描述了你正在面对的行为。 您应该使用生成器函数或Rx.Observable.create来解决此特定问题。