卡夫卡到node.js的Elasticsearch消费

我知道有相当多的node.js模块实现了一个Kafka消费者,它获取消息并写入弹性。 但是我只需要每个msg的一些字段,而不是全部。 有没有我不知道的现有解决scheme?

问题是要求node.js的一个例子。 kafka-node模块为获得Consumer提供了一个非常好的机制 ,您可以将其与elasticsearch-js模块结合使用:

 // configure Elasticsearch client var elasticsearch = require('elasticsearch'); var esClient = new elasticsearch.Client({ // ... connection details ... }); // configure Kafka Consumer var kafka = require('kafka-node'); var Consumer = kafka.Consumer; var client = new kafka.Client(); var consumer = new Consumer( client, [ // ... topics / partitions ... ], { autoCommit: false } ); consumer.on('message', function(message) { if (message.some_special_field === "drop") { return; // skip it } // drop fields (you can use delete message['field1'] syntax if you need // to parse a more dynamic structure) delete message.field1; delete message.field2; delete message.field3; esClient.index({ index: 'index-name', type: 'type-name', id: message.id_field, // ID will be auto generated if none/unset body: message }, function(err, res) { if (err) { throw err; } }); }); consumer.on('error', function(err) { console.log(err); }); 

注意 :使用索引API不是一个好习惯,因为它需要Elasticsearch为每个操作创build一个线程,这显然是浪费的,并且如果线程池耗尽,最终会导致被拒绝的请求结果。 在任何批量服务的情况下,更好的解决scheme是使用Elasticsearch Streams (或基于它的Elasticsearch Bulk Index Stream )之类的东西来进行研究,它build立在官方elasticsearch-js客户端上。 然而,我从来没有使用过这些客户端扩展,所以我不知道他们做了什么或者不能工作,但是用法只会取代显示索引的部分。

我不相信node.js方法在维护和复杂性方面实际上比下面的Logstash方法更好,所以我已经把它们留在了这里作为参考。


更好的方法可能是从Logstash中消费Kafka,然后将其发送给Elasticsearch。

您应该可以使用Logstash以一种简单的方式使用Kafkainput和Elasticsearch输出来完成此操作。

Logstashpipe道中的每个文档都称为“事件”。 Kafkainput假定它将接收到JSON(可由其编解码器configuration),该JSON将填充来自该消息的所有字段的单个事件。

然后,您可以删除那些您无意处理的字段,或者有条件地删除整个事件。

 input { # Receive from Kafka kafka { # ... } } filter { if [some_special_field] == "drop" { drop { } # skip the entire event } # drop specific fields mutate { remove_field => [ "field1", "field2", ... ] } } output { # send to Elasticsearch elasticsearch { # ... } } 

当然,你需要configurationKafkainput(从第一个链接)和Elasticsearch输出(和第二个链接)。