使用node-mysql创build同步查询
我试图确保一个MySQL查询导致另一个,并没有完成,直到所有的子查询完成。 因此,例如,我从一个select和stream行开始,并执行该行结果的后续查询。 这可以通过callback来实现,但是我最终会耗尽内存,所以我想减慢进程并运行批处理,但是由于调度的asynchronous性,我不能保持相同的状态并结束在所有的行都被处理之后进行连接。
这是一个例子:
var query = conn.query('select id from table1 limit 10'); query.on('result', function(row){ console.log('query1', row); var query2 = conn.query('select id from books where id = ? ', [row.id]); query2.on('result', function(row2){ console.log('query2', row2); var query3 = conn.query('insert into test (id) values (?)', [row2.id]); query3.on('result', function(row3){ console.log(row3); }); }); }); query.on('end', function(){ conn.end(); });
以上失败,因为在初始查询结束后仍然有行要在query3中处理。
有什么想法吗? 实际的代码更加复杂,因为我必须从后续查询处理xml,并在循环批处理时触发更多的插入。
谢谢!
我会build议这个解决scheme与asynchronous模块:
var async = require("async"); // connection instance var conn; // here goes task serving logic // if any async function should be finished before drain callback, push them into q var solvers = { query: function(q, task, row){ console.log('query1', row); q.push({ solver: "query2", req: "select id from books where id = ?", reqArgs: [row.id] }); }, query2: function(q, task, row){ console.log('query2', row); q.push({ solver: "query3", req: "insert into test (id) values (?)", reqArgs: [row.id] }); }, query3: function(q, task, row){ console.log(row); } } // here is a queue of tasks var q = async.queue(function(task, cb){ var query = conn.query(task.req, task.reqArgs); query.on("end", cb); query.on("result",function(row){ solvers[task.solver](q, task, row); }); }, 2); // limit of parallel queries // when every request has reached "end" q.drain = function(){ conn.end(); // continue from here }; // initial task q.push({ solver: "query", req: "select id from table1 limit 10", reqArgs: [] });
但是,我不确定通过ID提出请求ID是一个很好的解决scheme。
也许,我只是没有意识到一个完整的问题。
@glukki,谢谢你的伟大的答案和参考asynchronous。 我用你的代码和两个asynchronous请求进行排列,这些asynchronous请求使用单个连接和连接池来处理超过100K行select到1.2M行插入。 工作得非常好,花了不到10分钟。 这是完整的实施减去模块和连接设置。 我希望这也能帮助别人。 再次感谢!
function populateMesh(row, callback){ xmlParser.parseString('<root>'+row.mesh_heading_list+'</root>', function(err, result){ var q2 = async.queue(function (task, cb) { pool.getConnection(function(err, cnx){ cnx.query('INSERT INTO abstract_mesh (mesh_id, abstract_id, major_topic) SELECT mesh_descriptor.id, ?, ? FROM mesh_descriptor WHERE mesh_descriptor.name = ?', [task.id, task.majorTopic, task.descriptorName], function(err, result){ if (err) {throw err;} cnx.release(); cb(); }); }); }, 50); q2.drain = function() { //console.log('all mesh processed'); callback(); } if(!(result.root instanceof Object)){ //console.log('its not obj!', row.id); q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Null'}, function (err) {}); } for(var i in result.root.MeshHeading){ // console.log('in loop',result.root.MeshHeading[i].DescriptorName); if(typeof result.root.MeshHeading[i].DescriptorName === 'undefined'){ q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Emergency'}, function(err){}); } for(var j in result.root.MeshHeading[i].DescriptorName){ var descriptorName = result.root.MeshHeading[i].DescriptorName[j]._; var majorTopic = result.root.MeshHeading[i].DescriptorName[j].$.MajorTopicYN; q2.push({id: row.id, majorTopic: majorTopic, descriptorName: descriptorName}, function (err) {}); } } }); } // here goes task serving logic // if any async function should be finished before drain callback, push them into q var q = async.queue(function (row, callback) { console.log('got id: ' + row.id); populateMesh(row, function(){ callback(); }); }, 10); q.drain = function() { console.log('all items have been processed'); conn.end(function(err){ console.log('connection ended'); }); pool.end(function(err){ console.log('pool closed'); }); }; var truncate = conn.query('truncate abstract_mesh'); var select = conn.query('SELECT id, mesh_heading_list FROM pubtbl'); select.on('result', function(result){ // console.log(result); q.push(result, function (err) { //console.log('finished processing row'); }); });
在我看来,最好的解决scheme是以非常简单的方式同步代码。
你可以使用“synchonize”包。
只是
npm安装同步
然后var sync = require(synchronize);
把应该同步的逻辑用到光纤中去
sync.fiber(function() { //put your logic here }
两个mysql查询的例子:
var express = require('express'); var bodyParser = require('body-parser'); var mysql = require('mysql'); var sync = require('synchronize'); var db = mysql.createConnection({ host : 'localhost', user : 'user', password : 'password', database : 'database' }); db.connect(function(err) { if (err) { console.error('error connecting: ' + err.stack); return; } }); function saveSomething() { var post = {id: newId}; //no callback here; the result is in "query" var query = sync.await(db.query('INSERT INTO mainTable SET ?', post, sync.defer())); var newId = query.insertId; post = {foreignKey: newId}; //this query can be async, because it doesn't matter in this case db.query('INSERT INTO subTable SET ?', post, function(err, result) { if (err) throw err; }); }
当调用“saveSomething()”时,它在主表中插入一行并接收最后插入的ID。 之后,下面的代码将被执行。 没有必要嵌套承诺或类似的东西。
- 为应用程序定义一个数据库,如果有效的数据库不存在
- 无法使用mongoose,Node.js将数据插入到mongoDB中的Document的字段中
- 带有MySQL查询的Node.js
- Node.js + libmysql-client + pingSync + setInterval = headache(true);
- sequelize bulkCreate()返回主键的NULL值
- (nodejs上的Socket.io)使用mysql数据更新div不会显示错误
- 在Sequelize中创build主键和外键关系
- Sails.js水线查询修饰符date与sails-mysql?
- SailsJS – 创buildlogging时如何指定string属性长度而不会出错?