SQS到Lambda + SES

我是Lambda和SQS的新手,我尝试创建一个函数来发送在SQS服务中排队的邮件,但我不明白如何调用包含发送邮件(send)和删除队列消息(delete)逻辑的process函数。

在这里,我粘贴我的代码:

 'use strict'; const AWS = require('aws-sdk'); const SQS = new AWS.SQS({ apiVersion: '2012-11-05' }); const Lambda = new AWS.Lambda({ apiVersion: '2015-03-31' }); const ses = new AWS.SES({ accessKeyId: "xxxxxxxx", secretAccesskey: "xxxxxxx/xxxxxxxxx" }); const s3 = new AWS.S3({ apiVersion: "2006-03-01", region: "us-west-2" }); const QUEUE_URL = 'https://sqs.us-west-2.amazonaws.com/xxxxxxx/queue'; const PROCESS_MESSAGE = 'process-message'; function getPieceOfMail (path, mapObj, replace) { return new Promise(function (resolve, reject) { s3.getObject({ Bucket: "myBucket", Key: "myKey/" + path }, function (err, data) { if (err) { reject(err); } else { if (replace === true) { var re = new RegExp(Object.keys(mapObj).join("|"), "gi"); data = data.Body.toString().replace(re, function (matched) { return mapObj[matched.toLowerCase()]; }); resolve(data); } else { resolve(data.Body.toString()); } } }); }); } function getRegisterSource (nickname, activate_link) { var activate_link, pieces; pieces = [ getPieceOfMail("starts/start.html", {}, false), getPieceOfMail("headers/a.html", {}, false), getPieceOfMail("footers/a.html", {}, false), ]; return Promise.all(pieces) .then(function (data) { return (data[0] + data[1] + data[2]); }) .catch(function (err) { return err; }); } function sendEmail (email, data) { return new Promise(function (resolve, reject) { var params = { Destination: { ToAddresses: [email] }, Message: { Body: { Html: { Data: data }, Text: { Data: data } }, Subject: { Data: "myData" } }, Source: "someone <noreply@mydomain.co>", }; ses.sendEmail(params, function (err, data) { if (err) { reject(err); } else { resolve(data); } }); }); } function process(message, callback) { console.log(message); // process message getRegisterSource(event['nickname'], event['user_id']) .then(function (data) { return sendEmail(event["email"], data); }) .catch(function (err) { console.log("==ERROR=="); callback(err, err); }) .finally(function () {}); // delete message const params = { 
QueueUrl: QUEUE_URL, ReceiptHandle: message.ReceiptHandle, }; SQS.deleteMessage(params, (err) => callback(err, message)); } function invokePoller(functionName, message) { const payload = { operation: PROCESS_MESSAGE, message, }; const params = { FunctionName: functionName, InvocationType: 'Event', Payload: new Buffer(JSON.stringify(payload)), }; return new Promise((resolve, reject) => { Lambda.invoke(params, (err) => (err ? reject(err) : resolve())); }); } function poll(functionName, callback) { const params = { QueueUrl: QUEUE_URL, MaxNumberOfMessages: 10, VisibilityTimeout: 10, }; // batch request messages SQS.receiveMessage(params, (err, data) => { if (err) { return callback(err); } // for each message, reinvoke the function const promises = data.Messages.map((message) => invokePoller(functionName, message)); // complete when all invocations have been made Promise.all(promises).then(() => { const result = `Messages received: ${data.Messages.length}`; callback(null, result); }); }); } exports.handler = (event, context, callback) => { try { if (event.operation === PROCESS_MESSAGE) { console.log("Invoked by poller"); process(event.message, callback); } else { console.log("invoked by schedule"); poll(context.functionName, callback); } } catch (err) { callback(err); } }; 

有人可以给我一些指点吗?

感谢您的建议。

UPDATE

经过这么多的误解,我决定开始研究AWS提供的SQS轮询示例。

在那里,我发现我缺乏一些基本的SQS权限,但现在通过添加正确的策略来解决:

 { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": ["*"] }] } 

这允许Lambda.invoke()调用process()

process(message, callback)被调用时,如果我执行console.log(message);,似乎没有消息,尽管队列正在被SQS.deleteMessage(params, (err) => callback(err, message));清除。

我正在尝试将我的sendMail函数与SQS服务结合使用,这样我只需把每条消息推送到队列(queue)即可。