如何确保顺序asynchronous放入kinesisstream中的logging?

我正在写一个应用程序,它读取MySQL bin日志并将更改推送到Kinesisstream。 我的用例需要完善的kinesisstream中的事件顺序,我正在使用putrecord操作而不是putrecords ,还包括' SequenceNumberForOrdering '键。 但是,还有一个失败点,即重试逻辑。 作为asynchronous函数(使用aws的js sdk),如何确保在写入操作期间发生故障时的顺序。

阻塞写入(阻塞事件循环,直到收到logging的callback)太糟糕的解决scheme? 或者,还有更好的方法?

在将logging添加到stream中时,不要试图强制sorting,而要在读取logging时对其进行sorting。 在你的用例中,每个binlog条目都有唯一的文件序列,起始位置和结束位置。 所以订购它们并找出差距是微不足道的。

如果你在阅读时发现有空白,消费者将不得不等待,直到填满。 但是,假设现在是灾难性的失败,所有的logging应该在stream中彼此接近,所以缓冲量应该是最小的。

通过在生产者方面执行sorting,您将限制您的整体吞吐量,因为您可以快速编写单个logging。 如果你能跟上实际的数据库更改,那就没关系。 但是,如果你不能跟上,即使消费者可能轻装上阵,你的pipe道滞后也将不断增加。

而且,你只能在一个分片中执行命令,所以如果你的制作者需要每秒超过1 MB的logging(或者说大于1000条logging/秒),那么你运气不好(根据我的经验, d通过PutRecords达到1,000条logging/秒;如果您一次只写一条logging,则会得到20-30个请求/秒)。

如果你想完美的订购,那么你需要确保在插入下一个事件之前插入每个事件,所以是的,你必须等到一个放置请求完成后再执行下一个放置请求。 问题是你是否真的需要所有事件的完美sorting,或者你是否需要在一些子集中完美sorting? 因为您正在使用关系数据库,所以在同一个表中的行之间不太可能存在关系。 你更有可能在表之间的行之间存在关系,所以你可以使用一些技巧来利用批量放置请求。

批量放置请求的问题在于请求中的无序。 由于bin日志为您提供了更改后的行的完整映像,因此您实际上只关心每个主键的bin日志中的最近条目,因此您可以改为从该日志中收集相对较大的一批事件bin日志应该按时间sorting,按照主键进行分组,然后只取binloglogging中的after_values图像作为每个主键组的最新logging。 然后,您可以安全地对这些logging中的每一个使用批量放入请求,并确保您不会意外地将某个给定密钥的陈旧logging放入该stream的最新logging之前的stream中。

这对于所有情况都是不够的,但是在许多CDC( https://en.wikipedia.org/wiki/Change_data_capture )设置中,这足以将数据准确地复制到其他系统中。

假设您在bin日志中有以下logging(格式取自https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/ ):

 {"table": "Users", "row": {"values": {"id": 1, "Name": "Foo User", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"} {"table": "Users", "row": {"before_values": {"id": 1", "Name": "Foo User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Bar User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"} {"table": "Users", "row": {"values": {"id": 2, "Name": "User A", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"} {"table": "Users", "row": {"before_values": {"id": 1", "Name": "Bar User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Baz User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"} {"table": "Users", "row": {"values": {"id": 3, "Name": "User C", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"} 

在这个例子中,有三个由主键id标识的行。 插入id=1的行,然后更新两次,插入id=2的行,插入id=3的行。 您需要分别处理每种types的事件(写入,更新,删除),并仅收集每个ID的最新状态。 因此,对于写入操作,您需要获取行的values ,获取行的after_values更新,以及将行放入一批删除操作中的删除操作。 在这个例子中,只有三个重要的条目是:

 {"table": "Users", "row": {"values": {"id": 2, "Name": "User A", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"} {"table": "Users", "row": {"before_values": {"id": 1", "Name": "Bar User", "idUsers": 123}, "after_values": {"id": 1, "Name": "Baz User", "idUsers": 123}}, "type": "UpdateRowsEvent", "schema": "kinesistest"} {"table": "Users", "row": {"values": {"id": 3, "Name": "User B", "idUsers": 123}}, "type": "WriteRowsEvent", "schema": "kinesistest"} 

这是因为它们是每个ID的最新版本。 您可以使用批量处理来批量处理包含这三个写入的批处理,而且不必担心在大多数情况下这些批处理会出现乱序,除非您在单个表中的条目之间存在相互依赖关系或其他非常具体的要求。

如果您删除了,则只需将它们放入单独的批量删除,即在批量放入logging后执行。 在过去,我已经看到通过执行压缩和批处理程序来改善吞吐量。 但是,如果您真的需要阅读每个事件,而不是将最新的数据复制到各种其他商店,那么这可能不起作用。

我能够通过使用内部的FIFO队列实现完美的sorting。 我将每个事件都推送到一个FIFO队列中,这个队列正在被一个推送Kinesisstream中的事件的recursion函数读取(一次一个)。 我也在每次成功的putRecord操作中将bin日志偏移量存储在外部存储器(在我的情况下为redis),如果任何写入kinesis失败,我可以重新启动服务器,并开始从最后一个成功的偏移值再次读取。

任何build议,这个解决scheme或不同的解决scheme将不胜感激。

这是我的recursion函数的代码片段,从FIFO队列中读取。

 const fetchAndPutEvent = () => { let currentEvent = eventQueue.shift(); // dequeue from the fifo queue if (currentEvent) { currentEvent = JSON.parse(currentEvent); // put in the kinesis stream with sequence number of last putRecord operation to achieve ordering of events return kinesis.putRecord(currentEvent, sequenceNumber, (err, result) => { if (err) { // in case of error while putting in kinesis stream kill the server and replay from the last successful offset logger.fatal('Error in putting kinesis record', err); return setTimeout(() => { process.exit(0); }, 10000); } try { //store the binlog offset and kinesis sequence number in an external memory sequenceNumber = result.SequenceNumber; let offsetObject = { binlogName: currentEvent.currentBinlogName, binlogPos: currentEvent.currentBinlogPos, sequenceNumber: sequenceNumber }; redisClient.hmset(redisKey, offsetObject); } catch (ex) { logger.fatal('Exception in putting kinesis record', ex); setTimeout(function() { process.exit(0); }, 10000); } return setImmediate(function() { return fetchAndPutEvent(); }); }); } else { // in case of empty queue just recursively call the function again return setImmediate(function() { return fetchAndPutEvent(); }); } };