从两个来源rx.js追赶订阅

我需要结合追赶和订阅新的饲料。 所以,首先我要查询数据库中所有我错过的新logging,然后切换到所有正在进入的新logging的pub sub。

第一部分很容易做你的查询,也许在500批次,这将给你一个数组,你可以rx.observeFrom。

第二部分很简单,你只需要在pubsub上放一个rx.observe。

但我需要按顺序进行,所以在我开始播放新歌曲之前,我需要播放所有的旧唱片。

我想我可以开始订阅pubsub,把这些放在一个数组中,然后开始处理旧的数据,当我完成后,删除dups(或者因为我做了dup检查)允许less数dups,但是播放积累的logging,直到他们走了,然后一个一个出来。

我的问题是做这件事的最好方法是什么? 我应该创build一个订阅,开始build立一个数组中的新logging,然后开始处理旧的,然后在oldrecord过程的“然后”订阅另一个数组?

好吧,这是我迄今为止。 我需要build立testing,并完成一些psudo代码,以确定它是否工作,更不用说是一个很好的实现。 在我埋葬自己之前,随时可以阻止我。

var catchUpSubscription = function catchUpSubscription(startFrom) { EventEmitter.call(this); var subscription = this.getCurrentEventsSubscription(); // calling map to start subscription and catch in an array. // not sure if this is right var events = rx.Observable.fromEvent(subscription, 'event').map(x=> x); // getPastEvents gets batches of 500 iterates over and emits each // till no more are returned, then resolves a promise this.getPastEvents({count:500, start:startFrom}) .then(function(){ rx.Observable.fromArray(events).forEach(x=> emit('event', x)); }); }; 

我不知道这是最好的方法。 有什么想法吗?

谢谢

我会避免不必要地混合你的不同的asynchronous策略。 您可以使用concat将两个序列连接在一起:

 var catchUpSubscription = function catchUpSubscription(startFrom) { var subscription = this.getCurrentEventsSubscription(); return Rx.Observable.fromPromise(this.getPastEvents({count:500, start:startFrom})) .flatMap(x => x) .concat(Rx.Observable.fromEvent(subscription, 'event')); }; ///Sometime later catchUpSubscription(startTime).subscribe(x => /*handle event*/)