Tag: apache kafka

kafka错误:请求偏移量X,但是我们只有从Y到Z范围内的日志段

我有工作kafka实例,但现在它产生成千上万的错误,如下所示: [2016-06-24 01:33:28,092] ERROR [代理0上的副本pipe理器]:在处理来自消费者的分区[test-topic,0]偏移45的提取请求时出错,相关标识为85452.可能的原因: 45,但我们只有0到0范围内的日志段。(kafka.server.ReplicaManager) 我正在使用默认configuration。 我在node.js代码中使用了几个主题。 UPD:重新安装卡夫卡帮助我,但我想知道问题的原始来源。

阅读与反应的kafka话题

如果这个问题看起来很普通,我会开始说我很抱歉,但是我很难解决这个问题。 所以我会在这里给你一个机会。 我想用反应build立一个kafka话题的消费者,所以它会渲染,我不知道我的话题中有一个新的消息。 我已经有了消费者的代码: var kafka = require('kafka-node'), Consumer = kafka.Consumer, client = new kafka.Client(), consumer = new Consumer( client, [ { topic: 'logs', partition: 0 } ], { autoCommit: false } ), Producer = kafka.Producer, client = new kafka.Client(), producer = new Producer(client); consumer.on('message', function (message) { console.log(message); }); 任何时候都有一个新的消息,消费者的事件将会被调用。 但是我看不出有什么办法来反应。 我为任何东西,教程,文章,任何东西。 谢谢。

如何使用kafka-node从主题读取数据?

我有主题,我必须从卡夫卡服务器读取,因此,我只需要创build消费者,可以从卡夫卡主题读取数据,我总是得到错误的话题不存在。 1-如何确保kafka连接已build立? 2-如何从kafka的特定主题获取数据? main.js var kafka = require('kafka-node'); var config = require('./config.js'); var kafkaConn = config.kafkaCon.dit; var HighLevelConsumer = kafka.HighLevelConsumer; //var HighLevelProducer = kafka.HighLevelProducer; var Client = kafka.Client; var Offset = kafka.Offset; var topics = [{topic: 'UEQ'}]; var client = new Client(kafkaConn); var payloads = [ { topic: topics, partition : 0}]; var options = […]

在nodejs中检查kafka主题的存在

我目前正在使用nodejs和kafka,由此设置一个nodejs服务器来接收事件,并将与事件相对应的数据发送给kafka。 在kafka中,如果主题不存在,制作人将相应地dynamic地创build主题。 要做到这一点,我想检查该主题的存在,如果它存在或不创build之前。 我目前正在使用kafka-node模块来实现kafka-node集成function。 然而,我找不到任何能够说明这个话题存在的function,或者返回卡夫卡当前存在的所有话题列表。 在互联网上search,我发现卡夫卡rest代理,通过获取当前的主题,有助于了解这一点,但我不知道如何把它用于很多。 任何人都可以告诉我关于任何其他通过我可以实现上述function的API? 请帮忙。

kafka节点js客户端压缩问题与snappy

我正在使用kafka-node( https://github.com/SOHU-Co/kafka-node )消费者来检索数据。 我认为我得到的数据是用SNAPPY压缩的。 我得到它后如何解压缩数据? 我尝试使用node-snappy( https://github.com/kesla/node-snappy )来解压缩数据,但没有奏效。 库中是否有任何选项将压缩设置为无? 任何人都使用kafka-node库来从kafka获取数据。 谢谢,chandu

用node-rdkafka重新连接到Kafka是缓慢和不一致的

我有kafka和zookeeper在一些当地的docker集装箱运行。 我有一个node.js代码库,它使用node-rdkafka作为消费者连接到kafka。 我们将这个代码库称为“消费者” 消费者使用此处显示的代码连接到kafka: https : //github.com/Blizzard/node-rdkafka/blob/master/examples/consumer-flow.md 当试图连接到一个现有的Kafka实例时,我得到这个输出: consumer ready.{"name":"rdkafka#consumer-1"} 但是,接收和处理消息的代码在开始触发之前从不会触发,或者需要几分钟(有时显然是5到10分钟)。 为了确保消息正在生成,我使用kafka附带的脚本kafka-console-consumer.sh来观察消息stream。 果然,数据即将来临。 看起来连接有时会失败,有时需要很长时间才能连接。 这是这段时间的卡夫卡日志: kafka_1 | [2017-04-27 20:55:37,963] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager) kafka_1 | [2017-04-27 20:55:48,576] WARN Attempting to send response via channel for which there is no open connection, connection id 1 […]

比较kafka-node和node-rdkafka

我想用我的Node.JS服务作为消息代理使用Kafka,并在kafka-node和node-rdkafka库之间进行辩论。 我开始testingkafka-node的function(这个库似乎更受欢迎),并且能够创build一个制作者,用它写信给Kafka,创build一个消费者,并且从Kafka读取它。 但是,我注意到,该库目前不支持一些Kafkafunction,例如写时间戳和lz4压缩(我使用的Scala驱动程序支持的function)。 有没有人在kafka-node(或node-rdkafka)遇到这些或其他挫折,并决定使用另一个库,由于他们?

卡夫卡节点,消费者总是有旧信息

即时通讯使用模块kafka节点https://github.com/SOHU-Co/kafka-node 每次当我重新启动消费者,他们得到所有的旧消息,即时通讯使用循环系统(负载平衡) 你有什么想法我怎么能声明到服务器,我消费了一条消息,他不再给我重新启动消费者时,我呢? 在我的代码或configuration服务器的一些错误? 任何想法 ? 生产者代码 var kafka = require('kafka-node'); var HighLevelProducer = kafka.HighLevelProducer; var Client = kafka.Client; var client = new Client('xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181', 'consumer' + process.pid); var argv = require('optimist').argv; var topic = argv.topic || 'test_12345'; var producer = new HighLevelProducer(client); var time = process.hrtime(); var message, diff,i=0; producer.on('ready', function () { setInterval(function(){ var […]

保存在Zookeeper或Kafka中的偏移量?

我对使用Kafka和Zookeeper时偏移的存储位置感到困惑。 看起来在某些情况下补偿存储在Zookeeper中,在其他情况下它们存储在Kafka中。 什么决定偏移量是存储在卡夫卡还是在Zookeeper中? 什么是利弊? 注意:当然,我也可以在一些不同的数据存储中存储偏移量,但这不是本文的图片的一部分。 关于我的设置的更多细节: 我运行这些版本:KAFKA_VERSION =“0.10.1.0”,SCALA_VERSION =“2.11” 我使用我的NodeJS应用程序中的kafka-node连接到Kafka / Zookeeper。