Node.js database migration running out of heap space

Hello, I am trying to migrate a legacy database to a new database with Node.js.

Some of the legacy database tables have close to a million rows, so this is a task with high memory consumption.

With my current script I quickly run out of heap memory. I made some changes that I hoped would solve the problem, but the used space still keeps growing with every iteration.

The code below basically queries the old table, maps some fields and inserts into the new database. I moved the variable declarations from inside the loop to the outside, hoping the old values would simply be overwritten instead of new space being allocated. I also call .pop on the result array, hoping this would steadily shrink the space needed for the remaining rows.

However, as I said, the required space keeps growing with every iteration. Does anyone have an idea why?

    function migrate_user_table(callback) {
      // Migrate user table
      logger.log('info', "Starting migration of user table...");

      // declarations pulled out of the loop (attempted memory optimisation)
      let row = null;
      let userid = null, kikname = null, fullname = null, active = null, imagepath = null,
          statusbase64 = null, gender = null, orientation = null, reports = null,
          reviewStatus = null, region = null;
      let newReviewStatus = null, newgender = null, neworientation = null, newregion = null;
      let banned = null, lastActive = null, numberOfRequests = null, requestsSend = null,
          moji = null, created = null;
      let minAgeS = null, maxAgeS = null, minAgeC = null, maxAgeC = null,
          genderS = null, orientS = null, genderC = null,
          newgenderS = null, neworientS = null, newgenderC = null;
      let user = null, user_has_social = null,
          user_has_data_username = null, user_has_data_status = null, user_has_data_report = null,
          user_has_data_sent = null, user_has_data_recv = null, user_has_moji = null,
          user_has_filter_searchage = null, user_has_filter_chatage = null,
          user_has_filter_searchgender = null, user_has_filter_chatgender = null,
          user_has_filter_searchorient = null;

      legacy.query('SELECT * FROM user u LEFT JOIN behavior b ON (u.userid = b.userid) LEFT JOIN filter f ON (u.username = f.username)', (error, results) => {
        if (error) throw error;

        while (results.length > 0) {
          // take the last row off the result array
          row = results.pop();

          userid = row["userid"];
          kikname = row["username"];
          fullname = row["fullname"];
          active = row["active"];
          imagepath = row["img"];
          statusbase64 = row["status"];
          gender = parseInt(row["gender"]);
          orientation = row["orientation"];
          reports = row["reports"];
          reviewStatus = parseInt(row["reviewStatus"]);
          region = row["region"];

          // map to new reviewstatus
          newReviewStatus = 1;
          switch (reviewStatus) {
            case 0:  newReviewStatus = 1; break;
            case 1:  newReviewStatus = 3; break;
            case 2:  newReviewStatus = 4; break;
            case -1: newReviewStatus = 2; break;
          }

          // map to new gender, orientation and region
          newgender = gender + 1;
          neworientation = orientation + 1;
          newregion = 7;
          if (region >= 0) {
            newregion = region + 1;
          }

          banned = row["banned"];
          lastActive = row["pendingSince"];
          numberOfRequests = row["numberOfRequests"];
          requestsSend = row["requestsSend"];
          moji = row["moji_idmoji"];
          created = row["created"];
          minAgeS = row["minAgeS"];
          maxAgeS = row["maxAgeS"];
          minAgeC = row["minAgeC"];
          maxAgeC = row["maxAgeC"];
          genderS = row["genderS"];
          orientS = row["orientS"];
          genderC = row["genderC"];

          newgenderS = genderS + 1;
          if (newgenderS === 0) { newgenderS = null; }
          neworientS = orientS + 1;
          if (neworientS === 0) { neworientS = null; }
          newgenderC = genderC + 1;
          if (newgenderC === 0) { newgenderC = null; }

          // insert the mapped user into the new database
          user = { iduser: userid, imageurl: imagepath, birthdate: null, active: active,
                   banned: banned, reviewstatus_idreviewstatus: newReviewStatus,
                   last_active: lastActive, created: created, gender_idgender: newgender,
                   orientation_idorientation: neworientation, region_idregion: newregion };
          connection.query('INSERT INTO user SET ?', user, (error) => {
            if (error) throw error;
            logger.log('debug', "User Insert successful");
          });

          user_has_social = { user_iduser: userid, socialtype_idsocialtype: 1, value: kikname };
          connection.query('INSERT INTO user_has_social SET ?', user_has_social, (error) => {
            if (error) throw error;
            logger.log('debug', "User_has_social Insert successful");
          });

          user_has_data_username = { user_iduser: userid, datatype_iddatatype: 5, value: fullname };
          user_has_data_status   = { user_iduser: userid, datatype_iddatatype: 1, value: statusbase64 };
          user_has_data_report   = { user_iduser: userid, datatype_iddatatype: 7, value: reports };
          user_has_data_sent     = { user_iduser: userid, datatype_iddatatype: 4, value: requestsSend };
          user_has_data_recv     = { user_iduser: userid, datatype_iddatatype: 3, value: numberOfRequests };
          datainsert(connection, user_has_data_username);
          datainsert(connection, user_has_data_status);
          datainsert(connection, user_has_data_report);
          datainsert(connection, user_has_data_sent);
          datainsert(connection, user_has_data_recv);

          user_has_moji = { user_iduser: userid, moji_idmoji: moji };
          connection.query('INSERT INTO user_has_moji SET ?', user_has_moji, (error) => {
            if (error) throw error;
            logger.log('debug', "User_has_moji Insert successful");
          });

          user_has_filter_searchage    = { user_iduser: userid, filtertype_idfiltertype: 1, value: minAgeS, add_value: maxAgeS };
          user_has_filter_chatage      = { user_iduser: userid, filtertype_idfiltertype: 2, value: minAgeC, add_value: maxAgeC };
          user_has_filter_searchgender = { user_iduser: userid, filtertype_idfiltertype: 3, value: newgenderS, add_value: null };
          user_has_filter_chatgender   = { user_iduser: userid, filtertype_idfiltertype: 4, value: newgenderC, add_value: null };
          user_has_filter_searchorient = { user_iduser: userid, filtertype_idfiltertype: 5, value: neworientS, add_value: null };
          filterinsert(connection, user_has_filter_searchage);
          filterinsert(connection, user_has_filter_chatage);
          filterinsert(connection, user_has_filter_searchgender);
          filterinsert(connection, user_has_filter_chatgender);
          filterinsert(connection, user_has_filter_searchorient);

          logger.log('debug', results.length + " rows to go");
        }
        callback();
      });
    }

The way you use query(stmt, function(error, results) {...}) loads the entire result set from the legacy table into RAM. You then loop over the contents of that result set (pulling rows off it with .pop).

Considering that a major point of SQL is to handle data sets far larger than RAM, it is no surprise that you run out of memory.

You are also wasting RAM with SELECT *. If you enumerate only the columns you actually need with SELECT userid, username, ..., the rows become shorter, so more of them fit into RAM.
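For instance (a sketch only, abbreviated to a few of the fields the migration actually reads; the real statement would list every column you map):

    legacy.query(
      'SELECT u.userid, u.username, u.fullname, u.img, u.status FROM user u',
      (error, results) => {
        if (error) throw error;
        // each row is now much narrower than a SELECT * row
      }
    );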

But that does not solve your problem; it only delays it.

To solve it you have two choices. One is to process the legacy table in chunks.

For example, you can retrieve your data chunk by chunk. You get the first chunk with this query

    SELECT whatever ORDER BY user_id LIMIT 1000 OFFSET 0

and the next chunks with these queries:

    SELECT whatever ORDER BY user_id LIMIT 1000 OFFSET 1000
    SELECT whatever ORDER BY user_id LIMIT 1000 OFFSET 2000

This gives you your rows in chunks of a thousand. Keep going until a query comes back with no rows; then you are done.
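A minimal sketch of such a chunked loop, assuming the same legacy connection as in the question and a hypothetical processRow() helper that does the per-row mapping and inserts:

    const CHUNK_SIZE = 1000;

    function migrateChunk(offset, done) {
      legacy.query(
        'SELECT userid, username, fullname FROM user ORDER BY userid LIMIT ? OFFSET ?',
        [CHUNK_SIZE, offset],
        (error, results) => {
          if (error) throw error;
          if (results.length === 0) {
            return done();                             // no rows came back: the table is exhausted
          }
          results.forEach((row) => processRow(row));   // map and insert this chunk's rows
          migrateChunk(offset + CHUNK_SIZE, done);     // then ask for the next chunk
        }
      );
    }

    migrateChunk(0, callback);

Only one chunk of rows is referenced at a time, so memory use stays roughly constant; the recursive call runs from a fresh event-loop tick inside the query callback, so the stack does not grow either.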

The second choice is to work through the rows of the result set one by one. This needs a slightly different way of using query(). It is documented here: https://github.com/mysqljs/mysql#streaming-query-rows

Basically it works like this:

    var stream = legacy.query('SELECT whatever');
    stream
      .on('result', function(row) {
        // Pausing the connection is useful if your processing involves I/O
        legacy.pause();
        // handle your row of data here...
        legacy.resume();
      })
      .on('end', function() {
        // all rows have been received
      });

This lets you process your data one row at a time.
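Applied to the migration in the question, a sketch could look like this (mapLegacyRow() is a hypothetical helper doing the same field mapping as the loop body above; the connection is paused until the insert into the new database has finished):

    const stream = legacy.query('SELECT * FROM user u LEFT JOIN behavior b ON (u.userid = b.userid) LEFT JOIN filter f ON (u.username = f.username)');

    stream
      .on('error', (error) => { throw error; })
      .on('result', (row) => {
        legacy.pause();                      // stop reading rows while we write
        const user = mapLegacyRow(row);      // hypothetical helper: same mapping as in the question
        connection.query('INSERT INTO user SET ?', user, (error) => {
          if (error) throw error;
          legacy.resume();                   // only then fetch the next row
        });
      })
      .on('end', () => {
        callback();                          // all rows have been received and inserted
      });

Pausing and resuming throttles the reads to the speed of the inserts, so rows never pile up in memory.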