Inserting a large number of rows into a Postgres DB with Node.js

I am trying to insert more than 1 million rows into a Postgres table using Node.js. The problem is that when I start the script, memory usage keeps growing until it reaches about 1.5 GB of RAM, and then I get the error: FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed – process out of memory

The result is always the same – roughly 7,000 rows get inserted instead of 1 million.

Here is the code:

    var pg = require('pg');
    var fs = require('fs');
    var config = require('./config.js');

    var PgClient = new pg.Client(config.pg);
    PgClient.connect();

    var lineReader = require('readline').createInterface({
        input: require('fs').createReadStream('resources/database.csv') // file contains over 1 million lines
    });

    var n = 0;

    lineReader.on('line', function(line) {
        n++;
        var insert = { "firstname": "John", "lastname": "Conor" };

        // No matter what data we insert, the point is that the number of inserted rows is much less than it should be
        PgClient.query('INSERT INTO HUMANS (firstname,lastname) values ($1,$2)',
            [insert.firstname, insert.lastname]);
    });

    lineReader.on('close', function() {
        console.log('end ' + n);
    });
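The memory growth happens because PgClient.query() never blocks: every call just pushes another pending query onto the client's internal queue, so effectively the whole file ends up buffered in RAM before Postgres can catch up. A quick, purely illustrative way to watch this happen while the script runs (queryQueue is an internal property of the pg client, not a public API):

    // Illustrative only: print the size of the client's internal query queue
    // once per second to watch it grow as the file is read
    setInterval(function() {
        console.log('pending queries: ' + PgClient.queryQueue.length);
    }, 1000);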

I used pg-promise, as was suggested, and this code works fast:

    const fs = require('fs');
    const pgp = require('pg-promise')();
    const config = require('./config.js');

    // Db connection
    const db = pgp(config.pg);

    // Transform a lot of inserts into one multi-row insert
    function Inserts(template, data) {
        if (!(this instanceof Inserts)) {
            return new Inserts(template, data);
        }
        this._rawType = true;
        this.toPostgres = () => {
            return data.map(d => '(' + pgp.as.format(template, d) + ')').join();
        };
    }

    // Insert template
    function Insert() {
        return {
            firstname: null,
            lastname: null,
            birthdate: null,
            phone: null,
            email: null,
            city: null,
            district: null,
            location: null,
            street: null
        };
    }

    const lineReader = require('readline').createInterface({
        input: require('fs').createReadStream('resources/database.csv')
    });

    let n = 0;
    let InsertArray = []; // must be `let`, it gets reset after every batch

    lineReader.on('line', function(line) {
        var insert = new Insert();
        n++;
        var InsertValues = line.split(',');
        if (InsertValues[0] !== '"Firstname"') { // skip the header line
            let i = 0;
            for (let prop in insert) {
                insert[prop] = (InsertValues[i] == '') ? insert[prop] : InsertValues[i];
                i++;
            }
            InsertArray.push(insert);
            if (n == 10000) {
                lineReader.pause();
                // convert the insert array into one multi-row insert
                const values = new Inserts('${firstname}, ${lastname}, ${birthdate}, ${phone}, ${email}, ${city}, ${district}, ${location}, ${street}', InsertArray);
                db.none('INSERT INTO users (firstname, lastname, birthdate, phone, email, city, district, location, street) VALUES $1', values)
                    .then(data => {
                        n = 0;
                        InsertArray = [];
                        lineReader.resume();
                    })
                    .catch(error => {
                        console.log(error);
                    });
            }
        }
    });

    lineReader.on('close', function() {
        console.log('end ' + n);
        // last (partial) batch
        if (n > 0) {
            const values = new Inserts('${firstname}, ${lastname}, ${birthdate}, ${phone}, ${email}, ${city}, ${district}, ${location}, ${street}', InsertArray);
            db.none('INSERT INTO users (firstname, lastname, birthdate, phone, email, city, district, location, street) VALUES $1', values)
                .then(data => {
                    console.log('Last');
                })
                .catch(error => {
                    console.log(error);
                });
        }
    });
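As a side note, pg-promise also ships a helpers namespace that can generate the multi-row INSERT for you, so the hand-rolled Inserts type above is not strictly necessary. A minimal sketch, assuming the same users table and the same InsertArray of row objects as in the code above:

    const pgp = require('pg-promise')();
    const config = require('./config.js');
    const db = pgp(config.pg);

    // Declare the column set once; it can be reused for every batch
    const cs = new pgp.helpers.ColumnSet(
        ['firstname', 'lastname', 'birthdate', 'phone', 'email',
         'city', 'district', 'location', 'street'],
        { table: 'users' }
    );

    // Generate one multi-row INSERT statement for the whole batch and run it
    const query = pgp.helpers.insert(InsertArray, cs);
    db.none(query)
        .then(() => {
            InsertArray = [];
            lineReader.resume();
        })
        .catch(error => console.log(error));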

And this is how I solved the original problem: PgClient.queryQueue was being processed far more slowly than the file was being read, so with a large file the queue overflowed. The fix is to change the lineReader.on('line', cb) part so that the lineReader is paused whenever the queue holds too many pending queries:

    lineReader.on('line', function(line) {
        n++;
        var insert = { "firstname": "John", "lastname": "Conor" };
        PgClient.query('INSERT INTO HUMANS (firstname,lastname) values ($1,$2)',
            [insert.firstname, insert.lastname],
            function(err, result) {
                if (err) console.log(err);
                if (PgClient.queryQueue.length > 15000) {
                    lineReader.pause();
                } else {
                    lineReader.resume();
                }
            });
    });
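For completeness: since the data is already in a CSV file, another option worth considering is streaming it straight into Postgres with the COPY command, which keeps memory flat regardless of file size. A rough sketch using the pg-copy-streams package (the table and column names here just mirror the HUMANS example above, and the exact stream events can differ between versions):

    const fs = require('fs');
    const pg = require('pg');
    const copyFrom = require('pg-copy-streams').from;
    const config = require('./config.js');

    const client = new pg.Client(config.pg);
    client.connect(function(err) {
        if (err) return console.log(err);

        // COPY pulls the CSV rows directly from the piped file stream
        const dbStream = client.query(
            copyFrom('COPY humans (firstname, lastname) FROM STDIN WITH (FORMAT csv, HEADER true)')
        );
        const fileStream = fs.createReadStream('resources/database.csv');

        fileStream.on('error', console.log);
        dbStream.on('error', console.log);
        dbStream.on('finish', function() { // recent versions emit 'finish'; older ones emit 'end'
            console.log('copy done');
            client.end();
        });

        fileStream.pipe(dbStream);
    });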