什么是“反应”的方式来逐行读取文件

我正在学习使用RxJS的反应式编程,遇到需要逐行读取文件的情况。 其实我解决它使用一个解决scheme喜欢:

https://gist.github.com/yvele/447555b1c5060952a279

它的工作原理,但我需要使用一些正常的JS代码将缓冲区的stream转换为线的stream。 (使用上面例子中的“readline”模块)

我想知道是否还有其他的方法可以将Observable of Buffer转换为Observable的行,使用RxJS操作符,喜欢下面的例子。

var Rx = require('rx'); var fs = require('fs'); var lines = Rx.Observable .fromEvent(rl, 'data') // emits buffers overtime // some transforms ... .subscribe( (line) => console.log(line), // emit string line by line err => console.log("Error: %s", err), () => console.log("Completed") ); 

你可以用scanconcatMap实现你想要的concatMap

就像是:

 bufferSource .concat(Rx.Observable.of("\n")) // parens was missing // to make sure we don't miss the last line! .scan(({ buffer }, b) => { const splitted = buffer.concat(b).split("\n"); const rest = splitted.pop(); return { buffer: rest, items: splitted }; }, { buffer: "", items: [] }) // Each item here is a pair { buffer: string, items: string[] } // such that buffer contains the remaining input text that has no newline // and items contains the lines that have been produced by the last buffer .concatMap(({ items }) => items) // we flatten this into a sequence of items (strings) .subscribe( item => console.log(item), err => console.log(err), () => console.log("Done with this buffer source"), );