在节点js中用Avro序列化数据

我想序列化来自JSON对象的数据,并通过networking发送给kafka作为结束。 现在,我在文件中有一个avro模式,确定发送给日志系统的kafka所需的字段:

{"namespace": "com.company.wr.messages", "type": "record", "name": "Log", "fields": [ {"name": "timestamp", "type": "long"}, {"name": "source", "type": "string"}, {"name": "version", "type": "string"}, {"name": "ipAddress", "type": "string"}, {"name": "name", "type": "string"}, {"name": "level", "type": "string"}, {"name": "errorCode", "type": "string"}, {"name": "message", "type": "string"} ] } 

我正在使用一个节点包“avro-schema”,我尝试了其他的,但是没有一个能够正常工作,我只需要从节点js中以一种方式进行序列化。

avsc

 var avro = require('avsc'); // Parse the schema. var logType = avro.parse({ "namespace": "com.company.wr.messages", "type": "record", "name": "Log", "fields": [ {"name": "timestamp", "type": "long"}, {"name": "source", "type": "string"}, {"name": "version", "type": "string"}, {"name": "ipAddress", "type": "string"}, {"name": "name", "type": "string"}, {"name": "level", "type": "string"}, {"name": "errorCode", "type": "string"}, {"name": "message", "type": "string"} ] }); // A sample log record. var obj = { timestamp: 2313213, source: 'src', version: '1.0', ipAddress: '0.0.0.0', name: 'foo', level: 'INFO', errorCode: '', message: '' }; // And its corresponding Avro encoding. var buf = logType.toBuffer(obj); 

您可以在这里find更多关于各种编码方法的信息。

下面是一个例子,我们正在做一个类似的用例,我们将Avrologging发送到另一个队列(Amazon Kinesis),并根据您的模式进行调整。 我们使用node-avro-io 0.2.0和stream-to-arry 2.0.2。

 var avro = require('node-avro-io'); var toArray = require('stream-to-array'); var schema = { "namespace": "com.company.wr.messages", "type": "record", "name": "Log", "fields": [ {"name": "timestamp", "type": "long"}, {"name": "source", "type": "string"}, {"name": "version", "type": "string"}, {"name": "ipAddress", "type": "string"}, {"name": "name", "type": "string"}, {"name": "level", "type": "string"}, {"name": "errorCode", "type": "string"}, {"name": "message", "type": "string"} ] }; var writer = new avro.DataFile.Writer(schema, "snappy"); toArray(writer, function(err, arr) { var dataBuffer = Buffer.concat(arr); // Send dataBuffer to Kafka here }); var record = { "timestamp": 123, "source": "example.com", "version": "HTTP 1.1", "ipAddress": "123.123.123.123", "name": "Jim", "level": "INFO", "errorCode": "200", "message": "foo" }; writer.append(record).end(); 

在撰写本文时,node-avro-io的示例是用于序列化/反序列化文件系统上的Avro文件。 本示例使用stream到数组的软件包作为从基于stream的node-avro-io软件包中获取Buffer的快捷方式。 Buffer可以发送到您的队列作为您的卡夫卡生产者的消息。

其他一些node.js包,如avronode和Collective的node-avro ,都是C ++库的包装器。 我没有这些软件包的成功。 下面是node-avro的Avro C ++库安装说明(为其构build一个.deb软件包)的tl:dr。 它可以帮助任何C ++包装程序包。

 sudo apt-get install -y libboost-all-dev cmake checkinstall ssh clone git@github.com:apache/avro.git cd avro git checkout release-1.7.7 cd lang/c++ cmake -G "Unix Makefiles" sudo checkinstall -y \ --install=no \ --pkgname="avro-cpp" \ --pkgrelease="1.7.7" \ --maintainer="me@example.com" \ --addso=yes 

对于Collective的node-avro,我必须从Ubuntu 14.04上的bin/install-and-run-tests脚本中删除export CXXFLAGS="-fcxx-exceptions"行。