如何强制Node.js转换stream完成?

考虑以下情况。 我有两个节点转换stream:

转换stream1

function T1(options) { if (! (this instanceof T1)) { return new T1(options); } Transform.call(this, options); } util.inherits(T1, Transform); T1.prototype._transform = function(chunk, encoding, done) { console.log("### Transforming in t1"); this.push(chunk); done(); }; T1.prototype._flush = function(done) { console.log("### Done in t1"); done(); }; 

转换stream2

 function T2(options) { if (! (this instanceof T2)) { return new T2(options); } Transform.call(this, options); } util.inherits(T2, Transform); T2.prototype._transform = function(chunk, encoding, done) { console.log("### Transforming in t2"); this.push(chunk); done(); }; T2.prototype._flush = function(done) { console.log("### Done in t2"); done(); }; 

而且,我想在返回响应之前应用这些转换stream。 我有一个简单的HTTP服务器,并在每个请求上,我获取资源,并希望这些转换应用于此获取的资源,然后将第二个转换的结果发送到原始响应:

 var options = require('url').parse('http://localhost:1234/data.json'); options.method = 'GET'; http.createServer(function(req, res) { var req = http.request(options, function(httpRes) { var t1 = new T1({}); var t2 = new T2({}); httpRes .pipe(t1) .pipe(t2) .on('finish', function() { // Do other stuff in here before sending request back t2.pipe(res, { end : true }); }); }); req.end(); }).listen(3001); 

最终, finish事件永远不会被调用,并且请求挂起并超时,因为响应永远不会被解决。 我注意到,如果我只是将t2inputres ,它似乎工作正常:

  .pipe(t1) .pipe(t2) .pipe(res, { end : true }); 

但是,这种情况似乎不可行,因为我需要在返回响应之前做一些额外的工作。

发生这种情况是因为您需要让节点知道某个地方的stream正在被消耗,否则最后一个stream将会填满缓冲区,并且考虑到您的数据比highwaterMark选项长(通常为16),然后停止等待数据消耗。

有三种方式来完整地使用一个stream:

  • pipe道到可读stream(你在问题的第二部分做了什么)
  • 通过调用stream的read方法读取连续的块
  • 监听"data"事件(本质上是stream.on("data", someFunc) )。

最后一个选项是最快的,但会导致消耗stream而不查看内存使用情况。

我也注意到,使用"finish"事件可能有点误导,因为它被称为当最后一个数据被读取,但不一定发射。 在转换stream上,因为它是可读的,所以使用"end"事件会更好。