stream高水标误解

在阅读Github上的一些代码之后,我好像错误地理解了highWaterMark概念是如何工作的。

在一个可写入的stream尽可能快地写入大量数据的情况下,这里是我对生命周期的想法:

1)虽然未达到highWaterMark限制,但数据stream能够缓冲和写入数据。

2)如果达到highWaterMark限制,stream不能再缓冲,所以#write方法返回false,让你知道你试图写的内容不会被写入(从不)。

3)一旦stream发生drain事件,这意味着缓冲区已被清理,你可以从你被“拒绝”的地方再次写入。

在我看来这很简单明了,但是看起来这并不完全正确(在步骤2中),当#write方法返回false时,试图写入的数据真的被“拒绝”了吗? 还是被缓冲(或其他)?

对不起,基本的问题,但我需要确定!

2)如果达到highWaterMark限制,stream不能再缓冲,所以#write方法返回false,让你知道你试图写的内容不会被写入(从不)。

这是错误的,数据仍然被缓冲,stream不会丢失。 但是你现在应该停止写作。 这是为了允许背压传播。

你的问题在writable.write(chunk[, encoding][, callback]) docs中解决:

这个返回值是严格的咨询。 即使返回false ,你也可以继续写。 但是,写入操作将被caching在内存中,因此最好不要过多地执行此操作。 相反,在写入更多数据之前,先等待'drain'事件。

即使调用返回false (并在此之前缓冲在内存中),您writestream中的任何数据最终都将被写入。

highWaterMark选项可以让您对使用的“缓冲存储器”的数量进行一些控制。 一旦你写了超过指定的金额, write将返回false给你一个机会,停止写作。 你不需要这样做:如果你不停下来,没有数据被丢弃,你最终会占用更多的内存(重写数据会导致重复)。 而且,正如你所提到的那样,你可以再听一次'drain'事件,知道什么时候再写一次。

当#write方法返回false时,试图写入的数据真的被“拒绝”了吗? 还是被缓冲(或其他)?

数据被缓冲。 然而,过多的调用write()而不允许缓冲区耗尽将导致高内存使用率,糟糕的垃圾收集器性能,甚至可能导致Node.js崩溃, Allocation failed - JavaScript heap out of memory不足错误。 看到这个相关的问题:

节点:fs write()不写入内部循环。 为什么不?


作为参考,这里是关于highWaterMark和当前文档(v8.4.0)背压的一些相关细节:

writable.write()

如果内部缓冲区小于在highWaterMark chunk之后创buildstream时configuration的highWaterMark则返回值为true 。 如果返回false应该停止向stream写入数据的更多尝试,直到发生'drain'事件。

当一个stream没有消耗,调用write()会缓冲chunk ,并返回false 。 一旦所有当前缓冲的块被排空(被操作系统接受), 'drain'事件将被发射。 build议一旦write()返回false ,直到发生'drain'事件时才会写入更多的块。 虽然在不stream失的stream上调用write()是允许的,但是Node.js会caching所有写入的块,直到最大的内存使用量发生,此时它将无条件地中止 。 即使在中止之前,高内存使用率也会导致较差的垃圾收集器性能和较高的RSS (即使不再需要内存,通常也不会将其释放回系统)。

在溪stream背压

在任何情况下,数据缓冲区已经超过了highWaterMark或写入队列正忙, .write()将返回false

当返回一个false值时,反压系统开始工作。它将暂停传入的Readablestream发送任何数据,并等待消费者再次准备好。 一旦数据缓冲区被清空,一个.drain()事件将被发射并恢复传入的数据stream。

一旦队列完成,背压将允许数据再次发送。 正在使用的内存空间将自行释放,并准备下一批数据。

  +-------------------+ +=================+ | Writable Stream +---------> .write(chunk) | +-------------------+ +=======+=========+ | +------------------v---------+ +-> if (!chunk) | Is this chunk too big? | | emit .end(); | Is the queue busy? | +-> else +-------+----------------+---+ | emit .write(); | | ^ +--v---+ +---v---+ ^-----------------------------------< No | | Yes | +------+ +---v---+ | emit .pause(); +=================+ | ^-----------------------+ return false; <-----+---+ +=================+ | | when queue is empty +============+ | ^-----------------------< Buffering | | | |============| | +> emit .drain(); | ^Buffer^ | | +> emit .resume(); +------------+ | | ^Buffer^ | | +------------+ add chunk to queue | | <---^---------------------< +============+