有没有办法与RxJSpipe理并发?

TL; DR – 我正在寻找一种方法来控制在使用RxJS的同时连接到REST API的HTTP请求的数量。

我的Node.js应用程序将向第三方提供商进行几千个REST API调用。 但是,我知道,如果我立即提出所有这些请求,则由于DDoS攻击,服务可能会closures或拒绝我的请求。 所以,我想在任何给定的时间设置最大并发连接数。 我曾经通过利用Throat Package来实现Promises的并发控制,但是我还没有find类似的方法来实现它。

我试图使用merge与本文中build议的并发1 如何限制flatMap的并发性? ,但所有的请求都是一次发送的。

这是我的代码:

 var Rx = require('rx'), rp = require('request-promise'); var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3' ]; var source = Rx.Observable.fromArray(array).map(httpGet).merge(1); function httpGet(url) { return rp.get(url); } var results = []; var subscription = source.subscribe( function (x) { console.log('=====', x, '======'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); 

Rx.Observable.fromPromise在你的情况下可能是有用的。 扩充cartant的答案,试试这个,其中concurrent被指定为1

 Rx.Observable.from(array) .mergeMap(url => Rx.Observable.fromPromise(rp.get(url)), 1) .subscribe(x => console.log(x)) 

对于基于时间的控制,这是我能想到的:

 Rx.Observable.from(array) .bufferCount(2) .zip(Rx.Observable.timer(0, 1000), x => x) .mergeMap(x => Rx.Observable.from(x) .mergeMap(url => Rx.Observable.fromPromise(rp.get(url))) .subscribe(x => console.log(x)) 

您可以使用mergeMap操作符来执行HTTP请求,并将响应平铺到组合的observable中。 mergeMap了一个可选的concurrent参数,你可以指定并发订阅观察数的最大值(即HTTP请求):

 let source = Rx.Observable .fromArray(array) .mergeMap(httpGet, 1); 

请注意, concurrent指定为1 concatMap相当于concatMap

您的问题中的代码一次发送所有请求的原因是在map运算符中调用了您的httpGet函数。 httpGet返回一个Promise,Promise不是懒惰的 – 只要httpGet被调用,请求就会被发送。

使用上面的代码,只有在less于指定的并发请求数的情况下,才会在mergeMap实现中调用mergeMap

上面的代码将分别从组成的observable发出每个响应。 如果您希望将所有响应合并到一个数组中,当所有请求都完成时,您可以使用toArray运算符:

 let source = Rx.Observable .fromArray(array) .mergeMap(httpGet, 1) .toArray(); 

你也应该看看Martin在评论中提到的食谱。

感谢上面的回复。 我的问题与使用rx而不是rxjs NPM模块有关。 在我卸载了rx并安装了rxjs之后,所有的例子都开始使用并发性了。 所以,具有Promises,Callbacks和Native Observables的http并发调用工作正常。

我在这里张贴他们,以防万一遇到类似的问题,可以排除故障。

基于HTTP请求callback的示例:

 var Rx = require('rxjs'), request = require('request'), request_rx = Rx.Observable.bindCallback(request.get); var array = [ 'https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3' ]; var source = Rx.Observable.from(array).mergeMap(httpGet, 1); function httpGet(url) { return request_rx(url); } var subscription = source.subscribe( function (x, body) { console.log('=====', x[1].body, '======'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); 

承诺样本:

 var Rx = require('rxjs'), rp = require('request-promise'); var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3' ]; var source = Rx.Observable.from(array).mergeMap(httpGet, 1); function httpGet(url) { return rp.get(url); } var results = []; var subscription = source.subscribe( function (x) { console.log('=====', x, '======'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); }); 

本地RxJS示例:

 var Rx = require('rxjs'), superagent = require('superagent'), Observable = require('rxjs').Observable; var array = [ 'https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/10', 'https://httpbin.org/delay/2', 'https://httpbin.org/delay/2', 'https://httpbin.org/delay/1', ]; let start = (new Date()).getTime(); var source = Rx.Observable.from(array) .mergeMap(httpGet, null, 1) .timestamp() .map(stamp => [stamp.timestamp - start, stamp.value]); function httpGet(apiUrl) { return Observable.create((observer) => { superagent .get(apiUrl) .end((err, res) => { if (err) { return observer.onError(err); } let data, inspiration; data = JSON.parse(res.text); inspiration = data; observer.next(inspiration); observer.complete(); }); }); } var subscription = source.subscribe( function (x) { console.log('=====', x, '======'); });