rx.js如何链接observables

我有一个可观察的事件,即将事件从服务器上拉下来,过滤应用程序types的事件,然后订阅和分派事件给一个或多个处理程序来处理。

处理程序然后离开,并做一些asynchronous更新到数据库,我发现可观察到将事件发展如此之快,更新互相踩。 我应该预料到的

所以我想我需要我的处理者,每个人都使用它自己的observable作为一个队列来处理一个事件并等待一个确认。

所以我的问题是,如何创build一个可持续接收消息的observable,并在发送下一条消息之前一次发送一条消息,等待确认。

另外,观察者需要冷。 我想,因为我不能放松信息。

谢谢,

RAIF

我想运算符concatMap做了一些你正在寻找的东西。 你可以在这里查看一个前面的答案来说明concatMap一个类似的用例: RxJS依赖于排队的任务

由于没有等待ACK信号释放下一个值,所以它接近但并不完全符合你的要求。 相反, concatMap使用当前“执行”的observable的完成信号来订阅下一个。 如果你的observable包含某个数据库更新执行的地方,那么这些更新将按顺序执行。 例如:

 function handler (source$) { // source$ is your source of events from which you generate the update calls return source$.concatMap(function (event){ return updateDB(event); }) } function updateDB(event) { return Rx.Observable.create(function(observer){ // do the update in the db // you probably have a success and error handler // you plug the observer notification into those handlers if (success) { // if you need to pass down some value from the update observer.onNext(someValue); // In any case, signal completion to allow concatMap to move to next update observer.onCompleted(); } if (error) {observer.onError(error);} }) } 

这是专门针对您正在使用的库的通用代码。 根据数据库更新函数的API,您可能可以直接使用来自fromNodeCallbackfromCallback的操作符。

同样,请注意,在执行当前执行时,可能会有一些缓冲操作来保持下一个可观察状态,并且缓冲区只能是有限的,因此,如果生产者和消费者之间的速度存在显着差异,或内存限制,你可能想要以不同的方式处理事情。

另外,如果你使用RxJS v5, onError变成erroronComplete变成completeonNext变成next (比较新的观察者接口 )。

最后一点,stream的有损/无损本质是一个不同于stream的冷热性质的概念。 你可以看看这两种types的stream的插图订阅和数据stream。