使用node-mysql创build同步查询

我试图确保一个MySQL查询导致另一个,并没有完成,直到所有的子查询完成。 因此,例如,我从一个select和stream行开始,并执行该行结果的后续查询。 这可以通过callback来实现,但是我最终会耗尽内存,所以我想减慢进程并运行批处理,但是由于调度的asynchronous性,我不能保持相同的状态并结束在所有的行都被处理之后进行连接。

这是一个例子:

var query = conn.query('select id from table1 limit 10'); query.on('result', function(row){ console.log('query1', row); var query2 = conn.query('select id from books where id = ? ', [row.id]); query2.on('result', function(row2){ console.log('query2', row2); var query3 = conn.query('insert into test (id) values (?)', [row2.id]); query3.on('result', function(row3){ console.log(row3); }); }); }); query.on('end', function(){ conn.end(); }); 

以上失败,因为在初始查询结束后仍然有行要在query3中处理。
有什么想法吗? 实际的代码更加复杂,因为我必须从后续查询处理xml,并在循环批处理时触发更多的插入。

谢谢!

我会build议这个解决scheme与asynchronous模块:

 var async = require("async"); // connection instance var conn; // here goes task serving logic // if any async function should be finished before drain callback, push them into q var solvers = { query: function(q, task, row){ console.log('query1', row); q.push({ solver: "query2", req: "select id from books where id = ?", reqArgs: [row.id] }); }, query2: function(q, task, row){ console.log('query2', row); q.push({ solver: "query3", req: "insert into test (id) values (?)", reqArgs: [row.id] }); }, query3: function(q, task, row){ console.log(row); } } // here is a queue of tasks var q = async.queue(function(task, cb){ var query = conn.query(task.req, task.reqArgs); query.on("end", cb); query.on("result",function(row){ solvers[task.solver](q, task, row); }); }, 2); // limit of parallel queries // when every request has reached "end" q.drain = function(){ conn.end(); // continue from here }; // initial task q.push({ solver: "query", req: "select id from table1 limit 10", reqArgs: [] }); 

但是,我不确定通过ID提出请求ID是一个很好的解决scheme。
也许,我只是没有意识到一个完整的问题。

@glukki,谢谢你的伟大的答案和参考asynchronous。 我用你的代码和两个asynchronous请求进行排列,这些asynchronous请求使用单个连接和连接池来处理超过100K行select到1.2M行插入。 工作得非常好,花了不到10分钟。 这是完整的实施减去模块和连接设置。 我希望这也能帮助别人。 再次感谢!

 function populateMesh(row, callback){ xmlParser.parseString('<root>'+row.mesh_heading_list+'</root>', function(err, result){ var q2 = async.queue(function (task, cb) { pool.getConnection(function(err, cnx){ cnx.query('INSERT INTO abstract_mesh (mesh_id, abstract_id, major_topic) SELECT mesh_descriptor.id, ?, ? FROM mesh_descriptor WHERE mesh_descriptor.name = ?', [task.id, task.majorTopic, task.descriptorName], function(err, result){ if (err) {throw err;} cnx.release(); cb(); }); }); }, 50); q2.drain = function() { //console.log('all mesh processed'); callback(); } if(!(result.root instanceof Object)){ //console.log('its not obj!', row.id); q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Null'}, function (err) {}); } for(var i in result.root.MeshHeading){ // console.log('in loop',result.root.MeshHeading[i].DescriptorName); if(typeof result.root.MeshHeading[i].DescriptorName === 'undefined'){ q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Emergency'}, function(err){}); } for(var j in result.root.MeshHeading[i].DescriptorName){ var descriptorName = result.root.MeshHeading[i].DescriptorName[j]._; var majorTopic = result.root.MeshHeading[i].DescriptorName[j].$.MajorTopicYN; q2.push({id: row.id, majorTopic: majorTopic, descriptorName: descriptorName}, function (err) {}); } } }); } // here goes task serving logic // if any async function should be finished before drain callback, push them into q var q = async.queue(function (row, callback) { console.log('got id: ' + row.id); populateMesh(row, function(){ callback(); }); }, 10); q.drain = function() { console.log('all items have been processed'); conn.end(function(err){ console.log('connection ended'); }); pool.end(function(err){ console.log('pool closed'); }); }; var truncate = conn.query('truncate abstract_mesh'); var select = conn.query('SELECT id, mesh_heading_list FROM pubtbl'); select.on('result', function(result){ // console.log(result); q.push(result, function (err) { //console.log('finished processing row'); }); }); 

在我看来,最好的解决scheme是以非常简单的方式同步代码。

你可以使用“synchonize”包。

只是

npm安装同步

然后var sync = require(synchronize);

把应该同步的逻辑用到光纤中去

sync.fiber(function() { //put your logic here }

两个mysql查询的例子:

 var express = require('express'); var bodyParser = require('body-parser'); var mysql = require('mysql'); var sync = require('synchronize'); var db = mysql.createConnection({ host : 'localhost', user : 'user', password : 'password', database : 'database' }); db.connect(function(err) { if (err) { console.error('error connecting: ' + err.stack); return; } }); function saveSomething() { var post = {id: newId}; //no callback here; the result is in "query" var query = sync.await(db.query('INSERT INTO mainTable SET ?', post, sync.defer())); var newId = query.insertId; post = {foreignKey: newId}; //this query can be async, because it doesn't matter in this case db.query('INSERT INTO subTable SET ?', post, function(err, result) { if (err) throw err; }); } 

当调用“saveSomething()”时,它在主表中插入一行并接收最后插入的ID。 之后,下面的代码将被执行。 没有必要嵌套承诺或类似的东西。