使用nodejs / async批处理和延迟API调用

我在一个社交networking图上工作,在那里我想根据从API获得的邻接表来构build一个“六度分离”树。

对于每个人来说,API将以[id1,id2,id3 …]的forms返回一组好友,这正是我想要的。 但是问题是有很多人,API只允许400个电话/ 15分钟。 我可以将数据保存在本地数据库中,但是我不想用请求泛滥API。

我所做的伪代码是这样的:

requestCharacter = function(id) { is this person in my db already? if true, return; else make api call(error, function(){loopFriends(character)}) { save character in database } } loopFriends(character){ foreach(friend in character.friends) requestCharacter(friend); } 

而且我已经编码了,或多或less,并且它工作正常,但是因为它不断遍历树木,而且人们在彼此的朋友列表中重复出现,所以这是非常低效的,并且不断地破坏API限制

所以我想要做的就是排队请求,在添加之前检查是否还没有在队列中,并且一次以400或更less的请求批量运行队列。 (所以如果队列中有1200个,它将运行400,等待15分钟,运行400,等待15分钟,运行400 …)

我尝试使用队列async.js,我可以加载到队列中,但我不认为它实际上运行。 对于这样的情况,最好的办法是什么?

我的实际非排队代码如下所示:

 var lookupAndInsertCharacter = function(id){ Character.findOne({ 'id': id }, function (err, person) { if (err) console.log(err); else { if(person!=null) {console.log('%s already exists in database, not saved', person.name); getCharacterFriends(id);} else insertCharacter(id, function(){getCharacterFriends(id)}); }; }) } var insertCharacter = function(id, callback){ var url = getCharacterURL(id); request(url, function (error, response, body) { if (!error && response.statusCode == 200) { var result = JSON.parse(body); if(result.status_code != 1 ) {console.log("ERROR status_code: %s. Please wait 15 minutes", result.status_code); return;} else { var me = new Character(processCharacter(result)); me.save(function(err){ if (err) return handleError(err); }); console.log("Saved character "+me.name); } } else { console.log(error); } }); } var getCharacterFriends = function(id) { Character.findOne({ 'id': id }, function (err, person) { if (err) console.log(err); else { console.log("Getting friends for %s",person.name); _.each(person.character_friends, function(d){ lookupAndInsertCharacter(d); }); console.log("Getting enemies for %s",person.name); _.each(person.character_enemies, function(d){ lookupAndInsertCharacter(d); }) }; }) } 

在下面的例子中,我将FaceBook上的所有组,其上的post以及作者的公开个人资料都列出来。

为了减缓这个过程,我创build了一个有限的“刮板”池,并保留每个刮板一段时间,所以我“不能重载FaceBook服务器:)”

对于上面的例子,你可以

  • setTimeout(function(){pool.release(scraper);}, 15*60*1000);
  • 或限制您的池大小为1 max : 1并保留你的刮板3.75秒setTimeout(function(){pool.release(scraper);}, 3750);

这里是代码

 function saveData (anyJson) { // put your Db communication here. // console.log(anyJson); } function now() { instant = new Date(); return instant.getHours() +':'+ instant.getMinutes() +':'+ instant.getSeconds() +'.'+ instant.getMilliseconds(); } var graph = require('fbgraph'); console.log(process.argv[2]); graph.setAccessToken(process.argv[2]); var poolModule = require('generic-pool'); var pool = poolModule.Pool({ name : 'scraper', create : function(callback) { console.log(now() +' created scraper'); // parameter order: err, resource callback(null, {created:now()}); }, destroy : function(scraper) { console.log(now() +' released scraper created '+ scraper.created); }, max : 10, min : 1, idleTimeoutMillis : 60*60*1000, log : false }); function pooledGraphGet(path,analyse) { pool.acquire(function(err,scraper) { if (err) { console.log(now() +' Could not get a scraper for '+ path); throw err; } graph.get(path,function(err,res) { if (err) { console.log(now() +' Could not get '+ path +' using scraper created '+ scraper.created); throw err; } else { console.log(now() +' Got '+ path +' using scraper created '+ scraper.created); setTimeout(function(){pool.release(scraper);}, 60*1000); analyse(res); } }); }); } pooledGraphGet('me?fields=friends,groups', function(res) { res.groups.data.forEach(function(group) { saveData (group); pooledGraphGet(group.id +'?fields=id,name,members,feed', function(res) { if (res.feed) res.feed.data.forEach(function(feed){ saveData (feed); pooledGraphGet(feed.from.id +'?fields=id,name', function(res) { saveData (res); }); }); }); }); }); 

最终为我工作的是限制API调用。 我用了

https://github.com/wankdanker/node-function-rate-limit

然后我做了一个有限版的insertCharacter:

 var rateLimit = require('function-rate-limit'); var insertLimited = rateLimit(400, 900000, function (id) { insertCharacter(id); });