如何使用kafka-node从主题读取数据?

我有主题,我必须从卡夫卡服务器读取,因此,我只需要创build消费者,可以从卡夫卡主题读取数据,我总是得到错误的话题不存在。

1-如何确保kafka连接已build立?

2-如何从kafka的特定主题获取数据?

main.js

var kafka = require('kafka-node'); var config = require('./config.js'); var kafkaConn = config.kafkaCon.dit; var HighLevelConsumer = kafka.HighLevelConsumer; //var HighLevelProducer = kafka.HighLevelProducer; var Client = kafka.Client; var Offset = kafka.Offset; var topics = [{topic: 'UEQ'}]; var client = new Client(kafkaConn); var payloads = [ { topic: topics, partition : 0}]; var options = { groupId: 'kafka-node-group', // Auto commit config autoCommit: true, autoCommitMsgCount: 100, autoCommitIntervalMs: 5000, // Fetch message config fetchMaxWaitMs: 100, fetchMinBytes: 1, fetchMaxBytes: 1024 * 10, }; var consumer = new HighLevelConsumer(client, payloads, options); consumer.on('message', function (message) { console.log('TEST',this.id, message); }); 

错误

 events.js:141 throw er; // Unhandled 'error' event ^ TopicsNotExistError: The topic(s) [object Object] do not exist at new TopicsNotExistError (C:\uilogging\node_modules\kafka-node\lib\errors\ TopicsNotExistError.js:11:11) 

我正在做一个类似的项目,我在自己的服务器上有一个Kafka生产者,并使用Kafka-Node作为我的应用程序的消费者。 我对Kafka-Node相当陌生,并没有太多的经验,但我可以尝试分享我发现的一些见解。

我相信你的问题是字面上你的话题不存在。

1.如何确保卡夫卡连接已build立?

如果你的连接没有build立,我不认为它会继续说这个话题不存在。 当我input一个不存在的话题时,我为我的卡夫卡制作人input一个随机的IP,没有任何错误。 但是,当我指向正确的IP,并仍然有不正确的话题,我得到同样的错误,你看到。

2.这个代码正在为我的应用程序工作

 var kafka = require('kafka-node'); var Consumer = kafka.Consumer, // The client specifies the ip of the Kafka producer and uses // the zookeeper port 2181 client = new kafka.Client("<ip to producer>:2181"), // The consumer object specifies the client and topic(s) it subscribes to consumer = new Consumer( client, [ { topic: 'myTopic', partition: 0 } ], { autoCommit: false }); consumer.on('message', function (message) { // grab the main content from the Kafka message var data = JSON.parse(message.value); console.log(data); }); 

希望这不会太晚。