在Node.js中暂停可读stream

我正在使用csv-to-json ,一个整洁的库来处理CSV文件。

我有一个用例,我需要处理一个大的(> 200万行)CSV并将其插入到数据库中。

要做到这一点,而不会遇到内存问题,我打算处理CSV作为一个stream,暂停每10000行的stream,插入行中的数据库,然后恢复stream。

出于某种原因,我似乎无法pausestream。

以下面的代码为例:

 const rs = fs.createReadStream("./foo.csv"); rs.pause(); let count = 0; csv() .fromStream(rs) .on("json", (json) => { count++; console.log(count); }) .on("done", () => { cb(null, count); }) .on("error", (err) => { cb(err); }) 

countlogging200次(这是我有多less行我的CSV) – 我期待它没有logging任何东西,因为stream暂停之前,将它传递给fromStream()

除非您修改csv2json库,否则不能这样做。

这是您应该先阅读的链接
https://nodejs.org/dist/latest-v6.x/docs/api/stream.html#stream_three_states

当您执行rs.pause()时,stream处于暂停模式。 事实上,即使你不这样做,可读stream在暂停模式下启动。

该stream在3种情况下进入resume

  • 有一个.on('data')事件监听器或者
  • 有一个.pipe()方法附加或
  • readable.resume()被显式调用。

在你的情况下, fromStream()方法将pipe方法连接到可读stream,从而恢复stream。

参考代码:
https://github.com/Keyang/node-csvtojson/blob/master/libs/core/Converter.js#L378

 Converter.prototype.fromStream=function(readStream,cb){ if (cb && typeof cb ==="function"){ this.wrapCallback(cb); } process.nextTick(function(){ readStream.pipe(this); }.bind(this)) return this; } 

以下是本期创build者提出的解决scheme:

 var tmpArr=[]; rs.pipe(csv({},{objectMode:true})).pipe(new Writable({ write: function(json, encoding,callback){ tmpArr.push(json); if (tmpArr.length===10000){ myDb.save(tmpArr,function(){ tmpArr=[]; callback(); }) }else{ callback(); } } , objectMode:true })) .on('finish',function(){ if (tmpArr.length>0){ myDb.save(tmpArr,function(){ tmpArr=[]; }) } }) 

我实际上已经设法通过如此剪切来模拟暂停,但这并不理想:

 let count = 0; var csvParser=csv() .fromStream(rs) .on("json", (json) => { rows.push(json); if (rows.length % 1000 === 0) { rs.unpipe(); // clear `rows` right after `unpipe` const entries = rows; rows = []; this._insertEntries(db, entries, ()=> { rs.pipe(csvParser); }); } })