在Node.js中暂停可读stream
我正在使用csv-to-json ,一个整洁的库来处理CSV文件。
我有一个用例,我需要处理一个大的(> 200万行)CSV并将其插入到数据库中。
要做到这一点,而不会遇到内存问题,我打算处理CSV作为一个stream,暂停每10000行的stream,插入行中的数据库,然后恢复stream。
出于某种原因,我似乎无法pause
stream。
以下面的代码为例:
const rs = fs.createReadStream("./foo.csv"); rs.pause(); let count = 0; csv() .fromStream(rs) .on("json", (json) => { count++; console.log(count); }) .on("done", () => { cb(null, count); }) .on("error", (err) => { cb(err); })
count
logging200次(这是我有多less行我的CSV) – 我期待它没有logging任何东西,因为stream暂停之前,将它传递给fromStream()
除非您修改csv2json库,否则不能这样做。
这是您应该先阅读的链接
https://nodejs.org/dist/latest-v6.x/docs/api/stream.html#stream_three_states
当您执行rs.pause()时,stream处于暂停模式。 事实上,即使你不这样做,可读stream在暂停模式下启动。
该stream在3种情况下进入resume
。
- 有一个
.on('data')
事件监听器或者 - 有一个
.pipe()
方法附加或 -
readable.resume()
被显式调用。
在你的情况下, fromStream()
方法将pipe
方法连接到可读stream,从而恢复stream。
参考代码:
https://github.com/Keyang/node-csvtojson/blob/master/libs/core/Converter.js#L378
Converter.prototype.fromStream=function(readStream,cb){ if (cb && typeof cb ==="function"){ this.wrapCallback(cb); } process.nextTick(function(){ readStream.pipe(this); }.bind(this)) return this; }
以下是本期创build者提出的解决scheme:
var tmpArr=[]; rs.pipe(csv({},{objectMode:true})).pipe(new Writable({ write: function(json, encoding,callback){ tmpArr.push(json); if (tmpArr.length===10000){ myDb.save(tmpArr,function(){ tmpArr=[]; callback(); }) }else{ callback(); } } , objectMode:true })) .on('finish',function(){ if (tmpArr.length>0){ myDb.save(tmpArr,function(){ tmpArr=[]; }) } })
我实际上已经设法通过如此剪切来模拟暂停,但这并不理想:
let count = 0; var csvParser=csv() .fromStream(rs) .on("json", (json) => { rows.push(json); if (rows.length % 1000 === 0) { rs.unpipe(); // clear `rows` right after `unpipe` const entries = rows; rows = []; this._insertEntries(db, entries, ()=> { rs.pipe(csvParser); }); } })