我怎样才能将一个JSON数组从NodeJSstream到postgres

我试图通过接收来自客户端的请求每个批量插入尝试10,000条logging(使用sequelize .js和bulkCreate() )来插入几百万条logging(约6个字段/列bulkCreate()

这显然不是一个好主意,所以我试着研究node-pg-copy-streams

但是,我不想在客户端发起更改,在这里发送一个json数组

 # python data = [ { "column a":"a values", "column b":"b values", }, ... # 10,000 items ... ] request.post(data=json.dumps(data), url=url) 

在nodejs服务器端,我将如何stream接收request.body在以下骨架?

 .post(function(req, res){ // old sequelize code /* table5.bulkCreate( req.body, {raw:true} ).then(function(){ return table5.findAll(); }).then(function(result){ res.json(result.count); });*/ // new pg-copy-streams code pg.connect(function(err, client, done) { var stream = client.query(copyFrom('COPY my_table FROM STDIN')); // My question is here, how would I stream or pipe the request body ? // ?.on('error', done); // ?.pipe(stream).on('finish', done).on('error', done); }); }); 

以下是我如何解决我的问题,

首先是将我的req.body字典转换为TSV(不是最初问题的一部分)

 /** * Converts a dictionary and set of keys to a Tab Separated Value blob of text * @param {Dictionary object} dict * @param {Array of Keys} keys * @return {Concatenated Tab Separated Values} String */ function convertDictsToTSV(dicts, keys){ // ... } 

其次是我原来的.post函数的其余部分

 .post(function(req, res){ // ... /* requires 'stream' as * var stream = require('stream'); * var copyFrom = require('pg-copy-streams').from; */ var read_stream_string = new stream.Readable(); read_stream_string.read = function noop() {}; var keys = [...]; // set of dictionary keys to extract from req.body read_stream_string.push(convertDictsToTSV(req.body, keys)); read_stream_string.push(null); pg.connect(connectionString, function(err, client, done) { // ... // error handling // ... var copy_string = 'Copy tablename (' + keys.join(',') + ') FROM STDIN' var pg_copy_stream = client.query( copyFrom( copy_string ) ); read_stream_string.pipe(pg_copy_stream).on('finish', function(finished){ // handle finished and done appropriately }).on('error', function(errored){ // handle errored and done appropriately }); }); pg.end(); }); 

从技术上来说,这里没有stream式传输,也不在NodeJSstream式传输方面。

您每次发送大量10,000条logging,期望您的服务器端插入这些logging,并向客户端返回一个“确定”,以发送另外10,000条logging。 这是限制/分页数据,而不是stream媒体。

一旦你的服务器收到了下一个10000条logging,插入它们(通常是一个事务),然后用OK回应给客户端,这样它就可以发送下一个10000条logging。

使用node-postgres编写事务并不是一件容易的事,因为它太低级了。

下面是在pg-promise的帮助下如何做到这一点的一个例子:

 function insertRecords(records) { return db.tx(t=> { var inserts = []; records.forEach(r=> { var query = t.none("INSERT INTO table(fieldA, ...) VALUES(${propA}, ...)", r); inserts.push(query); }); return t.batch(inserts); }); } 

然后在你的HTTP处理程序中,你会写:

 function myPostHandler(req, res) { // var records = get records from the request; insertRecords(records) .then(data=> { // set response as success; }) .catch(error=> { // set response as error; }); }