如何创build一个RxJS缓冲区,将NodeJS中的元素进行分组,但不依赖于永远运行的时间间隔?

我在Rx.Observable.fromEvent中使用Rx.Observable.fromEvent捕获应用程序中的事件。 这些被发送到另一个服务器使用请求( https://www.npmjs.com/package/request )。 为了避免高networking负载,我需要在发送请求之间的给定超时时间内caching这些事件。

问题

使用bufferWithTime(200)将使节点进程保持运行,我不知道应用程序何时完成closuresstream。

有什么办法可以使用Rx缓冲区来说:

  1. 当元素1被按下时设定一个计时器
  2. 当元素2和3在定时器到期之前到达时,将它们推到数组[1,2,3](缓冲区)
  3. 计时器到期时,向pipe道发送[1,2,3]arrays。
  4. 如果元素4在计时器到期之后到来,那么设置一个新的计时器并重新开始。

如果没有元件被推动,则不启动将使过程退出的计时器。

我最初的做法是:

 Rx.Observable .fromEvent(eventEmitter, 'log') .bufferWithTime(200) // this is the issue .map(addEventsToRequestOption) .map(request) .flatMap(Promise.resolve) .subscribe(log('Response received')) 

提议的实现,使用delay运算符:

 function emits(who){ return function (x) { console.log([who, "emits"].join(" ") + " " + x + " click(s)");}; } var source = Rx.Observable.fromEvent(document.body, 'click'); console.log("running"); var delayedSource$ = source.delay(1200); var buffered$ = source .buffer(function () { return delayedSource$;}).map(function(clickBuffer){return clickBuffer.length;}) buffered$.subscribe(emits("buffer")); 

jsbin在这里: http ://jsbin.com/wilurivehu/edit?html,js,console,output

您可能需要拆分stream,并使用第二部分触发第一个。

 var source = Rx.Observable.fromEvent(eventEmitter, 'log'); var closer = source.flatMapFirst(Rx.Observable.timer(2000)); source .buffer(closer) .map(addEventsToRequestOption) .flatMap(function(x) { Promise.resolve(request(x)); }) //I assume this log method returns a function? .subscribe(log('Response received')); 

source.flatMapFirst(Rx.Observable.timer(2000))是这里重要的一行。 它创build一个Observable,生成一个在2000 ms后触发的定时器。 当第一个事件进来时,它将启动计时器。 只要定时器正在运行, flatMapFirst将忽略后续事件。 当定时器最终发出时,将触发缓冲区发出当前的缓冲区并重新开始。

请参阅边界可观察的buffer 文档