Tag: apache kafka

加载angular 4和node-rdkafka和kafka-node的问题

我对web前端开发很陌生,在加载kafka客户端库的时候,我在JS / Node / Angular世界有些迷茫。 我考虑了两个选项来访问我的kafka集群:node-rdkafka和kafka-node。 但是,我不认为我的问题在于他们,似乎是缺乏有关JS和/或节点或什么的知识。 我会解释实际问题,最后回到这个概念。 这是我第一次进入JS / Node / Angular世界,所以我的知识还是非常有限的,请忍受任何noob错误。 我开始使用angular度cli(1.2.3)来构build我的组件,服务等。一切都很好,直到我试图加载node-rdkafka:自述文件指示我使用“require”而不是“import”,而那不是find。 通过一些研究,我设法通过改变typings.d.ts文件来得到“需要”的工作(我认为)是这样的: declare var module: NodeModule; interface NodeModule { id: string; } declare var require: NodeRequire; 和tsconfig.app.json包含这个: "types": [ "node" ] 在compilerOptions里面。 应用程序编译和“const kafka = require('node-rdkafka')”开始加载库。 但是,它的一个依赖关系中存在一个错误,即节点绑定。 我发现已经有一个修复,所以我改变了它的版本到最新的提交,只是为了遇到一个新的错误已经报告,但从来没有回答: https://github.com/TooTallNate/node-bindings/issues/34 我也收到以下警告: WARNING in ./~/bindings/bindings.js 81:22-40 Critical dependency: the request of a dependency is […]

根据http请求从kafka获取最新消息

我正在使用kafka节点客户端作出restify api,当被调用返回最新的消息作为响应。 我使用下面的代码取得了一些成功。 var consumerOptions = { groupId: 'ExampleTestGroup', fetchMaxWaitMs:500, fromOffset: 'latest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest' }; consumerStream=new ConsumerStream(client,[{topic:'topic1'},{topic:'topic2'}],{fromOffset:'lates',fecthMaxWaitMs:500}); consumerStream.on('error',(err)=>{ console.log('err',err); }); consumerStream.pipe(Stringify()).pipe(res); setTimeout(()=>{ console.log('100 ms are up'); res.end(); next(); },1000); 如果主题中没有足够的消息并返回响应,则必须使用超时。 这工作正常,但只在第一个请求,因为我假设偏移量是在新的请求到来时提交的,并且不会因此而提取任何消息。

从AWS ECS连接到Heroku kafka – kafka元数据 – 客户端错误

我的nodejs应用程序部署在AWS ECS集群上,它正在监听端口3000,发布此nodejs连接到Heroku通过no-kafka客户端部署kafka我能够连接kafka主题订阅,如果我本地(在我的笔记本电脑上)部署我的nodejs应用程序,但是当我在AWS ECS容器上部署相同时,则出现错误。 2017-11-28T06:11:43.619Z **错误no-kafka-client元数据请求失败:AggregateError:NoKafkaConnectionError [ec2-xx-xx-xx-xxx.compute-1.amazonaws.com:9096]:Kafka服务器已closures连接 我做了一些研究,发现没有kakfa(从github)有Brokerredirection选项,我试过这个,但不知道格式是否正确,我也没有find任何例子在networking上正确给主机端口和卡夫卡端口。 另一个研究,我通过iptables打开IP,允许在ECS实例上的端口9096。 没有运气 nmap来自ECS的端口,并且能够连接到kafka broker节点 主机已启动(延迟0.00031秒)。 PORT STATE SERVICE 9096 / tcpfiltered unknown 这意味着连接到kafka是好的,但返回值是问题或连接本身是一个问题。 如何映射kakfa端口返回与Docker,ECS实例端口? 请帮忙,

我怎样才能得到使用kafka-node库的kafka服务器的主题列表?

我想在kafka服务器上创build新的主题,但在此之前,我想从我的服务器检索主题列表,我怎样才能实现一旦连接build立? main.js var groupId = 'test1'; var clientId = "consumer-" + Math.floor(Math.random() * 10000); var options = {autoCommit: true, fetchMaxWaitMs: 100, fetchMaxBytes: 10 * 1024 * 1024, groupId: groupId}; console.log("Started consumer: ", clientId); var consumer_client = new Client(kafkaConn,clientId); var client = new Client(consumer_client.connectionString,clientId); var consumer = new HighLevelConsumer(client, topics, options);

我应该停止offsetOutOfRange上的节点-kafka-consumer吗?

