有没有办法与RxJSpipe理并发?
TL; DR – 我正在寻找一种方法来控制在使用RxJS的同时连接到REST API的HTTP请求的数量。
我的Node.js应用程序将向第三方提供商进行几千个REST API调用。 但是,我知道,如果我立即提出所有这些请求,则由于DDoS攻击,服务可能会closures或拒绝我的请求。 所以,我想在任何给定的时间设置最大并发连接数。 我曾经通过利用Throat Package来实现Promises的并发控制,但是我还没有find类似的方法来实现它。
我试图使用merge
与本文中build议的并发1 如何限制flatMap的并发性? ,但所有的请求都是一次发送的。
这是我的代码:
var Rx = require('rx'), rp = require('request-promise'); var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3' ]; var source = Rx.Observable.fromArray(array).map(httpGet).merge(1); function httpGet(url) { return rp.get(url); } var results = []; var subscription = source.subscribe( function (x) { console.log('=====', x, '======'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
Rx.Observable.fromPromise
在你的情况下可能是有用的。 扩充cartant的答案,试试这个,其中concurrent
被指定为1
:
Rx.Observable.from(array) .mergeMap(url => Rx.Observable.fromPromise(rp.get(url)), 1) .subscribe(x => console.log(x))
对于基于时间的控制,这是我能想到的:
Rx.Observable.from(array) .bufferCount(2) .zip(Rx.Observable.timer(0, 1000), x => x) .mergeMap(x => Rx.Observable.from(x) .mergeMap(url => Rx.Observable.fromPromise(rp.get(url))) .subscribe(x => console.log(x))
您可以使用mergeMap
操作符来执行HTTP请求,并将响应平铺到组合的observable中。 mergeMap
了一个可选的concurrent
参数,你可以指定并发订阅观察数的最大值(即HTTP请求):
let source = Rx.Observable .fromArray(array) .mergeMap(httpGet, 1);
请注意, concurrent
指定为1
concatMap
相当于concatMap
。
您的问题中的代码一次发送所有请求的原因是在map
运算符中调用了您的httpGet
函数。 httpGet
返回一个Promise,Promise不是懒惰的 – 只要httpGet
被调用,请求就会被发送。
使用上面的代码,只有在less于指定的并发请求数的情况下,才会在mergeMap
实现中调用mergeMap
。
上面的代码将分别从组成的observable发出每个响应。 如果您希望将所有响应合并到一个数组中,当所有请求都完成时,您可以使用toArray
运算符:
let source = Rx.Observable .fromArray(array) .mergeMap(httpGet, 1) .toArray();
你也应该看看Martin在评论中提到的食谱。
感谢上面的回复。 我的问题与使用rx而不是rxjs NPM模块有关。 在我卸载了rx并安装了rxjs之后,所有的例子都开始使用并发性了。 所以,具有Promises,Callbacks和Native Observables的http并发调用工作正常。
我在这里张贴他们,以防万一遇到类似的问题,可以排除故障。
基于HTTP请求callback的示例:
var Rx = require('rxjs'), request = require('request'), request_rx = Rx.Observable.bindCallback(request.get); var array = [ 'https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3' ]; var source = Rx.Observable.from(array).mergeMap(httpGet, 1); function httpGet(url) { return request_rx(url); } var subscription = source.subscribe( function (x, body) { console.log('=====', x[1].body, '======'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
承诺样本:
var Rx = require('rxjs'), rp = require('request-promise'); var array = ['https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3', 'https://httpbin.org/delay/3' ]; var source = Rx.Observable.from(array).mergeMap(httpGet, 1); function httpGet(url) { return rp.get(url); } var results = []; var subscription = source.subscribe( function (x) { console.log('=====', x, '======'); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
本地RxJS示例:
var Rx = require('rxjs'), superagent = require('superagent'), Observable = require('rxjs').Observable; var array = [ 'https://httpbin.org/ip', 'https://httpbin.org/user-agent', 'https://httpbin.org/delay/10', 'https://httpbin.org/delay/2', 'https://httpbin.org/delay/2', 'https://httpbin.org/delay/1', ]; let start = (new Date()).getTime(); var source = Rx.Observable.from(array) .mergeMap(httpGet, null, 1) .timestamp() .map(stamp => [stamp.timestamp - start, stamp.value]); function httpGet(apiUrl) { return Observable.create((observer) => { superagent .get(apiUrl) .end((err, res) => { if (err) { return observer.onError(err); } let data, inspiration; data = JSON.parse(res.text); inspiration = data; observer.next(inspiration); observer.complete(); }); }); } var subscription = source.subscribe( function (x) { console.log('=====', x, '======'); });