NodeJS – 使用stream
我正在尝试在nodejs中构build一个API,这个API将stream式处理针对Vertica DB执行的查询的输出。
nodejs中的vertica db驱动程序暴露了我正在使用的未缓冲的查询接口。 有关更多详细信息,请参阅: https : //github.com/wvanbergen/node-vertica
以下是我的代码:
var vertica = require('vertica'); var Readable = require('stream').Readable; var rs = new Readable; var conn = vertica.connect( { host: 'hostname', user: 'user', password: 'password', database: 'verticadb' }); var q = conn.query('select * from table'); q.on('row', function(row) { rs.push(row.join(',') + "\n"); }); q.on('end', function(status) { rs.push(null); rs.pipe(process.stdout); conn.disconnect(); }); q.on('error', function(err) { conn.disconnect(); });
它确实返回相应的输出,但我的理解是,它实际上缓冲了row.join(',') + "\n"
的输出,并且只有在读取所有行时才将其输出到stdout。 我的目标是尽快读出每一行。 我应该如何修改我的代码才能使其工作? 你可以用任何可比较的东西来replacevertica“row”事件。
提前致谢 !
附录
我已经设法使用所谓的“经典可读stream”基于在https://github.com/substack/stream-handbook上find的文档工作。
代码为:
var vertica = require('vertica'); var Stream = require('stream'); var stream = new Stream; stream.readable = true; var conn = vertica.connect( { host: 'hostname', user: 'user', password: 'password', database: 'verticadb' }); var q = conn.query('select * from affiliate_manager_2'); q.on('row', function(row) { stream.emit('data', row.join(',') + "\n"); }); q.on('end', function(status) { stream.emit('end'); conn.disconnect(); }); q.on('error', function(err) { conn.disconnect(); }); stream.pipe(process.stdout);
然而,这是“老”的做法,我想知道如何用“新方法”来做到这一点。
再次感谢
Readable
是“抽象的”。 它正在寻找一个叫做_read
的函数,它在默认实现中没有定义。 没有它,它只是缓冲每一个push(chunk)
直到它看到push(null)
。 这就是你在例子中看到的行为。
为了得到你想要的行为,只需添加一个_read
函数!
这里有一个例子可以适应你的数据库:
var Readable = require('stream').Readable; var stream = new Readable; stream._read = function () { var query = …; query.on('row', function (row) { stream.push(JSON.stringify(row) + '\n'); }); query.on('end', function () { stream.push(null); }); stream._read = function () {}; }; stream.pipe(process.stdout);
进一步阅读: