Tag: amazon kinesis

分布式展示数据收集架构

我有一个用node.js编写的web应用程序,分布在多个负载平衡的Web服务器中,用于我正在开发的广告技术项目。 我正在设法弄清楚如何最好地收集和分发所投放的广告展示次数的日志数据,以便日后可以分析这些数据。 我们决定将数据存储在Apache Avro格式中,我们将使用Apache Spark处理数据。 我们selectAvro是因为我们要访问数据的方式,而且看起来是最合适的。 目前我有一个Kinesis Firehosestream设置,我使用Node.js的AWS-SDK发送有关每个展示的Avro数据,然后最终存储在S3中。 问题Kinesis把我发送给它的所有文件连接到它写入到S3的每个批处理中,所以如果我把它configuration成每隔300秒写一次,那么它将排队所有的Avro文件,连接它们,然后把它们写到S3。 由于合并在一起,生成的文件不再是有效的Avro文件。 也许这会更好,如果我只是将原始的JSON数据发送到Kinesisstream,然后让另一个应用程序将JSON数据转换成Avro? 有一种烦人的做法,就是为了中间数据处理而编写另一个服务,而我将不得不使用我的转换服务协调对源数据格式的更改。 有更好的方法吗?

如何确定哪个特定的Lambda请求触发了我的Kinesis事件?

我有一个连接到AWS Kinesis Stream(由其触发)的AWS lambda。 当我将事件发射到Kinesis时,我的lambda被调用。 下面是一些将事件推送到Kinesis的示例代码(这部分工作): var kinesis = new AWS.Kinesis({ region: 'us-east-1' }); var params = { Data: new Buffer(JSON.stringify(data)), StreamName: 'myStreamName', PartitionKey: uuid.v1() }; kinesis.putRecord(params, function(err, data) { done(); }); 当我成功创build一个logging,我得到这样的回应: { ShardId: 'shardId-000000000000', SequenceNumber: '49570419697469019326213778569044054238145932258132885506' } 我如何使用SequenceNumber来查找被触发的lambda的RequestId?

在nodejs中解压缩数据

我正在尝试使用下面的代码来解压缩base64解码的缓冲区对象,但callback函数没有得到执行。 有人能告诉我我在这里失踪吗? function (event, context) { event.Records.forEach(function (record) { // Kinesis data is base64 encoded so decode here var payload = new Buffer(record.kinesis.data, 'base64'); zlib.gunzip(payload, function (err, data) { console.log("new data2"); if (!err) { console.log("new data3"); console.log(data); } else { console.log("new data4"); console.log(err, err.stack); } }); }); context.done(); };

如何确保顺序asynchronous放入kinesisstream中的logging?

我正在写一个应用程序,它读取MySQL bin日志并将更改推送到Kinesisstream。 我的用例需要完善的kinesisstream中的事件顺序,我正在使用putrecord操作而不是putrecords ,还包括' SequenceNumberForOrdering '键。 但是,还有一个失败点,即重试逻辑。 作为asynchronous函数(使用aws的js sdk),如何确保在写入操作期间发生故障时的顺序。 阻塞写入(阻塞事件循环,直到收到logging的callback)太糟糕的解决scheme? 或者,还有更好的方法?

Kinesis Lambda消费者最小批量

我正在使用AWS Lambda(node.js)作为AWS Kinesis使用者。 我可以看到,您可以设置最大批量大小,但是我想知道是否可以设置最小批量大小。 所以我可以确保每个lambda将至less处理50个(或任何数量)的logging。 我想有一个最小批量大小,因为lambda消费者将build立一个连接到RDS MySQL实例,我试图保持低并发连接数。 如果没有configuration能力,将设置最低限度,任何解决方法的想法将不胜感激。 谢谢。