Q承诺中的并发限制 – 节点
有什么方法来限制使用Q promise库的promise的并发性吗?
这个问题有点相关我怎样才能限制Q promise的并发性?
但问题是,我试图做这样的事情:
for (var i = 0; i <= 1000; i++) { return Q.all([ task1(i), task2(i) ]); // <-- limit this to 2 at a time. }
真正的用例是:
- 从数据库提取post
- 在数据库中像
posts.forEach(function(post) {}
- 对于每个职位做任务1,任务2,任务3(检索社交计数器,检索评论计数等)
- 在数据库中保存新的发布数据。
但问题是节点正在同时执行所有文章的所有任务,例如同时向Facebook发送500个post的“点数”。
如何限制Q.all()
所以一次只有2个职位正在执行他们的任务? 或者还有其他可行的解决scheme可以应用
注:大多数任务(如果不是全部)依赖于请求库
感谢Dan,他的回答和他的帮助将它与我的代码整合在一起,可以用他的要点和一个小窍门来完成:
var qlimit = require('../libs/qlimit'); var test = function(id) { console.log('Running ' + id); return Q.nfcall(request, 'some dummy url which takes some time to process, for example a php file with sleep(5)').spread(function(response, body) { console.log('Response ' + id); return body; }); } test = qlimit.limitConcurrency(test, 1); var data = [0, 1, 2]; data.forEach(function(id) { console.log('Starting item ' + id); Q.all([ test(id) ]); });
这样你得到像这样的东西:
- 从第0项开始
- 开始项目1
- 从第2项开始
- 运行0
- 回应0
- 正在运行1
- 回应1
- 运行2
- 回应2
这显然是一次一个请求。
我在执行过程中遗漏的一点是,您需要在启动循环之前使用limitConcurrency重新声明函数,而不是在循环内部。
几天前我问了一个非常类似的问题: Node.js / Express和并行队列
我find的解决scheme(看我自己的答案)是使用Caolan的asynchronous 。 它允许您创build“操作队列”,并且可以限制同时运行多less个操作:请参阅“队列”方法。
在你的情况下,Node的主循环会从Q中提取元素,并在队列中为每个元素创build一个任务。 你也可以限制这个(所以不要基本上重新创build队列之外的队列),例如,只有当最后一个被执行的时候,向队列中添加N个新元素(“queue”方法的“空”callback)。
这是我用来遏制Q承诺的代码。
我只是把它从我需要的项目中剥离出来。 如果有更多的人感兴趣,我可以把它分解成一个模块或其他东西。
检出方法spex.page和spex.sequence 。 他们的目的是实现任何可能的策略数据调节+承诺负载平衡 。
请参阅下面的项目文档中的几个例子。
平衡页面源
下面的示例使用方法页面来启动一个5页的序列,然后将parsing的数据logging到控制台中。 源函数为每个页面提供半秒的延迟。
var $q = require('q'); var spex = require('spex')($q); function source(index, data, delay) { return new $q.Promise(function (resolve, reject) { setTimeout(function () { resolve([ "page-" + index, // simple value; $q.resolve(Date.now()) // promise value; ]) }, 500); // wait 1/2 second before serving the next page; }); } function logger(index, data, delay) { console.log("LOG:", data); } spex.page(source, {dest: logger, limit: 5}) .then(function (data) { console.log("FINISHED:", data); });
输出:
LOG: [ 'page-0', 1446050705823 ] LOG: [ 'page-1', 1446050706327 ] LOG: [ 'page-2', 1446050706834 ] LOG: [ 'page-3', 1446050707334 ] LOG: [ 'page-4', 1446050707839 ] FINISHED: { pages: 5, total: 10, duration: 2520 }
平衡序列接收器
在下面的例子中,我们有一个序列 ,它在索引小于5时返回数据,目标函数在处理从源parsing的每个数据时执行1秒的延迟。
var $q = require('q'); var spex = require('spex')($q); function source(index, data, delay) { console.log("SOURCE:", index, data, delay); if (index < 5) { return $q.resolve(index); } } function dest(index, data, delay) { console.log("DEST:", index, data, delay); return new $q.Promise(function (resolve, reject) { setTimeout(function () { resolve(); }, 1000); }); } spex.sequence(source, dest) .then(function (data) { console.log("DATA:", data); });
输出:
SOURCE: 0 undefined undefined DEST: 0 0 undefined SOURCE: 1 0 1011 DEST: 1 1 1001 SOURCE: 2 1 1001 DEST: 2 2 1001 SOURCE: 3 2 1000 DEST: 3 3 1000 SOURCE: 4 3 1001 DEST: 4 4 1001 SOURCE: 5 4 1000 DATA: { total: 5, duration: 5013 }