kafka-node: consumers always get old messages

I'm using the kafka-node module: https://github.com/SOHU-Co/kafka-node

Every time I restart the consumers, they receive all the old messages. I'm using round-robin (load balancing).

Do you have any idea how I can tell the server that I have consumed a message, so that it doesn't give it to me again when I restart the consumers?

Is there a mistake in my code or in the server configuration?

Any ideas?

Producer code:

    var kafka = require('kafka-node');
    var HighLevelProducer = kafka.HighLevelProducer;
    var Client = kafka.Client;
    var client = new Client('xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181', 'consumer' + process.pid);
    var argv = require('optimist').argv;
    var topic = argv.topic || 'test_12345';
    var producer = new HighLevelProducer(client);
    var time = process.hrtime();
    var message, diff, i = 0;

    producer.on('ready', function () {
        setInterval(function () {
            // Build a zero-padded "YYYY-MM-DD HH:mm:ss" timestamp
            var date = new Date();
            var dateString = date.getFullYear() + "-" +
                ((date.getMonth() + 1) < 10 ? '0' + (date.getMonth() + 1) : (date.getMonth() + 1)) + "-" +
                (date.getDate() < 10 ? '0' + date.getDate() : date.getDate()) + " " +
                (date.getHours() < 10 ? '0' + date.getHours() : date.getHours()) + ":" +
                (date.getMinutes() < 10 ? '0' + date.getMinutes() : date.getMinutes()) + ":" +
                (date.getSeconds() < 10 ? '0' + date.getSeconds() : date.getSeconds());
            message = JSON.stringify({ 'message': 'hello - ' + dateString });
            console.log(message);
            send(message);
        }, 1000);
    });

    function send(message) {
        producer.send([
            { topic: topic, messages: [message] }
        ], function (err, data) {
            console.log(data);
            if (err) console.log(err);
        });
    }

Worker code:

    var kafka = require('kafka-node');
    var HighLevelConsumer = kafka.HighLevelConsumer;
    var Offset = kafka.Offset;
    var Client = kafka.Client;
    var argv = require('optimist').argv;
    var topic = argv.topic || 'test_12345';
    var client = new Client('xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181', 'consumer' + process.pid);
    var payloads = [ { topic: topic } ];
    var options = {
        groupId: 'kafka-node-group',
        // Auto commit config
        autoCommit: true,
        autoCommitMsgCount: 100,
        autoCommitIntervalMs: 5000,
        // Fetch message config
        fetchMaxWaitMs: 100,
        fetchMinBytes: 1,
        fetchMaxBytes: 1024 * 10,
        fromOffset: false,
        fromBeginning: false
    };
    var consumer = new HighLevelConsumer(client, payloads, options);
    var offset = new Offset(client);

    consumer.on('message', function (message) {
        console.log(this.id, message);
    });

    consumer.on('error', function (err) {
        console.log('error', err);
    });

    // If the stored offset falls outside the broker's log range,
    // reset to the smallest available offset for that partition.
    consumer.on('offsetOutOfRange', function (topic) {
        console.log("------------- offsetOutOfRange ------------");
        topic.maxNum = 2;
        offset.fetch([topic], function (err, offsets) {
            var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
            consumer.setOffset(topic.topic, topic.partition, min);
        });
    });

zookeeper zoo.cfg (5 servers):

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/etc/zookeeper/data
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    autopurge.snapRetainCount=5
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    autopurge.purgeInterval=24
    server.1=xxx.xxx.xxx.xxx:2888:3888
    server.2=xxx.xxx.xxx.xxx:2888:3888
    server.3=xxx.xxx.xxx.xxx:2888:3888
    server.4=xxx.xxx.xxx.xxx:2888:3888
    server.5=xxx.xxx.xxx.xxx:2888:3888
    leaderServes = false

kafka server.properties (5 servers):

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    # see kafka.server.KafkaConfig for additional details and defaults

    ############################# Server Basics #############################

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=5

    ############################# Socket Server Settings #############################

    # The port the socket server listens on
    port=9092

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    #host.name=localhost

    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured. Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    #advertised.host.name=<hostname routable by clients>

    # The port to publish to ZooKeeper for clients to use. If this is not set,
    # it will publish the same port that the broker binds to.
    #advertised.port=<port accessible by clients>

    # The number of threads handling network requests
    num.network.threads=4

    # The number of threads doing disk I/O
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=1048576

    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=1048576

    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600

    ############################# Log Basics #############################

    # A comma seperated list of directories under which to store log files
    log.dirs=/etc/kafka/kafka-logs

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=10

    ############################# Log Flush Policy #############################

    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.

    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.

    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168

    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=536870912

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=60000

    # By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
    # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
    log.cleaner.enable=false

    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. eg "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181

    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=1000000

    # metrics reporter properties
    #kafka.metrics.polling.interval.secs=5
    #kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
    #kafka.csv.metrics.dir=/etc/kafka/kafka_metrics
    # Disable csv reporting by default.
    #kafka.csv.metrics.reporter.enabled=false

    replica.lag.max.messages=10000000
    default.replication.factor=5
    controlled.shutdown.enable=true

Kind regards

I'm not sure… but it looks like your problem is that you change the consumer group every time you re-run it (by using the process pid), and every new consumer group has to fetch the messages from the beginning…
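For what it's worth, here is a minimal sketch of that idea, assuming the goal is simply a group name that never depends on the process (the names 'my-stable-group' and 'my-app-client' are made up for illustration):

    // Hypothetical sketch: derive nothing from process.pid, so the same
    // consumer group (and its committed offsets) is reused on every restart.
    var kafka = require('kafka-node');
    var client = new kafka.Client('xxx.xxx.xxx.xxx:2181', 'my-app-client'); // fixed client id
    var options = {
        groupId: 'my-stable-group', // fixed group name across restarts
        autoCommit: true            // keep committing consumed offsets to zookeeper
    };
    var consumer = new kafka.HighLevelConsumer(client, [{ topic: 'test_12345' }], options);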

I had the same problem. I noticed that it happens when you consume a topic that has multiple partitions.

If you specify the partition number in the consumer's topic payload, it will consume from only one partition and you won't receive the older messages.

Try changing:

 var payloads = [ { topic: topic }]; 

    var payloads = [ { topic: topic, partition: 0 } ];
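One caveat with this workaround, sketched under the assumption that the topic really has the ten partitions set by num.partitions in the server.properties above: pinning partition 0 means the other nine partitions are never read, so you would have to enumerate them all explicitly:

    // Hypothetical sketch: one payload per partition so none is skipped.
    // The count of 10 mirrors num.partitions in server.properties.
    var payloads = [];
    for (var p = 0; p < 10; p++) {
        payloads.push({ topic: topic, partition: p });
    }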

You should try tuning the following property –

This setting is in hours, so by default messages on a topic stay available for 24 * 7 hours (i.e. one week):

    # Broker Configs
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168

In the consumer configuration, set auto.commit.enable to true; this lets the consumer commit the offsets of already-fetched messages to zookeeper. Also change auto.offset.reset to "largest" so that messages are not read from the smallest available offset.
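Those two names are properties of the Java high-level consumer; in kafka-node the closest equivalents, judging only from the options already used in the question, would look roughly like this (a sketch, not a definitive mapping):

    // Hedged mapping onto kafka-node consumer options:
    // autoCommit ~ auto.commit.enable; leaving fromOffset/fromBeginning false
    // roughly corresponds to not rewinding to the smallest offset.
    var options = {
        groupId: 'kafka-node-group',
        autoCommit: true,           // commit fetched offsets to zookeeper
        autoCommitIntervalMs: 5000, // how often to commit them
        fromOffset: false,          // don't force an explicit starting offset
        fromBeginning: false        // don't replay the topic from offset 0
    };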

Try that, and if you still see the problem, you can monitor the offset updates for a given consumer group through the zookeeper command line. Look under /consumers and /brokers; the following will give you the offset –

 get /consumers/my_test_group/offsets/my_topic/0 
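If you would rather check this from Node than from the zookeeper shell, kafka-node's Offset object can fetch a group's committed offsets; a sketch, reusing the group and topic names from the command above:

    // Sketch: read the committed offset for a consumer group via kafka-node
    // instead of running `get /consumers/.../offsets/...` in zkCli.sh.
    var kafka = require('kafka-node');
    var client = new kafka.Client('xxx.xxx.xxx.xxx:2181');
    var offset = new kafka.Offset(client);
    offset.fetchCommits('my_test_group', [{ topic: 'my_topic', partition: 0 }],
        function (err, data) {
            if (err) return console.log(err);
            console.log(data); // e.g. { my_topic: { '0': <committed offset> } }
        });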

Hope this helps.

Your code works for me. I tested it with kafka-node v0.2.20.

Focus on zookeeper:

  • check the logs (e.g. for replication errors),
  • try a single zookeeper instance,
  • try setting the option leaderServes = true,
  • check the path /consumers/kafka-node-group/offsets/test_12345/0 via zkCli.sh.

This is a client module bug; the fix is in #314.

Try changing:

 var consumer = new HighLevelConsumer(client, payloads, options); 

to:

 var consumer = new Consumer(client, payloads, options); 
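Keep in mind that the plain Consumer does not do the group rebalancing that HighLevelConsumer does, so its payloads normally name partitions explicitly; a sketch under that assumption (partition 0 only, purely illustrative):

    // Sketch of the plain Consumer with an explicit partition in the payload.
    // Add one payload per partition you want this process to read.
    var kafka = require('kafka-node');
    var client = new kafka.Client('xxx.xxx.xxx.xxx:2181');
    var topic = 'test_12345';
    var consumer = new kafka.Consumer(client,
        [{ topic: topic, partition: 0 }],
        { groupId: 'kafka-node-group', autoCommit: true });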