Kinesis Lambda消费者最小批量
我正在使用AWS Lambda(node.js)作为AWS Kinesis使用者。 我可以看到,您可以设置最大批量大小,但是我想知道是否可以设置最小批量大小。 所以我可以确保每个lambda将至less处理50个(或任何数量)的logging。
我想有一个最小批量大小,因为lambda消费者将build立一个连接到RDS MySQL实例,我试图保持低并发连接数。
如果没有configuration能力,将设置最低限度,任何解决方法的想法将不胜感激。
谢谢。
我的第一个问题是你在stream中打开了多less个碎片? 您只能获得1个并发执行的每个shard的lambda实例。 所以如果你只有一个碎片,那么你一次只能有一个Lambda碰到你的RDS实例。
你有数据表明这是一个问题?
接下来是可能或可能不可靠工作的破解 。 而且可能不应该用在产品环境中。
对于最小批量大小,如果批量大小小于所需的logging数量,则可以从node.js lambda函数返回error
。
例如
handler(event, context, callback) { const records = event.Records; if (records.length() < minBatchSize) { callback('insufficient batch size'); } else { processRecords(records, callback); } }
但是想到两个问题:
1)由于在您的stream上configuration了最大事件时间限制,因此您不能无限期地执行此操作,而不会有丢失数据的风险。 在这段时间之后,logging从stream中消失。 请注意,您为此function额外付费(请参阅扩展数据保留 )。
您可以从lambda / kinesis分片迭代器年龄指标中推断批处理年龄,请参阅http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html 。
我不确定这是多么可靠,特别是如果你有一个以上的碎片,但是例如
handler(event, context, callback) { const records = event.Records; if (records.length() < minBatchSize) { if (calculateLambdaAge() > tooLongDelayThreshold) { processRecords(records, callback); } else { callback(new Error('insufficient batch size')); } } else { processRecords(records, callback); } } calculateLambdaAge() { // interrogate cloudwatch }
如果cloudwatch不会告诉你,你可能需要跟踪你自己的某个地方,至less和你的RDS(redis / dynamo)一样可扩展。
2)不是为了让#1可靠而付出努力,那么额外的努力是不是可以扩大你的RDS实例,使你的当前使用更有效率?
在将代码示例放在一起时,我已经提到了这个和这个 。