kafka节点使用者收到offsetOutOfRange错误

我正在使用kafka-node(kafka的节点客户端),使用消费者检索有关主题的消息。 不幸的是,我收到一个“offsetOutOfRange”条件(调用offsetOutOfRangecallback)。 我的申请工作正常,直到消费者显着滞后于生产者,在最早和最近的抵消之间留下了一个较大的差距。 在这一点上,我(可能错误地)认为消费者将能够继续接收消息(希望赶上生产者)。

我的客户客户代码如下:

: : var kafka = require('kafka-node'); var zookeeper = "10.0.1.201:2181"; var id = "embClient"; var Consumer = kafka.Consumer; var client = new kafka.Client(zookeeper, id); var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } ); consumer.on('error', [error callback...]); consumer.on('offsetOutOfRange', [offset error callback...]); consumer.on('message', [message callback...]); : : 

我做错了什么,或者我错过了什么?

如果不是,我有几个问题:

(a)是否有一个公认的“最好”的方式来编写客户端来优雅地处理这种情况?

(b)为什么会提高这个条件? (我假设一个客户应该能够继续阅读邮件,最终(理想情况下)赶上…)

(c)是否需要编写代码/逻辑来处理这种情况,并明确地重新定位消费者偏移量来读取? (这似乎有点麻烦)…

任何帮助表示赞赏。

我相信应用程序可能会尝试阅读卡夫卡中不再可用的消息。 Kafka根据log.retention。*属性删除旧的消息。 假设你已经发送了1000封邮件给卡夫卡。 由于保留,卡夫卡删除了前500条消息。 如果您的应用程序试图读取消息350,它将失败,它会引起offsetOutOfRange错误。 这可能是因为你的消费者太慢,卡夫卡经纪人已经在你的消费者可以处理消息之前已经删除了消息。 或者你的消费者崩溃了,但是最后处理的消息的偏移被保存在某处。

您可以使用Offset类来检索最新/最早的可用偏移量(请参阅方法fetch )并更新使用者的偏移量。 我们使用这种方法。

一般来说,当这种情况发生时,应该怎么做应该是不容易的,因为显然有些东西是非常错误的。

希望它有帮助,Lukáš