限制Q承诺Node js中的并发

有什么办法可以限制在js中一次执行的并发Q promise的数量吗?

我正在build立一个networking刮板,它必须要求和parsing更多的3000多页,并没有扼杀一些请求我没有响应的时间,所以连接rest和所需的响应(HTML代码)变得不可用。

为了对付这个问题,我发现限制了我的问题消失的请求数量。


我已经尝试了以下方法,但无济于事:

  • Q承诺中的并发限制 – 节点
  • 我怎样才能限制Q promise的并发性?
  • https://gist.github.com/gaearon/7930162
  • https://github.com/ForbesLindesay/throat

我需要请求一个url数组,每次只做一个请求,当数组中的所有url都完成时,然后将结果返回给一个数组。

function processWebsite() { //computed by this stage urls = [u1,u2,u3,u4,l5,u6,u7,u8,u9]; var promises = throttle(urls,1,myfunction); // myfunction returns a Q promise and takes a considerable // amount of time to resolve (approximately 2-5 minutes) Q.all(promises).then(function(results){ //work with the results of the promises array }); } 

你可以在一个then()块中请求一个新的url

 myFunction(urls[0]).then(function(result) { myFunction(urls[1]).then(function(result) { myFunction(urls[2]).then(function(result) { ... }); }); }); 

当然,这将是它的dynamic行为。 一旦解决了承诺,我会保持一个队列,并将一个URL排队。 然后再提出一个请求。 也许有一个与search结果相关的哈希对象。

第二个要点:

 var urls = ...; var limit = ...; var dequeue = function() { return an array containing up to limit }; var myFunction = function(dequeue) { var urls = dequeue(); $q.all(process urls); }; myFunction(dequeue).then(function(result) { myFunction(dequeue).then(function(result) { myFunction(dequeue).then(function(result) { ... }); }); }); 

我会这样做,它将迭代每个URL,构build一个前一个完成时运行的承诺链,并与请求结果数组解决。

 return urls.reduce(function(acc, url){ return acc.then(function(results) return myfunction(url).then(function(requestResult){ return results.concat(requestResult) }); }); }, Q.resolve([])); 

你也可以把它变成一个帮手:

 var results = map(urls, myfunction); function map(items, fn){ return items.reduce(function(acc, item){ return acc.then(function(results) return fn(item).then(function(result){ return results.concat(result) }); }); }, Q.resolve([]) } 

请注意, bluebird诺言库有一个简化这种事情的帮手。

 return Bluebird.map(urls, myfunction, {concurrency: 1}); 

这里是我为Q做一个扼杀map函数的刺。

 function qMap(items, worker, concurrent) { var result = Q.defer(); var work = []; var working = 0; var done = 0; concurrent = parseInt(concurrent, 10) || 1; function getNextIndex() { var i; for (i = 0; i < items.length; i++) { if (typeof work[i] === "undefined") return i; } } function doneWorking() { working--; done++; result.notify( +((100 * done / items.length).toFixed(1)) ); if (!startWorking() && done === items.length) { result.resolve(work); } } function startWorking() { var index = getNextIndex(); if (typeof index !== "undefined" && working < concurrent) { working++; work[index] = worker(items[index]).finally(doneWorking); return true; } } while (startWorking()); return result.promise; } 

它接受

  • 要处理的items的数组(在您的情况下,URL),
  • 一个worker (它必须是一个接受一个item并返回一个promise的函数)
  • 以及在任何给定时间处理concurrent项目的最大值。

它返回

  • 一个承诺和
  • 当所有的工人都完成时,解决了一系列落实的承诺。

它不会失败,你必须检查个人的承诺,以确定整个操作的状态。

在你的情况下,你会像这样使用它,例如15个并发请求:

 // myfunction returns a Q promise and takes a considerable // amount of time to resolve (approximately 2-5 minutes) qMap(urls, myfunction, 15) .progress(function (percentDone) { console.log("progress: " + percentDone); }) .done(function (urlPromises) { console.log("all done: " + urlPromises); });