Kinesis Lambda消费者最小批量

我正在使用AWS Lambda(node.js)作为AWS Kinesis使用者。 我可以看到,您可以设置最大批量大小,但是我想知道是否可以设置最小批量大小。 所以我可以确保每个lambda将至less处理50个(或任何数量)的logging。

我想有一个最小批量大小,因为lambda消费者将build立一个连接到RDS MySQL实例,我试图保持低并发连接数。

如果没有configuration能力,将设置最低限度,任何解决方法的想法将不胜感激。

谢谢。

我的第一个问题是你在stream中打开了多less个碎片? 您只能获得1个并发执行的每个shard的lambda实例。 所以如果你只有一个碎片,那么你一次只能有一个Lambda碰到你的RDS实例。

你有数据表明这是一个问题?

接下来是可能或可能不可靠工作的破解 。 而且可能不应该用在产品环境中。

对于最小批量大小,如果批量大小小于所需的logging数量,则可以从node.js lambda函数返回error

例如

 handler(event, context, callback) { const records = event.Records; if (records.length() < minBatchSize) { callback('insufficient batch size'); } else { processRecords(records, callback); } } 

但是想到两个问题:

1)由于在您的stream上configuration了最大事件时间限制,因此您不能无限期地执行此操作,而不会有丢失数据的风险。 在这段时间之后,logging从stream中消失。 请注意,您为此function额外付费(请参阅扩展数据保留 )。

您可以从lambda / kinesis分片迭代器年龄指标中推断批处理年龄,请参阅http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html

我不确定这是多么可靠,特别是如果你有一个以上的碎片,但是例如

 handler(event, context, callback) { const records = event.Records; if (records.length() < minBatchSize) { if (calculateLambdaAge() > tooLongDelayThreshold) { processRecords(records, callback); } else { callback(new Error('insufficient batch size')); } } else { processRecords(records, callback); } } calculateLambdaAge() { // interrogate cloudwatch } 

如果cloudwatch不会告诉你,你可能需要跟踪你自己的某个地方,至less和你的RDS(redis / dynamo)一样可扩展。

2)不是为了让#1可靠而付出努力,那么额外的努力是不是可以扩大你的RDS实例,使你的当前使用更有效率?


在将代码示例放在一起时,我已经提到了这个和这个 。