卡夫卡HLC应该如何找出某个主题的分区数量?

我正在使用kafka-node HighLevelConsumer,并且在启动时总是收到重复消息的问题。

为了保持处理顺序,我的使用者只需将消息附加到工作队列中,然后我会连续处理这些事件。 我暂停使用者,如果我打到一个队列高水位标记,我已经禁用了自动提交,并且在我的客户端代码完全处理每个事件后,我会“手动”提交。

尽pipe在启动时,我总是从一个或多个分区(取决于组中有多less其他HLC运行)获取最后(先前已提交)的消息。 我有点惊讶,HLC不会给我(承诺+ 1),但我决定只是“忽略”偏移量早于偏移量的消息。 作为一个快速testing,

offset.fetchCommits('fnord', [{topic:'test', partition: 0}, {topic:'test', partition: 1}, {topic:'test', partition: 2}, {topic:'test', partition: 3}], ... 

如果我的有效载荷列表匹配定义的分区数量,这将起作用。 如果我超出了分区的数量,我得到一个[BrokerNotAvailableError: Could not find the leader]错误。

  1. 我是否正确,我不能自动提交,如果我想有一个更强有力的保证,如果我的消息处理是asynchronous的,可能会失败(即ETL作业),我不会失去消息? kafka-node只是发出“消息”事件,没有办法确认它是否成功处理。
  2. HighLevelConsumer是否会读取上次提交的偏移量(即重复)的消息,而不是下一个偏移量?
  3. 获得某个主题的分区数量的最佳方法是什么?

我挖掘了kafka-node源代码,并且有一个我可以用来获取分区信息的无证电话:

 client.loadMetadataForTopics(['test'], function(err, results) {..} 

(我不喜欢调用那些看起来不是公共API的logging部分的东西,我对返回的结果的原始感觉混合数组性质感到不舒服,但是它解决了我现在的问题。)