child_processstream反压
我正在使用exec-stream
和Node.js,通过一些其他转换stream传输stream,最终通过node-brake
stream来限制数据速率。 制动stream似乎没有效果,事实上数据最终在长链末端丢失。
execStream('some-external-binary').pipe(transform1).pipe(transform2).pipe(brake(1024))
我认为发生的事情是, child_process
STDOUT
stream(在exec-stream
)没有暂停,因此缓冲区填满,直到数据丢失。
做child_process
stream的行为是这样吗? 有什么办法让背压能够正确使用child_process
stream?
我不熟悉execstream和节点制动器来理解数据丢失的所有途径。
不过,我做了一些小实验,看看节点制动器是否有背压效应,你提到的可能是一个潜在的数据丢失区域。
文件也托pipe在Gist上 。
### Created for http://stackoverflow.com/questions/34982953/child-process-stream-backpressure Please pardon the CoffeeScript, but I couldn't stand to extend stream.Transform in native JavaScript. ### fs = require("fs") execStream = require("exec-stream") brake = require("brake") file = fs.createWriteStream("tmp.txt") class Double extends require("stream").Transform _transform: (chunk, enc, cb) -> @_last ?= Date.now() @_called ?= [] @_called.push Date.now() - @_last @_last = Date.now() @push chunk.toString() + chunk.toString() cb() class UpperCase extends require("stream").Transform _transform: (chunk, enc, cb) -> @push chunk.toString().toUpperCase() cb() sum = (nums) -> o = 0 o += i for i in nums o doTest = (size) -> transform1 = new Double() transform2 = new UpperCase() transform3 = new Double() execStream("dd", ["if=/dev/urandom", "bs=1024", "count=1"]) .pipe(transform1) .pipe(transform2) .pipe(brake(size)) .pipe(transform3) .pipe(file) file.on "finish", -> fs.stat "tmp.txt", (err, stats) -> throw err if err called1 = transform1._called averagePreBrake = sum(called1) / called1.length called2 = transform3._called averagePostBrake = sum(called2) / called2.length console.log """ Generated with brake(#{size}): #{stats.size} Average time between transformations pre-brake: #{averagePreBrake}ms Average time between transformations post-brake: #{averagePostBrake}ms """ doTest 1024 doTest 256
结果如下。
我注意到刹车之前的转换之间没有差距。 然而,制动器会中断转换。 给出这个数据,我怀疑节点制动没有背压效应。
[ sh2png实用程序生成的控制台输出的屏幕截图]