Node.jsstream可读可以转换

我一直在尝试使用可读和变换stream来处理一个非常大的文件。 我似乎遇到的问题是,如果我没有在最后放置一个可写的stream,程序似乎在结果返回之前终止。

例如: rstream.pipe(split()).pipe(tstream)

我的tstream有一个发射器,当计数器达到阈值时发射。 当这个阈值被设置为一个较低的数字,我得到一个结果,但是当它很高时,它不会返回任何东西。 如果我把它传给一个文件编写器,它总是返回一个结果。 我错过了什么明显的?

码:

 // Dependencies var fs = require('fs'); var rstream = fs.createReadStream('file'); var wstream = fs.createWriteStream('output'); var split = require('split'); // used for separating stream by new line var QTransformStream = require('./transform'); var qtransformstream = new QTransformStream(); qtransformstream.on('completed', function(result) { console.log('Result: ' + result); }); exports.getQ = function getQ(filename, callback) { // THIS WORKS if i have a low counter for qtransformstream, // but when it's high, I do not get a result // rstream.pipe(split()).pipe(qtransformstream); // this always works rstream.pipe(split()).pipe(qtransformstream).pipe(wstream); }; 

这里是Qtransformstream的代码

 // Dependencies var Transform = require('stream').Transform, util = require('util'); // Constructor, takes in the Quser as an input var TransformStream = function(Quser) { // Create this as a Transform Stream Transform.call(this, { objectMode: true }); // Default the Qbase to 32 as an assumption this.Qbase = 32; if (Quser) { this.Quser = Quser; } else { this.Quser = 20; } this.Qpass = this.Quser + this.Qbase; this.Counter = 0; // Variables used as intermediates this.Qmin = 120; this.Qmax = 0; }; // Extend the transform object util.inherits(TransformStream, Transform); // The Transformation to get the Qbase and Qpass TransformStream.prototype._transform = function(chunk, encoding, callback) { var Qmin = this.Qmin; var Qmax = this.Qmax; var Qbase = this.Qbase; var Quser = this.Quser; this.Counter++; // Stop the stream after 100 reads and emit the data if (this.Counter === 100) { this.emit('completed', this.Qbase, this.Quser); } // do some calcs on this.Qbase this.push('something not important'); callback(); }; // export the object module.exports = TransformStream; 

编辑:

此外,我不知道你的计数器有多高,但如果你填满缓冲区,它将停止传递数据到变换stream,在这种情况下completed从来没有实际打,因为你永远不会到达计数器的限制。 尝试改变你的highwatermark

编辑2:一个更好的解释

正如你所知道的,一个transform stream 是一个双工stream ,基本上意味着它可以接收来自源的数据,并且可以将数据发送到目的地。 这通常分别被称为阅读和写作。 transform streaminheritanceNode.js实现的read streamwrite stream 。 有一个警告, transform stream 不必实现_read或_write函数。 从这个意义上说,你可以把它看作不太知名的传递stream 。

如果您考虑transform stream实现write stream的事实,则还必须考虑写入stream始终具有转储其内容的目标的事实。 你遇到问题是,当你创build一个transform stream你不能指定一个地方发送你的内容。 将数据完全通过变换stream传递的唯一方法是将其传递给写入stream,否则实际上,您的stream将得到备份,并且无法接受更多的数据,因为数据无法存在。

这就是为什么当你写入一个写入stream时,它总是工作。 写入数据stream通过将数据发送到目的地来缓解数据备份,因此您的所有数据都将被传送,并且完成事件将被发送。

当样本量很小时,您的代码在没有写入stream的情况下工作的原因是您没有填满您的stream,因此转换stream可以接受足够的数据以允许完成事件/阈值。 由于阈值增加了您的stream可以接受的数据量,而不会将其发送到另一个地方(写入stream)保持不变。 这会导致您的数据stream得到备份,并且不能再接受数据,这意味着完成的事件将永远不会被发射。

我会冒昧地说,如果你增加你的highwatermark转换stream,你将能够提高你的门槛,仍然有代码的工作。 这个方法虽然不正确。 将stream传输到一个写入stream,该stream将发送数据到dev / null创build写入stream的方式是:

 var writer = fs.createWriteStream('/dev/null'); 

Node.js文档中关于缓冲的部分解释了您正在运行的错误。

你不打断_transform和进程远得多。 尝试:

 this.emit('completed', ...); this.end(); 

这就是为什么'程序似乎在结果返回之前终止'

不要输出任何无用的数据:

 var wstream = fs.createWriteStream('/dev/null'); 

祝你好运)

我会build议使用Writable而不是Transformstream。 然后,将_transform重命名为_write ,如果您pipe道化,您的代码将消耗该stream。 @Bradgnar已经指出,转换stream需要一个消费者,否则它将停止可读stream将更多数据推送到其缓冲区。