如何在NodeJs中使用asynchronous数据源创build可读stream?

环境: NodeJS,Express,DynamoDB(但真的可以是任何数据库)

场景:需要读取大量logging并作为可下载文件返回给用户。 这意味着我无法一次缓冲所有的内容,然后将其发送到Express的响应中。 此外,我可能需要多次执行查询,因为所有数据可能不会在一个查询中返回。

build议的解决scheme:使用一个可读的stream,可以传送到Express中的响应stream。

我开始创build一个从streaminheritance的对象.Readable并实现了一个推送查询结果的_read()方法。 问题是在_read()中调用的数据库查询是asynchronous的,但是stream.read()是同步方法。

当stream被传送到服务器的响应时,在db查询甚至有机会执行之前,调用多次该读取。 所以查询被调用多次,甚至当查询的第一个实例完成并执行推(空),其他查询完成,我得到一个“EOF后”(错误)。

  1. 有没有办法正确使用_read()?
  2. 我应该忘记_read(),只是在构造函数中执行查询和push()结果?
  3. 我应该执行查询并发出数据事件而不是push()?

谢谢

function DynamoDbResultStream(query, options){ if(!(this instanceof DynamoDbResultStream)){ return new DynamoDbResultStream(query, options); } Readable.call(this, options); this.dbQuery = query; this.done = false; } util.inherits(DynamoDbResultStream, Readable); DynamoDbResultStream.prototype._read = function(){ var self = this; if(!this.done){ dynamoDB.query(this.dbQuery, function(err, data) { if (!err) { try{ for(i=0;i<data.Items.length;i++){ self.push(data.Items[i]); } }catch(err){ console.log(err); } if (data.LastEvaluatedKey) { //Next read() should invoke the query with a new start key self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey; }else{ self.done=true; self.push(null); } }else{ console.log(err); self.emit('error',err); } }); }else{ self.push(null); } }; 

编辑:发布这个问题后,我发现这个post的答案,显示如何做到这一点,而不使用inheritance: 如何调用node.js可读stream内的asynchronous函数

那里有一个评论,那就是在_read()里面应该只有一个push()。 每个push()通常会生成另一个read()调用。

请注意stream的不同模式: https : //nodejs.org/api/stream.html#stream_two_modes

 const Readable = require('stream').Readable; // starts in paused mode const readable = new Readable(); let i = 0; fetchMyAsyncData() { setTimeout(() => { // still remains in paused mode readable.push(++i); if (i === 5) { return readable.emit('end'); } fetchMyAsyncData(); }, 500); } // "The res object is an enhanced version of Node's own response object and supports all built-in fields and methods." app.get('/mystreamingresponse' (req, res) => { // remains in paused mode readable.on('readable', () => res.write(readable.read())); fetchMyAsyncData(); // closes the response stream once all external data arrived readable.on('end', () => res.end()); })