使用RxJS5设置费率

我有这个代码只读取.csv文件中的数据并将其转换为json并logging数据:

const fs = require('fs'); const path = require('path'); const sd = path.resolve(__dirname + '/fixtures/SampleData.csv'); const strm = fs.createReadStream(sd).setEncoding('utf8'); const Rx = require('rxjs/Rx'); const csv2json = require('csv2json'); const dest = strm .pipe(csv2json({ separator: ',' })); dest.on('error', function(e){ console.error(e.stack || e); }) const obs = Rx.Observable.fromEvent(dest, 'data') .flatMap(d => Rx.Observable.timer(100).mapTo(d)) obs.subscribe(v => { console.log(String(v)); }) 

代码正在做的是在100毫秒延迟后logging所有的数据。 我实际上是想延迟每一行数据,并在一小段延迟后logging每一行。

上面的代码没有达到 – 控制数据logging速度的最好方法是什么?

假设:所有的数据线几乎同时出现,所有的数据都延迟100毫秒,所以最终几乎同时打印。 我只需要开始延迟前一个logging后的下一行。

下面的代码似乎做了使用上面的计时器相同的事情:

 const obs = Rx.Observable.fromEvent(dest, 'data') .delay(100) 

假设:所有的数据线几乎同时出现,所有的数据都延迟100毫秒,所以最终几乎同时打印。 我只需要开始延迟前一个logging后的下一行。

你的假设是正确的

使用.flatMap()将原始解决scheme中的.concatMap()

 Rx.Observable.from([1,2,3,4]) .mergeMap(i => Rx.Observable.timer(500).mapTo(i)) .subscribe(val => console.log('mergeMap value: ' + val)); Rx.Observable.from([1,2,3,4]) .concatMap(i => Rx.Observable.timer(500).mapTo(i)) .subscribe(val => console.log('concatMap value: ' + val)); 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script> 

我无法在RxJS库中find我需要的function(虽然可能在那里,但我找不到它,让我知道是否有更好的,更习惯的方法)。

所以我写了这个,似乎做这个工作:

 const fs = require('fs'); const path = require('path'); const sd = path.resolve(__dirname + '/fixtures/SampleData.csv'); const strm = fs.createReadStream(sd).setEncoding('utf8'); const Rx = require('rxjs/Rx'); const csv2json = require('csv2json'); const p = Rx.Observable.prototype; p.eachWait = function(timeout){ const source = this; const values = []; let flipped = true; const onNext = function (sub){ flipped = false; setTimeout(() => { var c = values.pop(); if(c) sub.next(c); if(values.length > 0){ onNext(sub); } else{ flipped = true; } }, timeout); } return Rx.Observable.create(sub => { return source.subscribe( function next(v){ values.unshift(v); if(flipped){ onNext(sub); } }, sub.error.bind(sub), sub.complete.bind(sub) ); }); } const dest = strm .pipe(csv2json({ separator: ',' })); dest.on('error', function(e){ console.error(e.stack || e); }); const obs = Rx.Observable.fromEvent(dest, 'data') .eachWait(1000) obs.subscribe(v => { console.log(String(v)); }); 

我认为这是一样的性能,你可以做到 – 只有一个计时器应该在任何时候运行。