将节点stream转换为Rx.js Observables
我努力将节点stream转换为Rxjs Observables。
当我尝试一个URL时,stream式传输本身就很好用。但是,当我尝试通过一系列URLS映射相同的函数时,我得到错误。
我正在使用Rx.Node将stream转换为Observable。
这是我目前正在尝试
// data_array is an array of 10 urls that I'm scraping data from. let parentStream = Rx.Observable.from(data_array); parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); function createStream(url){ return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: 'a@href'}]).write().pipe(JSONStream.parse('*'))) }
但是这是输出 X 10(data_array中的URL数量)
RefCountObservable { source: ConnectableObservable { source: AnonymousObservable { source: undefined, __subscribe: [Function] }, _connection: null, _source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] }, _subject: Subject { isDisposed: false, isStopped: false, observers: [], hasError: false } }, _count: 0, _connectableSubscription: null }
我首先想到flatMap可以工作,因为它在可观察的事物中展开可观察的…但是当我尝试flatMap时,我得到这个:
Complete Error TypeError: unknown type returned
但是,如果我这样做:
这适用于1个url ,但我无法捕获一个stream中的data_array中的所有url。
let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: 'a@href'}]).write().pipe(JSONStream.parse('*'))) stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'))
我觉得我误解了一些东西,这不仅仅是因为它的清除不适用于多个URL,而且即使它在第二个例子中工作也是如此……我在所有数据进入之前首先得到“完成”
显然,我误解了一些东西。 任何帮助将是美好的。 谢谢。
* UPDATE *
我尝试了一个不同的path,它工作,但不使用节点stream。 节点stream将是理想的,所以仍然想要使上面的例子工作。
接下来我使用的方法是围绕我的网页抓取function,这是刮下面的承诺。 这个工作,但结果是十个巨大的arrays,每个数组中的每个URL的所有数据。 我真正想要的是一个对象stream,我可以在数据对象通过时组成一系列转换。
这里是不同的,但工作方式:
let parentStream = Rx.Observable.from(data_array); parentStream.map(url => { return Rx.Observable.defer(() => { return scrape(url, '#centercol ul li', [{name: 'a', link: 'a@href'}]); }) }) .concatAll() .subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); function scrape(url, selector, scope) { return new Promise( (resolve, reject) => x( url, selector, scope )((error, result) => error != null ? reject(error) : resolve(result)) ); }
*解决scheme*我想通了。 我附上了下面的解决scheme:
而是使用RxNode,我select使用Rx.Observable.fromEvent()。
节点stream发射事件,无论是新数据,错误还是完整。
因此, fromEvent静态运算符正在侦听“data”事件,并为每个事件创build一个新的Observable。
然后我合并所有这些,并订阅。 代码如下:
let parentStream = Rx.Observable.from(data_array); parentStream.map((url)=> { return createEventStream(url); } ).mergeAll().subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); function createEventStream(url){ return Rx.Observable.fromEvent(x(url, '#centercol ul li', [{name: 'a', link: 'a@href'}]).write().pipe(JSONStream.parse('*')), 'data'); }
- 使用webpack-manifest-plugin得到错误'Uncaught SyntaxError:Unexpected token <'
- 使用Node / Express从Firebase中读取数据