根据rxjs中的时间处理一系列事件

我有一个过程,每隔一段时间发送一个数据包,我需要根据数据包到达的时间来pipe理这个数据stream,等等。 在某些时候,我也closuresstream和过程。

现在,我使用一组定时器来做这件事,但是我希望我可以用rxjs因为它看起来非常适合这种事情。 到目前为止,我还没有成功。

问题

stream应该定期发送数据包,但通常会偏离很多,有时会卡住。

在下列情况下,我想在某一时刻closuresstream:

  1. 如果需要比startDelay更多的时间发送第一个数据包。
  2. 在发送第一个数据包之后,如果两个数据包之间存在多于middleDelay的暂停。
  3. 经过一个恒定的时间段maxChannelTime

由于上述任何一种原因,当我即将closures这个stream时,我首先要求它有礼貌地closures,这样可以进行一些清理工作。 有时它也会在清理过程中给我发送一个最后的数据包。 但是我想等待cleanupTime的清理和最后的数据到达之前,我closures了stream,忽略任何更多的消息。

我将通过用Observable包装事件来创build“stream”。 我没有这样做的麻烦。

通过“closures”stream,我的意思是告诉stream程停止发送数据,并可能closures(即死亡)。

棘手的问题。

我已经把它分为两个阶段 – “规定”(因为我们要定期检查)和“清理”。

反向工作,输出是

 const regulated = source.takeUntil(close) const cleanup = source.skipUntil(close).takeUntil(cleanupCloser) const output = regulated.merge(cleanup) 

“closures器”是在closures时发出的可观测量(每超时值一个更近)。

 const startTimeout = 600 const intervalTimeout = 200 const maxtimeTimeout = 3000 const cleanupTimeout = 300 const startCloser = Observable.timer(startTimeout) // emit once after initial delay .takeUntil(source) // cancel after source emits .mapTo('startTimeoutMarker') const intervalCloser = source.switchMap(x => // reset interval after each source emit Observable.timer(intervalTimeout) // emit once after intervalTimeout .mapTo('intervalTimeoutMarker') ) const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime .takeUntil(startCloser) // cancel if startTimeout .takeUntil(intervalCloser) // cancel if intervalTimeout .mapTo('maxtimeTimeoutMarker') const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1) const cleanupCloser = close.switchMap(x => // start when close emits Observable.timer(cleanupTimeout) // emit once after cleanup time ) .mapTo('cleanupTimeoutMarker') 

这里有一个工作示例CodePen (请一次运行一个testing)

如果不知道如何使用RxJS创build“stream”,或者稍后想要如何使用它们,就很难给出任何build议。

一般来说,只要使用takeUntil()switchMap()timeout()就可以达到你想要的效果。

 Observable.defer(...) .startWith(undefined) // Trigger the first `timeout` .switchMap((val, i) => { if (i === 0) { // waiting for the first value return Observable.of().timeout(startDelay); } else { return Observable.of(val).timeout(middleDelay); } }) .takeUntil(Observable.timer(maxChannelTime)); 

我不知道你的意思是什么“在某一点上closuresstream” 。 你期望errorcomplete通知? 这个解决scheme将在超时过期时发出error ,并在takeUntil发出时complete

最后,这就是我所做的。 我的回答主要是基于理查德·马森的回答,所以我将他的答案留给接受。

原来是我需要做的一些额外的改变。

此代码是接收数据消息stream的代码,并返回包含收集的所有数据和终止原因的单例观察值。

 let startCloser$ = Observable.timer(this.options.maxStartDelay).takeUntil(dataStream$).mapTo(TerminationReason.StartTimeout); let intervalCloser$ = dataStream$.switchMap(x => Observable.timer(this.options.timeBetweenPackets).mapTo(TerminationReason.Inactivity)); let maxTimeCloser$ = Observable.timer(this.options.totalConnectionTime).takeUntil(startCloser$).takeUntil(intervalCloser$).mapTo(TerminationReason.ChannelTimeout); //we need to publishReplay it so we can get the reason afterwards... let close$ = startCloser$.merge(intervalCloser$, maxTimeCloser$).take(1).publishReplay(1); //basically treating close$ like a promise close$.connect(); //cleanupAction has side-effects so it must only be subscribed to once. let cleanupAction$ = Observable.defer(async () => { //it's just a promise that yields nothing and waits until requestTermination has terminated //requestTermination is an async function and it already has a timeout thing in promise-language await this.requestTermination(); }); let result$ = dataStream$.takeUntil(close$).concat(dataStream$.takeUntil(cleanupAction$)).toArray().switchMap(arrs => { //switchMap will only resolve once because the observable is a singleton return close$.map(reason => { //this should fire immediately because close is publishReplay(1) and has already happened let totalArr = _.flattenDeep(arrs); return { reason : reason, data : totalArr } }) }); return result$;