Q承诺中的并发限制 – 节点

有什么方法来限制使用Q promise库的promise的并发性吗?

这个问题有点相关我怎样才能限制Q promise的并发性?

但问题是,我试图做这样的事情:

for (var i = 0; i <= 1000; i++) { return Q.all([ task1(i), task2(i) ]); // <-- limit this to 2 at a time. } 

真正的用例是:

  1. 从数据库提取post
  2. 在数据库中像posts.forEach(function(post) {}
  3. 对于每个职位做任务1,任务2,任务3(检索社交计数器,检索评论计数等)
  4. 在数据库中保存新的发布数据。

但问题是节点正在同时执行所有文章的所有任务,例如同时向Facebook发送500个post的“点数”。

如何限制Q.all()所以一次只有2个职位正在执行他们的任务? 或者还有其他可行的解决scheme可以应用

注:大多数任务(如果不是全部)依赖于请求库

感谢Dan,他的回答和他的帮助将它与我的代码整合在一起,可以用他的要点和一个小窍门来完成:

 var qlimit = require('../libs/qlimit'); var test = function(id) { console.log('Running ' + id); return Q.nfcall(request, 'some dummy url which takes some time to process, for example a php file with sleep(5)').spread(function(response, body) { console.log('Response ' + id); return body; }); } test = qlimit.limitConcurrency(test, 1); var data = [0, 1, 2]; data.forEach(function(id) { console.log('Starting item ' + id); Q.all([ test(id) ]); }); 

这样你得到像这样的东西:

  • 从第0项开始
  • 开始项目1
  • 从第2项开始
  • 运行0
  • 回应0
  • 正在运行1
  • 回应1
  • 运行2
  • 回应2

这显然是一次一个请求。

我在执行过程中遗漏的一点是,您需要在启动循环之前使用limitConcurrency重新声明函数,而不是在循环内部。

几天前我问了一个非常类似的问题: Node.js / Express和并行队列

我find的解决scheme(看我自己的答案)是使用Caolan的asynchronous 。 它允许您创build“操作队列”,并且可以限制同时运行多less个操作:请参阅“队列”方法。

在你的情况下,Node的主循环会从Q中提取元素,并在队列中为每个元素创build一个任务。 你也可以限制这个(所以不要基本上重新创build队列之外的队列),例如,只有当最后一个被执行的时候,向队列中添加N个新元素(“queue”方法的“空”callback)。

这是我用来遏制Q承诺的代码。

我只是把它从我需要的项目中剥离出来。 如果有更多的人感兴趣,我可以把它分解成一个模块或其他东西。

检出方法spex.page和spex.sequence 。 他们的目的是实现任何可能的策略数据调节+承诺负载平衡 。

请参阅下面的项目文档中的几个例子。

平衡页面源

下面的示例使用方法页面来启动一个5页的序列,然后将parsing的数据logging到控制台中。 源函数为每个页面提供半秒的延迟。

 var $q = require('q'); var spex = require('spex')($q); function source(index, data, delay) { return new $q.Promise(function (resolve, reject) { setTimeout(function () { resolve([ "page-" + index, // simple value; $q.resolve(Date.now()) // promise value; ]) }, 500); // wait 1/2 second before serving the next page; }); } function logger(index, data, delay) { console.log("LOG:", data); } spex.page(source, {dest: logger, limit: 5}) .then(function (data) { console.log("FINISHED:", data); }); 

输出:

 LOG: [ 'page-0', 1446050705823 ] LOG: [ 'page-1', 1446050706327 ] LOG: [ 'page-2', 1446050706834 ] LOG: [ 'page-3', 1446050707334 ] LOG: [ 'page-4', 1446050707839 ] FINISHED: { pages: 5, total: 10, duration: 2520 } 

平衡序列接收器

在下面的例子中,我们有一个序列 ,它在索引小于5时返回数据,目标函数在处理从源parsing的每个数据时执行1秒的延迟。

 var $q = require('q'); var spex = require('spex')($q); function source(index, data, delay) { console.log("SOURCE:", index, data, delay); if (index < 5) { return $q.resolve(index); } } function dest(index, data, delay) { console.log("DEST:", index, data, delay); return new $q.Promise(function (resolve, reject) { setTimeout(function () { resolve(); }, 1000); }); } spex.sequence(source, dest) .then(function (data) { console.log("DATA:", data); }); 

输出:

 SOURCE: 0 undefined undefined DEST: 0 0 undefined SOURCE: 1 0 1011 DEST: 1 1 1001 SOURCE: 2 1 1001 DEST: 2 2 1001 SOURCE: 3 2 1000 DEST: 3 3 1000 SOURCE: 4 3 1001 DEST: 4 4 1001 SOURCE: 5 4 1000 DATA: { total: 5, duration: 5013 }