无法从连接的Observable提取/解包值

我有这个链应该将10个Observable链接成1个Observable,其中10个Observable中的每一个基本上都应该被解开为一个整数:

const Rx = require('rxjs'); var i = 0; const obs = Rx.Observable.interval(10) .map(() => i++) .map(val => Rx.Observable.create(obs => { obs.next(val) })) .take(10) .reduce((prev, curr) => { return prev.concat(curr); // concat all observables }) .last(val => val.flatMap(inner => inner)); // subscribe to Observable obs.subscribe(v => { console.log('\n next (and only) result => \n', v); }); 

发生的事情是所有的10个观测值应该被连接在一起,但是我不能从这10个观测值中提取这些值(已经变成了1个可观测值)。 所以我的问题是,我怎么能解开最后的观察和提取的价值?

任何人都知道我在说什么?

这个问题具有确切的问题,因为在Observable.prototype.concatAll似乎没有产生预期的结果

 .map(function(val){ // note: map *not* flatMap return Rx.Observable.create(obs => { obs.next(val) }); }) 

当使用Rx.Observable.create创build自己的observable时,您需要自己.complete() Rx.Observable.create .complete() 。 因为你忘了这么做,像.reduce()这样的操作符将无法工作,因为它们需要等待完成才能运行。

此外,您使用.last()运算符是不正确的; 它需要一个谓词,在这个谓词上你的stream将被过滤,最后一个与谓词匹配的发射将被发射。 这也有点多余,因为你的.reduce()只会发出一个值。 清理它将导致:

 const obs = Rx.Observable.interval(10) .map(() => i++) .map(val => Rx.Observable.of(val)) .take(10) .reduce((acc, curr) => acc.concat(curr)) .flatMap(v => v) .toArray() 

但是可以通过直接使用.concatMap()运算符而不是map + take + reduce + flatMap来缩短这一点:

 const obs = Rx.Observable.interval(10) .map(() => i++) .concatMap(val => Rx.Observable.of(val)) .take(10) .toArray() 

您可以使用concatAll(或concatMap)连接内部序列并保留顺序,而不是使用reduce + last。

 Observable .interval(0) .map(i => Observable.of(i)) .take(10) .concatAll() .takeLast(1) .subscribe(v => console.log('\n next (and only) result => \n', v);) 

编辑: takeLast(1)而不是finalValue()

只要使用非原型方法。 它不那么混乱。

 let range = [...Array(10).keys()]; //array of 10 observables of 10 values each let ten = range.map(i => Rx.Observable.interval(1000).map(_ => i).take(10)); let sequence = Rx.Observable.concat(ten) // all of them in sequence