如何在rxjs中使用节点的变换stream?

我一直在玩rxjs一段时间,我喜欢如何使用它的运算符逻辑,而不是命令式编程。 不过,我也喜欢节点的stream,这也是高度可组合的,所以我明显的反应是使用它们两个,但我没有看到它被提及了很多(实际上,我根本没有),除了绑定在rxjs的书 。

所以,我的问题确实是,我如何利用RxJS中的所有转换stream? 或者,甚至有可能?
例:-

 var fs = require('fs'); var csv = require('csv-parse')({delimiter:';'}); var src = fs.createReadStream('./myFile.csv'); src.pipe(csv).pipe(process.stdout); 

基本上,我想这样做: –

 var fs = require('fs'); var csv = require('csv-parse')({delimiter:';'}); var rx= require('rx-node'); var src = fs.createReadStream('./myFile.csv'); var obj = rx.fromReadableStream(src); obj.pipe(csb).map(x=>console.log(x)); 

过去我被告知使用highland ,但是我在这里严格地寻找rxjs解决scheme。

您不必使用rx-node但可以! 记住: All streams are event emitters!

准备: input.txt

 Hello World! Hello World! Hello World! Hello World! Hello World! 

跑:

 npm install through2 split2 rx rx-node 

而在index.js

 var Rx = require('rx'); Rx.Node = require('rx-node'); var fs = require('fs'); var th2 = require('through2'); var split2 = require('split2'); var file = fs.createReadStream('./input.txt').on('error', console.log.bind(console, 'fs err')); var transform = th2(function(ch, en, cb) { cb(null, ch.toString()); }).on('error', function(err) { console.log(err, err.toString()); }); // All streams are event emitters ! (one way without using rx-node) // var subs = Rx.Observable.fromEvent(transform, 'data').share(); // subs // .map(value => 'Begin line: ' + value) // .subscribe(value => console.log(value)); // rx-node has convenience functions (another way) var subs = Rx.Node.fromTransformStream(transform).share() .map(value => 'Begin line: ' + value) .subscribe(value => console.log(value)); file.pipe(split2()).pipe(transform); 

输出:

 Begin line: Hello World! Begin line: Hello World! Begin line: Hello World! Begin line: Hello World! Begin line: Hello World! 

EdinM给出了一个使用RxJS和节点转换stream的很好的一般例子,但是你原来的问题还没有得到解答。 由于前几天我有几乎相同的问题,所以我想尽力回答那些不熟悉RxJS和Node的人。 而不是使用csv-parse模块,我将使用csv-streamify 。 我们来设置基本结构:

test_data.csv:

 thing,name,owner,loc chair,sitty,billy,san fran table,setty,bryan,new oak 

跑:

 $ npm install rx rx-node csv-streamify 

index.js:

 "use strict"; const Rx = require('rx'); Rx.Node = require('rx-node'); const fs = require('fs'); const csv = require('csv-streamify'); //Setting up the transform-stream CSV parser let config = { delimiter: ',', // comma, semicolon, whatever newline: '\n', // newline character (use \r\n for CRLF files) quote: '"', // what's considered a quote empty: '', // empty fields are replaced by this //objectMode: true, //parses csv table into an array of objects //columns: true //uses column headers for the object fields }; let parseCsv = csv(config); //Setting up the RxJS Observer function onNext (x) { //do your side-effects here, after the data has //gone through the observables operator chain console.log('Next: ' + x); }; function onError (err) { console.log('Error: ' + err); }; function onComplete () { console.log('Completed'); }; let readStream = fs.createReadStream('test_files/test_data.csv'); readStream.pipe(parseCsv); let subscription = Rx.Node.fromTransformStream(parseCsv) //do something with the data with an operator such as: //.map() .subscribe(onNext, onError, onComplete); 

现在让我们运行代码:

 $ node index.js 

我们将得到这个输出:

 Next: ["thing","name","owner","loc\r"] Next: ["chair","sitty","billy","san fran\r"] Next: ["table","setty","bryan","new oak"] Completed 

结语

如果在csvconfiguration对象中将objectModecolumns设置为true,然后使用map运算符来投影这个sideEffect函数,如下所示:

 function sideEffect (v){ console.log(v) return v; }; let subscription = Rx.Node.fromTransformStream(parseCsv) .map(sideEffect) .subscribe(onNext, onError, onComplete); 

你会得到这个输出:

 { thing: 'chair', name: 'sitty', owner: 'billy', 'loc\r': 'san fran\r' } Next: [object Object] { thing: 'table', name: 'setty', owner: 'bryan', 'loc\r': 'new oak' } Next: [object Object] Completed