如何使用AWS JavaScript SDK(dynamoDB)处理UnprocessedItems?

我正在尝试使用AWS Lambda函数处理来自SendGrid的事件。 据我所知,事件将是一个具有可变数量的JSON对象的数组,每个JSON对象代表一个给定的事件。 我想使用batchWriteItem将这些事件写入DynamoDB,并循环这个过程,直到我没有返回任何UnprocessedItems。 但是,我陷入了一个无限循环。 这是我现在的代码:

console.log('Loading function'); var aws = require('aws-sdk'); var dynamo = new aws.DynamoDB(); params = {}; exports.handler = function(sg_event, context) { var items = []; for(var i = 0; i < sg_event.length; i++) { var obj = sg_event[i]; var request = { PutRequest: { Item: { email: { S: obj.email }, timestamp: { S: obj.timestamp.toString() }, sg_message_id: { S: obj.sg_message_id }, event: { S: obj.event } } } }; items.push(request); } params = { RequestItems: { sendgrid_response: items } } do { dynamo.batchWriteItem( params, function(err, data) { if(err) context.fail(err); else params.RequestItems = data.UnprocessedItems; }); } while(!isEmpty(params.RequestItems)); }; function isEmpty(obj) { return (Object.keys(obj).length === 0); } 

我认为问题是试图在callback函数中设置参数,但我不知道我应该怎么做…我知道我可以调用另一个batchWriteItem使用UnprocessedItems在原来的callback,但我仍然需要能够根据需要多次运行该函数,以确保写入所有未处理的项目。 我怎样才能正确地循环batchWriteItem?

@Daniela Miao,感谢分享解决scheme。

我们可以在发布的代码中添加一个代码块,以避免发生DynamoDBexception。 在再次请求DynamoDB批量写入之前,这将检查params.RequestItems是否有未处理的数据。

 //db is AWS.DynamoDB Client var processItemsCallback = function(err, data) { if (err) { //fail } else { var params = {}; params.RequestItems = data.UnprocessedItems; /* * Added Code block */ if(Object.keys(params.RequestItems).length != 0) { db.batchWriteItem(params, processItemsCallback); } } }; db.batchWriteItem(/*initial params*/, processItemsCallback); 

Nodejs是单线程的,它首先执行所有的主函数,这样while循环永远不会结束,callback将永远不会执行。

这是你如何做到的:

 //db is AWS.DynamoDB Client var processItemsCallback = function(err, data) { if (err) {   //fail } else {  var params = {};  params.RequestItems = data.UnprocessedItems;  db.batchWriteItem(params, processItemsCallback); } }; db.batchWriteItem(/*initial params*/, processItemsCallback);