我写了一个节点卡夫卡消费者。 在极less数情况下,我使用一个group-id来启动kafka客户端,当某些偏移量可用时使用,但不再可用 – 导致调用“offsetOutOfRange”事件。 在这种情况下推荐的行为是什么? logging错误并退出? 有没有办法恢复? 我总是希望从上次提交的偏移量(如果存在并可用)运行zookeeper。 client = new kafka.Client(ZOOKEEPER_URLS), consumer = new Consumer(client, [], { groupId: GROUP_ID, fromOffset: true }); consumer.on('offsetOutOfRange', function (topic) { applicationLogger.error('Kafka consumer is trying to read from offset which is out of range', topic); process.exit(1); });

卡夫与卡夫卡 – 不能产生kafka.connect()

我是Koa的新手,但是设置了一个使用Kafka的应用程序。 我正在使用kafkaesque( https://github.com/pelger/Kafkaesque )。 我尝试了yield* kafkaesque.tearUp() 。 结果: cb(err); ^ TypeError: undefined is not a function 我也尝试过kafkaesque.tearUp(function *() {…})但是这也不起作用 – 只有函数()风格的callback工作。 以Koa的方式处理这些types的例子是否可能? 如果需要,我可以处理callback,但现在不能使用代码工作,因为我需要在Kafka连接(kafkaesque.tearUp)和主题设置(kafkaesque.poll)之后再调用yield next 。

卡夫卡到node.js的Elasticsearch消费

我知道有相当多的node.js模块实现了一个Kafka消费者,它获取消息并写入弹性。 但是我只需要每个msg的一些字段,而不是全部。 有没有我不知道的现有解决scheme?

kafka节点使用者收到offsetOutOfRange错误

我正在使用kafka-node(kafka的节点客户端),使用消费者检索有关主题的消息。 不幸的是,我收到一个“offsetOutOfRange”条件(调用offsetOutOfRangecallback)。 我的申请工作正常,直到消费者显着滞后于生产者,在最早和最近的抵消之间留下了一个较大的差距。 在这一点上,我(可能错误地)认为消费者将能够继续接收消息(希望赶上生产者)。 我的客户客户代码如下: : : var kafka = require('kafka-node'); var zookeeper = "10.0.1.201:2181"; var id = "embClient"; var Consumer = kafka.Consumer; var client = new kafka.Client(zookeeper, id); var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } ); consumer.on('error', [error callback…]); consumer.on('offsetOutOfRange', [offset error callback…]); […]

Node.js kafka节点分区程序types的用法

我试图通过节点js使用kafka-node将数据插入到kafka主题中。 如果我尝试使用partitionerType,数据不会被插入。 如果我删除PartitionerType(即不使用选项),代码的作品,但在这种情况下,所有的数据只有一个分区。 你能帮我解决这个问题吗? 提前致谢。 var kafka = require('kafka-node'), Producer = kafka.Producer, KeyedMessage = kafka.KeyedMessage, client = new kafka.Client("zookeeper-host:2181","node-id"), options = { requireAcks: 1, ackTimeoutMs: 100, partitionerType: 3 }, producer = new Producer(client, options); //producer = new Producer(client); — This works var km = new KeyedMessage('key1', 'message6'), kn = new KeyedMessage('key2', 'message5'), kv = new […]

卡夫卡HLC应该如何找出某个主题的分区数量?

我正在使用kafka-node HighLevelConsumer,并且在启动时总是收到重复消息的问题。 为了保持处理顺序,我的使用者只需将消息附加到工作队列中,然后我会连续处理这些事件。 我暂停使用者,如果我打到一个队列高水位标记,我已经禁用了自动提交,并且在我的客户端代码完全处理每个事件后,我会“手动”提交。 尽pipe在启动时,我总是从一个或多个分区(取决于组中有多less其他HLC运行)获取最后(先前已提交)的消息。 我有点惊讶,HLC不会给我(承诺+ 1),但我决定只是“忽略”偏移量早于偏移量的消息。 作为一个快速testing, offset.fetchCommits('fnord', [{topic:'test', partition: 0}, {topic:'test', partition: 1}, {topic:'test', partition: 2}, {topic:'test', partition: 3}], … 如果我的有效载荷列表匹配定义的分区数量,这将起作用。 如果我超出了分区的数量,我得到一个[BrokerNotAvailableError: Could not find the leader]错误。 我是否正确,我不能自动提交,如果我想有一个更强有力的保证,如果我的消息处理是asynchronous的,可能会失败(即ETL作业),我不会失去消息? kafka-node只是发出“消息”事件,没有办法确认它是否成功处理。 HighLevelConsumer是否会读取上次提交的偏移量(即重复)的消息,而不是下一个偏移量? 获得某个主题的分区数量的最佳方法是什么?