Tag: rxjs

RxJS – 使用zip并行化使HTTP请求不工作的延迟Observable – 您提供了一个无效的对象,其中stream是预期的

我正在运行一个Node.js API端点,它必须在它的一个端点中对几个HTTP调用进行并列。 以星球大战API为例。 我通过id获取用户信息。 它包含一系列的电影,我想并行检索所有这些电影信息。 得到人的信息后,我创build了一个观察数组,每个人得到一个电影。 我build立了一个zip操作员来组成所有得到的电影的结果。 关键是,在订阅后,我看到n次,每个电影一个, console.log(filmData)信息,这是正确的。 但它似乎与console.log("*********************************************************************"); 不叫。 下一个,错误,完整的callback都不是。 为什么会这样? client.get("https://swapi.co/api/people/"+uid, args, (data, response) => { var filmsIt = data.films; for(var i in filmsIt){ var observable = Rx.Observable.defer(function () { client.get(filmsIt[i], args, (filmData, filmResponse) => { console.log(filmData); return filmData; }); }); observables.push(observable); } var observableFinal = Rx.Observable.zip(…observables, function() { console.log("*********************************************************************"); }).subscribe( function […]

节点本地mongodb驱动程序连接池问题

我使用Node.js + RxJS + MongoDB作为socket.io服务器。 经过一定数量的请求后,我的连接池到DB变得非常大。 所以文件描述符永远不会被释放,服务器closures。 对于db查询我使用下面的代码: /* @flow */ var { Observable } = require('rx'); var client = require('mongodb'); var { assign } = require('lodash'); var __DEV__ = process.env.NODE_ENV !== 'production'; var URL = 'my database url'; class QueryBuilder { _db$: Observable; _selectors: Object; constructor(db$: Observable, selectors?: Object) { this._db$ = db$; if […]

什么是“反应”的方式来逐行读取文件

我正在学习使用RxJS的反应式编程,遇到需要逐行读取文件的情况。 其实我解决它使用一个解决scheme喜欢: https://gist.github.com/yvele/447555b1c5060952a279 它的工作原理,但我需要使用一些正常的JS代码将缓冲区的stream转换为线的stream。 (使用上面例子中的“readline”模块) 我想知道是否还有其他的方法可以将Observable of Buffer转换为Observable的行,使用RxJS操作符,喜欢下面的例子。 var Rx = require('rx'); var fs = require('fs'); var lines = Rx.Observable .fromEvent(rl, 'data') // emits buffers overtime // some transforms … .subscribe( (line) => console.log(line), // emit string line by line err => console.log("Error: %s", err), () => console.log("Completed") );

在Node.js中将三个不同的函数映射到Observable

我是Rxjs的新手。 如果可能,我想遵循最佳做法。 我正在执行三个不同的函数在可观察的返回相同的数据。 在“数据stream”概念之后,我一直认为我需要将这个可观察对象分成三个stream,并继续进行。 这是我的代码,所以我可以停止抽象地说话: // NotEmptyResponse splits the stream in 2 to account based on whether I get an empty observable back. let base_subscription = RxNode.fromStream(siteStream).partition(NotEmptyResponse); // Success Stream to perform further actions upon. let successStream = base_subscription[0]; // The Empty stream for error reporting let failureStream = base_subscription[1]; //Code works up until this […]

将节点stream转换为Rx.js Observables

我努力将节点stream转换为Rxjs Observables。 当我尝试一个URL时,stream式传输本身就很好用。但是,当我尝试通过一系列URLS映射相同的函数时,我得到错误。 我正在使用Rx.Node将stream转换为Observable。 这是我目前正在尝试 // data_array is an array of 10 urls that I'm scraping data from. let parentStream = Rx.Observable.from(data_array); parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); function createStream(url){ return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: 'a@href'}]).write().pipe(JSONStream.parse('*'))) } 但是这是输出 X 10(data_array中的URL数量) RefCountObservable { source: ConnectableObservable { source: AnonymousObservable { source: undefined, __subscribe: […]

RxJS去嵌套callback

我想做一些事情: Rx.Observable.of(userToken) .flatMap(verifyToken) .flatMap(getUserInformation) .flatMap(createUser) .flatMap(signNewToken) .subcribe({ next: result => useResult(result), error: error => handleError(error) }) 我试图避免的是一个callback混乱。 在我的代码中,像verifyToken这样的verifyToken是Observables,我想链接它们。 这种模式是否正确? 因为现在只要其中一个内部做observer.error(new Error('problem')) ,链崩溃,我的error handling程序不会被调用。 我怎样才能改善这个?

RxJS – 等待5个事件发生,然后再继续

我有一个简单的RxJS5观察像这样: function foo(){ return Rx.Observable.create(obs => { obs.next(); }); } 可观察的是这样创build的: const x = foo(); function bar(){ return someObs() .flatMap(() => x.wait(5)) // wait for x to fire 5 times .map(v => ({z:v})); } 我想要做的就是等待X个可观察对象中的5个事件发生,然后再继续。 我不认为这个运营商是我想要的,所以我只是叫运营商“ wait ” 我怎样才能用RxJS5做到这一点?

RxJS具有asynchronous订户function的Observable

我试图做一些感觉应该是直截了当的事情,但certificate令人惊讶的困难。 我有一个订阅RabbitMQ队列的函数。 具体来说,这里是Channel.consume函数: http ://www.squaremobius.net/amqp.node/channel_api.html#channel_consume 它返回一个承诺,这个承诺用一个订阅ID来解决 – 稍后需要取消订阅 – 并且还有一个callback参数来在邮件从队列中取出时调用。 当我想取消订阅队列时,我需要在这里使用Channel.cancel函数取消使用者: http ://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel。 这需要先前返回的订阅ID。 我想把所有这些东西包装在观察者订阅时订阅队列的Observable中,并且当observable退订时取消订阅。 然而,由于调用的“双重asynchronous”性质(我的意思是说,它们同时具有callback和返回承诺),这样做有些困难。 理想情况下,我想写的代码是: return new Rx.Observable(async (subscriber) => { var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message)); return async () => { await channel.cancel(consumeResult.consumerTag); }; }); 然而,这是不可能的,因为这个构造函数不支持asynchronous订阅函数或者拆卸逻辑。 我一直无法找出这一个。 我在这里错过了什么? 为什么这么难? 干杯,Alex

如何/何时从Observable.create返回的函数执行(rxjs)

我从' https://chrisnoring.gitbooks.io/rxjs-5-ultimate/content/observable-anatomy.html '有下面的代码: const Observable = require('rxjs/Observable').Observable; require('rxjs/add/observable/of'); require('rxjs/add/operator/map'); let stream = Observable.create((observer) => { let i = 0; let id = setInterval(() => { observer.next(i++); }, 500); return function () { // Line 11 clearInterval(id); }; }) let subscription = stream.subscribe((value) => { console.log('Value: ', value); }) setTimeout(() => { subscription.unsubscribe(); }, 1500); 这个程序的输出如下。 […]

NodeJS – 响应stream

我使用Sails.js构build了一个使用NodeJS的简单API端点。 当有人访问我的API端点时,服务器开始等待数据,每当出现新数据时,他都使用套接字广播它。 每个客户应该根据他的用户input接收他自己的数据stream。 var Cap = require('cap').Cap; collect: function (req, res) { var iface = req.param("ip"); var c = new Cap(), device = Cap.findDevice(ip); c.on('data', function(myData) { sails.sockets.blast('message', {"host": myData}); }); }); 响应没有完成(我从来没有发送res.json() – 实际上发生的事情是浏览器不断加载 – 但上述function)。 2问题: 我正尝试从我的客户端(使用RxJS)订阅和取消订阅此API端点。 当我订阅时,我开始通过套接字接收数据,但我不能退订API端点(浏览器期望完成请求)。 每个客户端都应该根据请求IP参数订阅他自己的套接字空间(请参阅更新后的代码)。 目前它把这个信息传达给每个人。 如何用Sails.js创build一个类似Stream / Service的API端点,并根据他的input向每个用户发送新的数据? 我的目标是能够从每个客户端订阅/取消订阅此API端点。