如何在rxjs中使用节点的变换stream?
我一直在玩rxjs
一段时间,我喜欢如何使用它的运算符逻辑,而不是命令式编程。 不过,我也喜欢节点的stream,这也是高度可组合的,所以我明显的反应是使用它们两个,但我没有看到它被提及了很多(实际上,我根本没有),除了绑定在rxjs的书 。
所以,我的问题确实是,我如何利用RxJS中的所有转换stream? 或者,甚至有可能?
例:-
var fs = require('fs'); var csv = require('csv-parse')({delimiter:';'}); var src = fs.createReadStream('./myFile.csv'); src.pipe(csv).pipe(process.stdout);
基本上,我想这样做: –
var fs = require('fs'); var csv = require('csv-parse')({delimiter:';'}); var rx= require('rx-node'); var src = fs.createReadStream('./myFile.csv'); var obj = rx.fromReadableStream(src); obj.pipe(csb).map(x=>console.log(x));
过去我被告知使用highland
,但是我在这里严格地寻找rxjs
解决scheme。
您不必使用rx-node
但可以! 记住: All streams are event emitters!
。
准备: input.txt
Hello World! Hello World! Hello World! Hello World! Hello World!
跑:
npm install through2 split2 rx rx-node
而在index.js
:
var Rx = require('rx'); Rx.Node = require('rx-node'); var fs = require('fs'); var th2 = require('through2'); var split2 = require('split2'); var file = fs.createReadStream('./input.txt').on('error', console.log.bind(console, 'fs err')); var transform = th2(function(ch, en, cb) { cb(null, ch.toString()); }).on('error', function(err) { console.log(err, err.toString()); }); // All streams are event emitters ! (one way without using rx-node) // var subs = Rx.Observable.fromEvent(transform, 'data').share(); // subs // .map(value => 'Begin line: ' + value) // .subscribe(value => console.log(value)); // rx-node has convenience functions (another way) var subs = Rx.Node.fromTransformStream(transform).share() .map(value => 'Begin line: ' + value) .subscribe(value => console.log(value)); file.pipe(split2()).pipe(transform);
输出:
Begin line: Hello World! Begin line: Hello World! Begin line: Hello World! Begin line: Hello World! Begin line: Hello World!
EdinM给出了一个使用RxJS和节点转换stream的很好的一般例子,但是你原来的问题还没有得到解答。 由于前几天我有几乎相同的问题,所以我想尽力回答那些不熟悉RxJS和Node的人。 而不是使用csv-parse
模块,我将使用csv-streamify
。 我们来设置基本结构:
test_data.csv:
thing,name,owner,loc chair,sitty,billy,san fran table,setty,bryan,new oak
跑:
$ npm install rx rx-node csv-streamify
index.js:
"use strict"; const Rx = require('rx'); Rx.Node = require('rx-node'); const fs = require('fs'); const csv = require('csv-streamify'); //Setting up the transform-stream CSV parser let config = { delimiter: ',', // comma, semicolon, whatever newline: '\n', // newline character (use \r\n for CRLF files) quote: '"', // what's considered a quote empty: '', // empty fields are replaced by this //objectMode: true, //parses csv table into an array of objects //columns: true //uses column headers for the object fields }; let parseCsv = csv(config); //Setting up the RxJS Observer function onNext (x) { //do your side-effects here, after the data has //gone through the observables operator chain console.log('Next: ' + x); }; function onError (err) { console.log('Error: ' + err); }; function onComplete () { console.log('Completed'); }; let readStream = fs.createReadStream('test_files/test_data.csv'); readStream.pipe(parseCsv); let subscription = Rx.Node.fromTransformStream(parseCsv) //do something with the data with an operator such as: //.map() .subscribe(onNext, onError, onComplete);
现在让我们运行代码:
$ node index.js
我们将得到这个输出:
Next: ["thing","name","owner","loc\r"] Next: ["chair","sitty","billy","san fran\r"] Next: ["table","setty","bryan","new oak"] Completed
结语
如果在csvconfiguration对象中将objectMode
和columns
设置为true,然后使用map运算符来投影这个sideEffect
函数,如下所示:
function sideEffect (v){ console.log(v) return v; }; let subscription = Rx.Node.fromTransformStream(parseCsv) .map(sideEffect) .subscribe(onNext, onError, onComplete);
你会得到这个输出:
{ thing: 'chair', name: 'sitty', owner: 'billy', 'loc\r': 'san fran\r' } Next: [object Object] { thing: 'table', name: 'setty', owner: 'bryan', 'loc\r': 'new oak' } Next: [object Object] Completed