Tag: 反压

如何在node.js中实现正确处理背压的stream?

我不能为了我的生活弄清楚如何实现一个正确处理背压的stream 。 你应该永远不要使用暂停和恢复? 我有这个实现我试图得到正确的工作: var StreamPeeker = exports.StreamPeeker = function(myStream, callback) { stream.Readable.call(this, {highWaterMark: highWaterMark}) this.stream = myStream myStream.on('readable', function() { var data = myStream.read(5000) //process.stdout.write("Eff: "+data) if(data !== null) { if(!this.push(data)) { process.stdout.write("Pause") this.pause() } callback(data) } }.bind(this)) myStream.on('end', function() { this.push(null) }.bind(this)) } util.inherits(StreamPeeker, stream.Readable) StreamPeeker.prototype._read = function() { process.stdout.write("resume") //this.resume() // putting […]

Node.JS TCP上的无界并发/stream反压

据我了解,Node的IO模型的后果之一是无法告诉Node进程(例如)通过TCP套接字接收数据,阻塞,一旦你连接你的接收事件处理程序(或否则开始监听数据)。 如果接收者无法足够快地处理传入的数据,那么可能会导致“无限并发”,从而节点底层继续尽可能快地从套接字读取数据,在事件循环中调度新的数据事件,而不是阻塞在套接字上,直到进程最终耗尽内存并死亡。 接收者不能告诉节点减慢读取速度,否则TCP将会允许TCP内置的stream量控制机制启动,并向发送者表明需要减慢速度。 首先,我所描述的到目前为止是准确的? 有什么我错过了,允许节点避免这种情况? 节点stream的特点之一是自动处理背压。 AFAIK,一个可写的stream(的tcp套接字)可以判断是否需要减慢的唯一方法是通过查看socket.bufferSize (表示写入套接字,但尚未发送的数据量)。 鉴于接收端的节点总是以尽可能快的速度读取,这只能指示发送者和接收者之间的networking连接速度较慢,而不能说明接收者是否跟不上。 那么其次,Node Streams自动背压能以某种方式在这种情况下工作来处理一个跟不上的接收器? 也似乎这个问题影响了浏览器通过websocket接收数据,原因类似于websockets API没有提供一种机制来告诉浏览器从套接字读取的速度。 是唯一的解决这个问题的节点(和浏览器使用websockets)在应用程序级别实现手动stream量控制机制,明确地告诉发送进程减慢?

什么是一个正确的方法来暂停从可写的nodejspipe道可读stream?

我正在写一个模块,这是一个可写的stream。 我想为我的用户实现pipe道接口。 如果发生错误,我需要暂停可读stream并发出错误事件。 然后,用户将决定 – 如果他有错误,他应该能够恢复到数据处理。 var writeable = new BackPressureStream(); writeable.on('error', function(error){ console.log(error); writeable.resume(); }); var readable = require('fs').createReadStream('somefile.txt'); readable.pipe.(writeable); 我看到该节点为我们提供了可用于暂停可读stream的readable.pause()方法。 但我无法得到我可以从我的可写stream模块中调用它: var Writable = require('stream').Writable; function BackPressureStream(options) { Writable.call(this, options); } require('util').inherits(BackPressureStream, Writable); BackPressureStream.prototype._write = function(chunk, encoding, done) { done(); }; BackPressureStream.prototype.resume = function() { this.emit('drain'); } 如何在可写入的stream中实现背压? PS可以使用pipe/unpipe事件,提供可读stream作为参数。 但是也有人说,对于pipe道stream,暂停的唯一机会是从可写入的不可读可读stream。 我说得对吗? 我必须删除我的可写入stream,直到用户调用恢复? 而且,在用户调用恢复后,我应该将可读stream传回给用户?