节点postgres与大量的查询

我刚开始使用node-postgres和node.js一起使用postgres。 我试图做的事情之一是写一个简短的js填充我的数据库,使用约20万条目的文件。

我注意到有一段时间(不到10秒),我开始得到“错误:连接终止”。 我不确定这是否是使用node-postgres的问题,或者是因为我发送了postgres垃圾邮件。

无论如何,这是一个简单的代码,显示了这种行为:

var pg = require('pg'); var connectionString = "postgres://xxxx:xxxx@localhost/xxxx"; pg.connect(connectionString, function(err,client,done){ if(err) { return console.error('could not connect to postgres', err); } client.query("DROP TABLE IF EXISTS testDB"); client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int, second int)"); done(); for (i = 0; i < 1000000; i++){ client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")", function(err,result){ if (err) { return console.error('Error inserting query', err); } done(); }); } }); 

在大约18,000到20,000个查询之后,它失败了。 这是错误的方式来使用client.query? 我试图改变默认的客户端号码,但似乎没有帮助。

client.connect()似乎也没有帮助,但那是因为我有太多的客户端,所以我一定认为客户端池是要走的路。

谢谢你的帮助!

UPDATE

这个答案已经被这篇文章所取代: Data Imports ,代表了最新的方法。


为了复制你的场景,我使用了pg-promise库,而且我可以确认,试试它无法正常工作,无论使用哪个库,这都是重要的方法。

下面是一个修改的方法,我们将插入分成块,然后在一个事务中执行每个块,这是一个负载平衡(aka throttling):

 function insertRecords(N) { return db.tx(function (ctx) { var queries = []; for (var i = 1; i <= N; i++) { queries.push(ctx.none('insert into test(name) values($1)', 'name-' + i)); } return promise.all(queries); }); } function insertAll(idx) { if (!idx) { idx = 0; } return insertRecords(100000) .then(function () { if (idx >= 9) { return promise.resolve('SUCCESS'); } else { return insertAll(++idx); } }, function (reason) { return promise.reject(reason); }); } insertAll() .then(function (data) { console.log(data); }, function (reason) { console.log(reason); }) .done(function () { pgp.end(); }); 

这在大约4分钟内产生了100万条logging,在前3次交易之后急剧放缓。 我使用的节点JS 0.10.38(64位),其中消耗大约340MB的内存。 这样我们插入了10万条logging,连续10次。

如果我们这样做,只有这次在100个事务中插入10,000条logging,在1毫秒内添加相同的1,000,000条logging,不会减慢,而节点JS消耗大约100MB的内存,这告诉我们像这样的分区数据是非常好主意。

你使用哪个库并不重要,方法应该是一样的:

  1. 将插入分区/限制为多个事务;
  2. 将插入列表保存在大约10,000条logging中的单个事务中;
  3. 在同步链中执行所有的事务。
  4. 在每个事务的COMMIT之后释放连接回池。

如果你违反了这些规则,你就有麻烦了。 例如,如果您违反了规则3,那么您的Node JS进程可能会很快耗尽内存并导致错误。 我例子中的规则4是由图书馆提供的。

如果遵循这种模式,则不需要为连接池设置烦恼。

更新1

pg-promise的后续版本完全支持这样的场景,如下所示:

 function factory(index) { if (index < 1000000) { return this.query('insert into test(name) values($1)', 'name-' + index); } } db.tx(function () { return this.batch([ this.none('drop table if exists test'), this.none('create table test(id serial, name text)'), this.sequence(factory), // key method this.one('select count(*) from test') ]); }) .then(function (data) { console.log("COUNT:", data[3].count); }) .catch(function (error) { console.log("ERROR:", error); }); 

如果你不想包括任何额外的东西,比如创build表格,那么看起来就更简单了:

 function factory(index) { if (index < 1000000) { return this.query('insert into test(name) values($1)', 'name-' + index); } } db.tx(function () { return this.sequence(factory); }) .then(function (data) { // success; }) .catch(function (error) { // error; }); 

详情请参阅同步事务 。

例如,使用Bluebird作为承诺库,我的生产机器上需要1m43插入1,000,000条logging(没有启用长堆栈跟踪)。

你只需要你的factory方法根据index返回请求,直到你没有剩下,就这么简单。

最好的部分是,这不仅仅是快速,而且也会给你的NodeJS进程带来一点负担。 内存testing在整个testing过程中保持在60MB以下,CPU消耗时间只有7-8%。

更新2

从版本1.7.2开始, pg-promise可以轻松支持超大规模交易。 请参见“ 同步事务”一章。

例如,我可以在家用电脑上用Windows 8.1 64位在短短的15分钟内在单个事务中插入10,000,000条logging。

为了testing,我将PC设置为生产模式,并使用Bluebird作为承诺库。 在testing过程中,整个NodeJS 0.12.5进程(64位)的内存消耗没有超过75MB,而我的i7-4770 CPU显示一致的15%负载。

以同样的方式插入100米logging只需要更多的耐心,但不是更多的计算机资源。

与此同时,之前对1m插件的testing从1m43s下降到1m31s。

更新3

以下考虑可以造成巨大的差异: 性能提升 。

更新4

相关的问题,有一个更好的实现示例: 使用pg-promise进行大量插入 。

更新5

一个更好更新的例子可以在这里find: nodeJS插入数据到PostgreSQL的错误

我猜你正在达到最大池大小。 由于client.query是asynchronous的,所以在返回之前使用所有可用的连接。

默认池大小是10.检查这里: https : //github.com/brianc/node-postgres/blob/master/lib/defaults.js#L27

您可以通过设置pg.defaults.poolSize来增加默认池大小:

 pg.defaults.poolSize = 20; 

更新:释放连接后执行另一个查询。

 var pg = require('pg'); var connectionString = "postgres://xxxx:xxxx@localhost/xxxx"; var MAX_POOL_SIZE = 25; pg.defaults.poolSize = MAX_POOL_SIZE; pg.connect(connectionString, function(err,client,done){ if(err) { return console.error('could not connect to postgres', err); } var release = function() { done(); i++; if(i < 1000000) insertQ(); }; var insertQ = function() { client.query("INSERT INTO testDB VALUES (" + i.toString() + "," + (1000000-i).toString() + "," + (-i).toString() + ")", function(err,result){ if (err) { return console.error('Error inserting query', err); } release(); }); }; client.query("DROP TABLE IF EXISTS testDB"); client.query("CREATE TABLE IF NOT EXISTS testDB (id int, first int, second int)"); done(); for (i = 0; i < MAX_POOL_SIZE; i++){ insertQ(); } }); 

基本的想法是因为你正在排队大量的连接池大小相对较小的查询,你正在达到最大池大小。 这里我们只在现有的连接被释放之后才进行新的查询。