如何在“并行”中处理队列以保持进程饱和?

我想以下列方式处理一个队列:

  • 队列被保存在MongoDb中(可能使用npm包“mongodb-queue”)
  • 几个节点进程可以在同一个队列上工作
  • 任务涉及各种各样的I / O,每个I / O需要从零到几秒
  • 每个节点进程应该同时处理最大任务

节点进程也服务一个简单的网站(快递)。 但是这个网站的请求很less。 大部分工作将会完成处理队列

目标是能够比每个进程一次只能popup一个任务更好地饱和每个进程。 由于任务涉及到很多等待外部服务,我认为不处理它们是“平行的”(我知道它并不是真正的并行)是一种浪费。 我在想,五个左右的限制,但这将不得不稍微调整。

粗糙的伪代码:

loopForewer { popNextTaskFromQueue(function(task) { if(task && concurrentTasks <= limit) { concurrentTasks ++; processTask(task, function(err) { concurrentTasks --; }) }) } } 

我应该如何解决这个问题?

提前致谢。 //迈克尔


编辑:

我将详细介绍一下关于async.parallelLimit的Paulsbuild议以及async.queue的尝试。 我希望可以通过编辑我自己的问题来做到这一点。

async.parallelLimit:

其实,我不明白这将如何符合这项法案。 但我在Node和JavaScript中是一个初学者,我可能错过了一些东西。

我看不到如何在没有先从MongoDB队列中获取所有可用的任务的情况下保持进程饱和(即填充5个任务)。 但是如果我得到所有的任务,除了五个之外,其他所有的任务都将等待而不是被另一个节点进程处理。

async.queue:

有什么想法呢?

 q = async.queue.., 5) //create a queue with concurrency limit 5 dbQ = someQueueWithMongoStorage.. while(true) { nextTick(function() { if (!q.saturated) { dbQ.getTask(function(err, task) { if (task) q.push(task) }) } }) } 
  • 可以使用while(true)循环吗?
  • 还是应该recursion?
  • 我需要nextTick吗? 我担心循环会接pipe事件队列,否则。
  • 问题是,当dbQ为空时,我将非常多地击中数据库。 当dbQ为空时,我应该添加一个超时吗?

我继续尝试了async.queue的概念。 我将使用(种)下面的解决scheme。 请叮嘱我,告诉我是否在做一些愚蠢的事情,即locking某事。 当dbQ为空时,我会每秒钟敲db,但这对我很好。

 var limit = 5; var q = async.queue(function(task, callback) { console.log('Processing ' + task.payload.task); setTimeout(function() { dbQ.ack(task.ack, function(err) { console.log('Finished ' + task.id); callback(); }) }, 1000); }, limit); function enqueueTasks() { console.log('QueueLength: ' + q.length() + ', Running tasks: ' + q.running()); if (q.length() < limit) { dbQ.get(function(err, task) { if (task) { console.log('Enqueing task ' + task.id) q.push(task, function(err) { }); enqueueTasks(); } else { console.log('dbQ is empty, taking a 1 second nap') setTimeout(function() { enqueueTasks(); }, 1000); } }); } else { console.log('Queue length limit hit, taking a 1 second nap') setTimeout(function() { enqueueTasks(); }, 1000); } } enqueueTasks(); 

看看async库,特别是parallelLimit函数。

https://github.com/caolan/async#parallellimittasks-limit-callback