Nodejs sqs队列处理器

我正在尝试编写一个nodejs sqs队列处理器。

"use strict"; var appConf = require('./config/appConf'); var AWS = require('aws-sdk'); AWS.config.loadFromPath('./config/aws_config.json'); var sqs = new AWS.SQS(); var exec = require('child_process').exec; function readMessage() { sqs.receiveMessage({ "QueueUrl": appConf.sqs_distribution_url, "MaxNumberOfMessages": 1, "VisibilityTimeout": 30, "WaitTimeSeconds": 20 }, function (err, data) { var sqs_message_body; if (data.Messages) { if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') { //sqs msg body sqs_message_body = JSON.parse(data.Messages[0].Body); //make call to nodejs handler in codeigniter exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"', function (error, stdout, stderr) { if (error) { throw error; } console.log('stdout: ' + stdout); if(stdout == 'Success'){ //delete message from queue sqs.deleteMessage({ "QueueUrl" : appConf.sqs_distribution_url, "ReceiptHandle" :data.Messages[0].ReceiptHandle }); } }); } } }); } readMessage(); 

上面的代码对于队列中的单个消息正常工作。 我应该如何编写这个脚本,以便它保持对队列中的消息进行轮询,直到所有的消息都被处理? 我应该使用设置超时?

首先你应该定义使用Amazon提供的长轮询技术,据我所知你已经在使用它,因为你有"WaitTimeSeconds": 20sqs.receiveMessage调用"WaitTimeSeconds": 20参数。 我希望您不要忘记在AWS Web界面中configuration它。

关于轮询消息 – 你可以使用不同的技术,包括定时器,但我认为最简单的方法就是在receiveMessage的(甚至是exec的)callback函数的末尾调用你的readMessage()函数。 因此,处理(或等待)队列中的下一个消息将在处理先前的队列中的消息结束之后立即开始。

更新:

至于我在你的新版本的代码中有很多readMessage()调用。 我认为最好将其最小化,以使代码更清晰易于维护。 但是,如果你离开,例如,在主要的receiveMessagecallback结束时唯一的一个调用,你将会得到很多并行运行的PHP工作者脚本 – 从性能的angular度来看也许不是那么糟糕,但是你将不得不添加一些复杂的脚本来控制并行工作量。 我认为你可以减less一些调用execcallback,尝试joinif s和主callbackjoin调用。

 "use strict"; var appConf = require('./config/appConf'); var AWS = require('aws-sdk'); AWS.config.loadFromPath('./config/aws_config.json'); var delay = 20 * 1000; var sqs = new AWS.SQS(); var exec = require('child_process').exec; function readMessage() { sqs.receiveMessage({ "QueueUrl": appConf.sqs_distribution_url, "MaxNumberOfMessages": 1, "VisibilityTimeout": 30, "WaitTimeSeconds": 20 }, function (err, data) { var sqs_message_body; if (data.Messages) && (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) { //sqs msg body sqs_message_body = JSON.parse(data.Messages[0].Body); //make call to nodejs handler in codeigniter exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"', function (error, stdout, stderr) { if (error) { // error handling } if(stdout == 'Success'){ //delete message from queue sqs.deleteMessage({ "QueueUrl" : appConf.sqs_distribution_url, "ReceiptHandle" :data.Messages[0].ReceiptHandle }, function(err, data){ }); } readMessage(); }); } } readMessage(); }); } readMessage(); 

关于内存泄漏:我认为你不应该担心,因为readMessage()的下一个调用发生在callback函数中 – 所以不是recursion的,recursion调用函数在调用receiveMessage()函数之后返回父函数的值。

如果您正在使用节点,请使用https://www.npmjs.com/package/sqs-worker模块。 它会为你做这个工作。

 var SQSWorker = require('sqs-worker') var options = { url: 'https://sqs.eu-west-1.amazonaws.com/001123456789/my-queue' } var queue = new SQSWorker(options, worker) function worker(notifi, done) { var message; try { message = JSON.parse(notifi.Data) } catch (err) { throw err } // Do something with `message` var success = true // Call `done` when you are done processing a message. // If everything went successfully and you don't want to see it any more, // set the second parameter to `true`. done(null, success) }