在Node.js中批量请求

我的程序正在与一个只接受〜10每秒请求的Web服务进行通信。 我的程序不时发送100多个并发请求到Web服务,导致我的程序崩溃。

如何将Node.js中的并发请求限制为每秒5个? 我使用请求库。

// IF EVENT AND SENDER if(data.sender[0].events && data.sender[0].events.length > 0) { // FIND ALL EVENTS for(var i = 0; i < data.sender[0].events.length; i++) { // IF TYPE IS "ADDED" if(data.sender[0].events[i].type == "added") { switch (data.sender[0].events[i].link.rel) { case "contact" : batch("added", data.sender[0].events[i].link.href); //_initContacts(data.sender[0].events[i].link.href); break; } // IF TYPE IS "UPDATED" } else if(data.sender[0].events[i].type == "updated") { switch (data.sender[0].events[i].link.rel){ case "contactPresence" : batch("updated", data.sender[0].events[i].link.href); //_getContactPresence(data.sender[0].events[i].link.href); break; case "contactNote" : batch("updated", data.sender[0].events[i].link.href); // _getContactNote(data.sender[0].events[i].link.href); break; case "contactLocation" : batch("updated", data.sender[0].events[i].link.href); // _getContactLocation(data.sender[0].events[i].link.href); break; case "presenceSubscription" : batch("updated", data.sender[0].events[i].link.href); // _extendPresenceSubscription(data.sender[0].events[i].link.href); break; } } }; 

然后是本土的批量方法:

 var updated = []; var added = []; var batch = function(type, url){ console.log("batch called"); if (type === "added"){ console.log("Added batched"); added.push(url); if (added.length > 5) { setTimeout(added.forEach(function(req){ _initContacts(req); }), 2000); added = []; } } else if (type === "updated"){ console.log("Updated batched"); updated.push(url); console.log("Updated length is : ", updated.length); if (updated.length > 5){ console.log("Over 5 updated events"); updated.forEach(function(req){ setTimeout(_getContactLocation(req), 2000); }); updated = []; } } }; 

还有一个实际请求的例子:

 var _getContactLocation = function(url){ r.get(baseUrl + url, { "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }}, function(err, res, body){ if(err) console.log(err); else { var data = JSON.parse(body); self.emit("data.contact", data); } } ); }; 

使用asynchronous库, mapLimit函数正是你想要的。 由于您没有提供任何代码,因此无法为您的特定用例提供示例。

自述文件:


mapLimit(arr,limit,iterator,callback)

和map一样,只有“极限”迭代器可以随时同时运行。

请注意,这些项目不会被批量处理,因此不能保证在启动任何其他项目之前第一个“限制”迭代器函数将会完成。

参数

  • arr – 要迭代的数组。
  • limit – 在任何时候运行的迭代器的最大数目。
  • iterator(item,callback) – 应用于数组中每个项目的函数。 迭代器传递一个callback函数(err,transformed),一旦它完成了一个错误(可以是null)和一个转换后的项目,就必须调用它。
  • callback(err,results) – 在所有迭代器函数完成之后调用的callback,或发生错误。 结果是来自原始数组的转换项目的数组。

async.mapLimit(['file1','file2','file3'], 1, fs.stat, function(err, results){ // results is now an array of stats for each file });


编辑:现在你提供的代码,我看到你的使用是有点不同于我所设想的。 当您知道所有要预先运行的任务时, async库更有用。 我不知道一个图书馆是否会轻易解决这个问题。 上面的注释可能仍然与search这个话题的人有关,所以我将把它留在。

对不起,我没有时间重构你的代码,但是这是一个(未经testing的)例子,它使得一个asynchronous的请求,同时自我节制每秒5个请求。 我强烈build议从这个方面开发出适合您的代码库的更一般的解决scheme。

 var throttledRequest = (function () { var queue = [], running = 0; function sendPossibleRequests() { var url; while (queue.length > 0 && running < 5) { url = queue.shift(); running++; r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) { running--; sendPossibleRequests(); if(err) console.log(err); else { var data = JSON.parse(body); self.emit("data.contact", data); } }); } } return function (url) { queue.push(url); sendPossibleRequests(); }; })(); 

基本上,你保持所有的数据队列asynchronous处理(如要求的url),然后在每个callback(从请求)后,尽可能多地发起请求。

这正是节点的Agent类所devise的目的。 你做了一些傻乎乎的require('http').globalAgent.maxSockets = Number.MAX_VALUE或通过agent: false作为请求选项?

使用Node的默认行为,您的程序一次不会发送超过5个并发请求。 此外,代理还提供简单队列不能(即HTTP保持活动)的优化。

如果您尝试发出很多请求(例如,从循环中发出100个请求),则前5个请求将开始,并且代理将排队剩余的95个请求。请求完成后,将开始下一个请求。

您可能想要做的是为您的Web服务请求创build一个Agent ,并将其传递给每个调用请求(而不是将请求混入全局代理)。

 var http=require('http'), svcAgent = http.Agent(); request({ ... , agent: svcAgent });