当通过es.map()和JSONStream.stringify()将JSONStream.parsed()数据pipe道传输到文件stream时节点堆耗尽

我试图通过JSONStream.parse()将inputstream(通过巨大的GeoJSON文件创build)分stream成对象,然后通过event-stream.map()来允许我转换对象,然后通过JSONStream .stringify()创build一个string,最后写入一个可写的输出stream。 随着进程的运行,我可以看到节点的内存占用不断增长,直到最终耗尽堆。 以下是重现问题的最简单的脚本(test.js):

const fs = require("fs") const es = require("event-stream") const js = require("JSONStream") out = fs.createWriteStream("/dev/null") process.stdin .pipe(js.parse("features.*")) .pipe(es.map( function(data, cb) { cb(null, data); return; } )) .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}")) .pipe(out) 

一个小小的bash脚本(barf.sh)将无尽的JSONstream传递给节点的process.stdin,这将导致节点的堆逐渐增长:

 #!/bin/bash echo '{"type":"FeatureCollection","features":[' while : do echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },' done 

通过这样运行它:

 barf.sh | node test.js 

有几个奇怪的方法来回避这个问题:

  • 删除fs.createWriteStream()并将最后一个pipe段从“.pipe(out)”更改为“.pipe(process.stdout)”,然后将节点的stdoutpipe道更改为/ dev / null
  • 将asynchronouses.map()更改为同步es.mapSync()

前两个动作中的任何一个都将允许脚本永远运行,节点的内存占用低且不变。 我正在使用运行Ubuntu 16.04的8GB内存的八核心机器上使用节点v6.3.1,事件streamv3.3.4和JSONStream 1.1.4。

我希望有人能够帮助我纠正我确信的一个明显的错误。

JSONStream不是一个streams2stream,所以它不支持背压。 (这里有关于streams2的简要总结。)

这意味着数据将从data事件中的parsestream中data出来,并且datastream将持续抽取出消费stream是否准备好。 如果在某种程度上可以读写某种东西的速度之间存在某种程度的差异,那么就会有缓冲 – 这就是你所看到的。

您的barf.sh线束可以看到通过stdin 。 相反,如果您正在读取大量文件,则应该可以通过暂停文件的读取stream来pipe理stream程。 所以,如果你想在你的mapcallback中插入一些pause/resume逻辑,你应该能够得到它来处理一个巨大的文件; 它会花一点时间。 我会尝试这样的事情:

 let in = fs.createReadStream("/some/massive/file"); let out = fs.createWriteStream("/dev/null"); in .pipe(js.parse("features.*")) .pipe(es.map(function(data, cb) { // This is just an example; a 10-millisecond wait per feature would be very slow. if (!in.isPaused()) { in.pause(); global.setTimeout(function () { in.resume(); }, 10); } cb(null, data); return; })) .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}")) .pipe(out); 

可以mapSync是,使用mapSync对我的电脑(这是旧的和慢)几乎没有区别。 但是,除非你有一些asynchronous操作在map上执行,我会去mapSync