我怎样才能缓冲一个简单的Rxjs观察?

我想通过一些任意的(但是简单的)标准来缓冲一个观察值。 我在这里设置了一个简单的例子:

const observable = Rx.Observable.from([1,2,3]) const filtered = observable.filter((n) => n === 3); observable .buffer(filtered) .subscribe((n) => { // Why is this empty? console.log(n); }); 

宾在这里

试图filter做这只会产生一个空的数组。 我期待[1,2,3]的数组,但是这似乎不是如何工作的。 所有缓冲区的文档使用asynchronous事件,如计时器,但这不是我想要的。 我只想根据我自己决定的任意标准来取最后的n个项目。

帮助非常感谢!

您的订阅是在发送值之后发生的。 如果你想要它的工作:

 const observable = new Rx.Subject() const filtered = observable.filter((n) => n === 3); filtered .buffer(filtered) .subscribe((n) => { console.log(n); }); Rx.Observable.from([1,2,3]).subscribe(observable); 

根据olsn的评论,看起来这是Rxjs 5中的一个bug。更改为v4有效地解决了这个问题。