stream高水标误解
在阅读Github上的一些代码之后,我好像错误地理解了highWaterMark
概念是如何工作的。
在一个可写入的stream尽可能快地写入大量数据的情况下,这里是我对生命周期的想法:
1)虽然未达到highWaterMark
限制,但数据stream能够缓冲和写入数据。
2)如果达到highWaterMark
限制,stream不能再缓冲,所以#write方法返回false,让你知道你试图写的内容不会被写入(从不)。
3)一旦stream发生drain
事件,这意味着缓冲区已被清理,你可以从你被“拒绝”的地方再次写入。
在我看来这很简单明了,但是看起来这并不完全正确(在步骤2中),当#write方法返回false时,试图写入的数据真的被“拒绝”了吗? 还是被缓冲(或其他)?
对不起,基本的问题,但我需要确定!
2)如果达到highWaterMark限制,stream不能再缓冲,所以#write方法返回false,让你知道你试图写的内容不会被写入(从不)。
这是错误的,数据仍然被缓冲,stream不会丢失。 但是你现在应该停止写作。 这是为了允许背压传播。
你的问题在writable.write(chunk[, encoding][, callback])
docs中解决:
这个返回值是严格的咨询。 即使返回
false
,你也可以继续写。 但是,写入操作将被caching在内存中,因此最好不要过多地执行此操作。 相反,在写入更多数据之前,先等待'drain'
事件。
即使调用返回false
(并在此之前缓冲在内存中),您write
stream中的任何数据最终都将被写入。
highWaterMark
选项可以让您对使用的“缓冲存储器”的数量进行一些控制。 一旦你写了超过指定的金额, write
将返回false
给你一个机会,停止写作。 你不需要这样做:如果你不停下来,没有数据被丢弃,你最终会占用更多的内存(重写数据会导致重复)。 而且,正如你所提到的那样,你可以再听一次'drain'
事件,知道什么时候再写一次。
当#write方法返回false时,试图写入的数据真的被“拒绝”了吗? 还是被缓冲(或其他)?
数据被缓冲。 然而,过多的调用write()
而不允许缓冲区耗尽将导致高内存使用率,糟糕的垃圾收集器性能,甚至可能导致Node.js崩溃, Allocation failed - JavaScript heap out of memory
不足错误。 看到这个相关的问题:
节点:fs write()不写入内部循环。 为什么不?
作为参考,这里是关于highWaterMark
和当前文档(v8.4.0)背压的一些相关细节:
writable.write()
如果内部缓冲区小于在
highWaterMark
chunk
之后创buildstream时configuration的highWaterMark
则返回值为true
。 如果返回false
则应该停止向stream写入数据的更多尝试,直到发生'drain'
事件。当一个stream没有消耗,调用
write()
会缓冲chunk
,并返回false
。 一旦所有当前缓冲的块被排空(被操作系统接受),'drain'
事件将被发射。 build议一旦write()
返回false
,直到发生'drain'
事件时才会写入更多的块。 虽然在不stream失的stream上调用write()
是允许的,但是Node.js会caching所有写入的块,直到最大的内存使用量发生,此时它将无条件地中止 。 即使在中止之前,高内存使用率也会导致较差的垃圾收集器性能和较高的RSS (即使不再需要内存,通常也不会将其释放回系统)。
在溪stream背压
在任何情况下,数据缓冲区已经超过了
highWaterMark
或写入队列正忙,.write()
将返回false
。当返回一个
false
值时,反压系统开始工作。它将暂停传入的Readable
stream发送任何数据,并等待消费者再次准备好。 一旦数据缓冲区被清空,一个.drain()
事件将被发射并恢复传入的数据stream。一旦队列完成,背压将允许数据再次发送。 正在使用的内存空间将自行释放,并准备下一批数据。
+-------------------+ +=================+ | Writable Stream +---------> .write(chunk) | +-------------------+ +=======+=========+ | +------------------v---------+ +-> if (!chunk) | Is this chunk too big? | | emit .end(); | Is the queue busy? | +-> else +-------+----------------+---+ | emit .write(); | | ^ +--v---+ +---v---+ ^-----------------------------------< No | | Yes | +------+ +---v---+ | emit .pause(); +=================+ | ^-----------------------+ return false; <-----+---+ +=================+ | | when queue is empty +============+ | ^-----------------------< Buffering | | | |============| | +> emit .drain(); | ^Buffer^ | | +> emit .resume(); +------------+ | | ^Buffer^ | | +------------+ add chunk to queue | | <---^---------------------< +============+