NodeJS – 使用stream

我正在尝试在nodejs中构build一个API,这个API将stream式处理针对Vertica DB执行的查询的输出。

nodejs中的vertica db驱动程序暴露了我正在使用的未缓冲的查询接口。 有关更多详细信息,请参阅: https : //github.com/wvanbergen/node-vertica

以下是我的代码:

var vertica = require('vertica'); var Readable = require('stream').Readable; var rs = new Readable; var conn = vertica.connect( { host: 'hostname', user: 'user', password: 'password', database: 'verticadb' }); var q = conn.query('select * from table'); q.on('row', function(row) { rs.push(row.join(',') + "\n"); }); q.on('end', function(status) { rs.push(null); rs.pipe(process.stdout); conn.disconnect(); }); q.on('error', function(err) { conn.disconnect(); }); 

它确实返回相应的输出,但我的理解是,它实际上缓冲了row.join(',') + "\n"的输出,并且只有在读取所有行时才将其输出到stdout。 我的目标是尽快读出每一行。 我应该如何修改我的代码才能使其工作? 你可以用任何可比较的东西来replacevertica“row”事件。

提前致谢 !

附录

我已经设法使用所谓的“经典可读stream”基于在https://github.com/substack/stream-handbook上find的文档工作。

代码为:

 var vertica = require('vertica'); var Stream = require('stream'); var stream = new Stream; stream.readable = true; var conn = vertica.connect( { host: 'hostname', user: 'user', password: 'password', database: 'verticadb' }); var q = conn.query('select * from affiliate_manager_2'); q.on('row', function(row) { stream.emit('data', row.join(',') + "\n"); }); q.on('end', function(status) { stream.emit('end'); conn.disconnect(); }); q.on('error', function(err) { conn.disconnect(); }); stream.pipe(process.stdout); 

然而,这是“老”的做法,我想知道如何用“新方法”来做到这一点。

再次感谢

Readable是“抽象的”。 它正在寻找一个叫做_read的函数,它在默认实现中没有定义。 没有它,它只是缓冲每一个push(chunk)直到它看到push(null) 。 这就是你在例子中看到的行为。

为了得到你想要的行为,只需添加一个_read函数!

这里有一个例子可以适应你的数据库:

 var Readable = require('stream').Readable; var stream = new Readable; stream._read = function () { var query = …; query.on('row', function (row) { stream.push(JSON.stringify(row) + '\n'); }); query.on('end', function () { stream.push(null); }); stream._read = function () {}; }; stream.pipe(process.stdout); 

进一步阅读: