使用nodejs脚本将GB的数据从MongoDB迁移到Cassandra的最佳方法

我在MongoDB中有一个很大的集合。 希望通过在该数据上运行一些业务逻辑nodejs脚本来迁移所有数据到cassandra。 做这个的最好方式是什么 ?

我编写了一个脚本,在mongo的单个请求中处理5000个文档,处理数据并将文档插入到cassandra中。 经过40-50次迭代需要很多时间。 CPU使用率显示100%。 这是因为很多callback发生? 我是节点j的新手,所以不能完成任何事情。var cassandra = require('../ models / tracking_cassandra'); var TrackingEvents = require('../ models / tracking_mongo_events');

var counter = 0; var incr = 5000; var final_counter = 0; var start_point = function (callback){ TrackingEvents.count(function(err, data){ final_counter = data; TrackingEvents.getEventsByCounter(counter, function(counter, obj) { var prevId = obj[0].toObject()._id; getMessagesFromMongo(prevId, callback); }); }); }; function getMessagesFromMongo(prevId, callback){ counter = counter + incr; TrackingEvents.getEventsByCounter(counter, function(counter, obj) { var nextId = obj[0].toObject()._id; var start_time = new Date(); TrackingEvents.getEventsBtwIds(prevId, nextId, function ( err, userEvents ) { if(userEvents.length !== 0){ insert_into_cassandra( userEvents, callback ); }else{ console.log('empty data set'); } }); if(counter >= final_counter){ callback(); }else{ getMessagesFromMongo(nextId, callback); } }); }; var insert_into_cassandra = function( events, callback ){ var inserts = 0; total_documents = total_documents + events.length; for(var i = 0 ; i< events.length ; i++){ var userEventData = events[i].toObject(); if(typeof userEventData.uid == 'undefined'){ total_nuid ++; }else{ create_cassandra_query( userEventData ); } } }; var create_cassandra_query = function ( eventData ) { delete eventData._id; delete eventData[0]; delete eventData.appid; delete eventData.appversion; var query = "INSERT INTO userwise_events "; var keys = "("; var values = "("; for(var key in eventData){ if(eventData[key] == null || typeof eventData[key] == 'undefined'){ delete eventData[key]; } if (eventData.hasOwnProperty(key)) { keys = keys + key + ', '; values = values + ':' + key + ', '; } if(key != 'uid' && key!= 'date_time' && key != 'etypeId'){ eventData[key] = String(eventData[key]); } } keys = keys.slice(0, -2); values = values.slice(0, -2); keys = keys + ")"; values = values + ")"; query = query + keys + " VALUES " + values; cassandra.trackingCassandraClient.execute(query, eventData, { prepare: true }, function (err, data) { if(err){ console.log(err); } }); }; var start_time = new Date(); start_point(function(res, err){ var end_time = new Date(); var diff = end_time.getTime() - start_time.getTime(); var seconds_diff = diff / 1000; var totalSec = Math.abs(seconds_diff); console.log('Total Execution Time : ' + totalSec); }); process.on('uncaughtException', function (err) { console.log('Caught exception: ' + err); });` 

这是因为很多callback发生?

对于我所知的所有人来说,可能都没有callback – 不可能告诉你什么是你的代码问题,甚至没有包含一行代码。

对于这样一个模糊的问题,我只能给你一个一般的build议:确保你没有长时间运行或while循环。 并且不要在事件循环的第一个记号中使用阻塞系统调用。 如果你不知道事件循环的第一个勾号是什么,那么就根本不要使用阻塞调用。 只要你可以,使用stream数据 – 特别是如果你有很多。

100%的CPU利用率是一个不好的迹象,不应该发生像你正在执行的I / O繁重的操作。 您应该能够轻松处理疯狂的数据量,特别是在使用stream时。 让您的进程最大限度地利用CPU执行内在的I / O绑定操作,例如通过networking传输大量数据,这肯定表明您在代码中做错了什么。 那到底是什么? 这将是一个谜,因为你甚至没有显示我们的代码的单一行。