asynchronous并行请求正在顺序运行

我正在运行一个服务器使用Node.js,并需要从我运行的另一台服务器( localhost:3001 )请求数据。 我需要向数据服务器发送多个请求(〜200)并收集数据(响应大小从〜20Kb到〜20Mb不等)。 每个请求都是独立的,我想将响应保存为一个巨大的数组:

 [{"urlAAA": responseAAA}, {"urlCCC": responseCCC}, {"urlBBB": responseBBB}, etc ] 

请注意,这些项目的顺序并不重要,理想情况下应该按照数据可用的顺序填充数组。

 var express = require('express'); var router = express.Router(); var async = require("async"); var papa = require("papaparse"); var sync_request = require('sync-request'); var request = require("request"); var pinnacle_data = {}; var lookup_list = []; for (var i = 0; i < 20; i++) { lookup_list.push(i); } function write_delayed_files(object, key, value) { object[key] = value; return; } var show_file = function (file_number) { var file_index = Math.round(Math.random() * 495) + 1; var pinnacle_file_index = 'http://localhost:3001/generate?file=' + file_index.toString(); var response_json = sync_request('GET', pinnacle_file_index); var pinnacle_json = JSON.parse(response_json.getBody('utf8')); var object_key = "file_" + file_number.toString(); pinnacle_data[object_key] = pinnacle_json; console.log("We've handled file: " + file_number); return; }; async.each(lookup_list, show_file, function (err) {}); console.log(pinnacle_data); /* GET contact us page. */ router.get('/', function (req, res, next) { res.render('predictionsWtaLinks', {title: 'Async Trial'}); }); module.exports = router; 

现在当这个程序运行时,它显示:

 We've handled file: 0 We've handled file: 1 We've handled file: 2 We've handled file: 3 We've handled file: 4 We've handled file: 5 etc 

现在,由于这些文件的大小是可变的,所以我希望这样能够“并行”执行请求,但似乎是按顺序执行,这正是我试图通过使用async.each()来避免的。 目前连接到数据服务器需要大约1-2秒的时间,所以在很多文件上执行这个过程花费的时间太长了。

我意识到我正在使用同步请求,所以想理想地取代:

 var response_json = sync_request('GET', pinnacle_file_index); 

与类似的东西

 request(pinnacle_file_index, function (error, response, body) { if (!error && response.statusCode == 200) { pinnacle_data[object_key] = JSON.parse(body); } }); 

任何帮助将非常感激。

另外我看了试着:

  • 将url列表转换为匿名函数列表,并使用async.parallel(function_list, function (err, results) { //add results to pinnacle_data[]}); 。 (我遇到了问题,试图为数组中的每个元素定义唯一的函数)。

同样我也看了其他相关的话题:

  • 我试图模仿从asynchronoushttp调用build议的解决scheme与nodeJS没有进展。

  • Node.js – Async.js:并行执行是如何工作的? 。

  • 如何使用节点中的承诺一次执行并行asynchronous多个请求

编辑 – 工作解决scheme


下面的代码现在执行任务(每个请求需要〜80ms,包括使用npm requestretry重复请求)。 同样,这个比例很好,总计5个请求之间的平均请求时间约为80ms,最多为1000个。

 var performance = require("performance-now"); var time_start = performance(); var async = require("async"); var request_retry = require('requestretry'); var lookup_list = []; var total_requests = 50; for (var i = 0; i < total_requests; i++) { lookup_list.push(i); } var pinnacle_data = {}; async.map(lookup_list, function (item, callback) { var file_index = Math.round(Math.random() * 495) + 1; var pinnacle_file_index = 'http://localhost:3001/generate?file=' + file_index; request_retry({ url: pinnacle_file_index, maxAttempts: 20, retryDelay: 20, retryStrategy: request_retry.RetryStrategies.HTTPOrNetworkError }, function (error, response, body) { if (!error && response.statusCode == 200) { body = JSON.parse(body); var data_array = {}; data_array[file_index.toString()] = body; callback(null, data_array); } else { console.log(error); callback(error || response.statusCode); } }); }, function (err, results) { var time_finish = performance(); console.log("It took " + (time_finish - time_start).toFixed(3) + "ms to complete " + total_requests + " requests."); console.log("This gives an average rate of " + ((time_finish - time_start) / total_requests).toFixed(3) + " ms/request"); if (!err) { for (var i = 0; i < results.length; i++) { for (key in results[i]) { pinnacle_data[key] = results[i][key]; } } var length_array = Object.keys(pinnacle_data).length.toString(); console.log("We've got all the data, totalling " + length_array + " unique entries."); } else { console.log("We had an error somewhere."); } }); 

谢谢您的帮助。

正如你所发现的, async.parallel()只能并行处理本身asynchronous的操作。 如果这些操作是同步的,那么由于node.js的单线程特性,操作会一个接一个地运行,而不是并行运行。 但是,如果操作本身是asynchronous的,那么async.parallel() (或其他asynchronous方法)将一次启动它们并协调结果。

这是一个使用async.map()的一般想法。 我使用async.map()是因为它的想法是,它需要一个数组作为input,并产生与原始相同的顺序结果数组,但并行运行所有的请求,似乎与你想要的:

 var async = require("async"); var request = require("request"); // create list of URLs var lookup_list = []; for (var i = 0; i < 20; i++) { var index = Math.round(Math.random() * 495) + 1; var url = 'http://localhost:3001/generate?file=' + index; lookup_list.push(url); } async.map(lookup_list, function(url, callback) { // iterator function request(url, function (error, response, body) { if (!error && response.statusCode == 200) { var body = JSON.parse(body); // do any further processing of the data here callback(null, body); } else { callback(error || response.statusCode); } }); }, function(err, results) { // completion function if (!err) { // process all results in the array here console.log(results); for (var i = 0; i < results.length; i++) { // do something with results[i] } } else { // handle error here } }); 

而且,这是一个使用Bluebird Promise.map()的版本,有些类似地使用Promise.map()迭代初始数组:

 var Promise = require("bluebird"); var request = Promise.promisifyAll(require("request"), {multiArgs: true}); // create list of URLs var lookup_list = []; for (var i = 0; i < 20; i++) { var index = Math.round(Math.random() * 495) + 1; var url = 'http://localhost:3001/generate?file=' + index; lookup_list.push(url); } Promise.map(lookup_list, function(url) { return request.getAsync(url).spread(function(response, body) { if response.statusCode !== 200) { throw response.statusCode; } return JSON.parse(body); }); }).then(function(results) { console.log(results); for (var i = 0; i < results.length; i++) { // process results[i] here } }, function(err) { // process error here }); 

听起来像你只是试图并行下载一堆url。 这将做到这一点:

 var request = require('request'); var async = require('async'); var urls = ['http://microsoft.com', 'http://yahoo.com', 'http://google.com', 'http://amazon.com']; var loaders = urls.map( function(url) { return function(callback) { request(url, callback); } }); async.parallel(loaders, function(err, results) { if (err) throw(err); // ... handle appropriately // results will be an array of the results, in // the same order as 'urls', even thought the operation // was done in parallel console.log(results.length); // == urls.length }); 

或者更简单,使用async.map

 var request = require('request'); var async = require('async'); var urls = ['http://microsoft.com', 'http://yahoo.com', 'http://google.com', 'http://amazon.com']; async.map(urls, request, function(err, results) { if (err) throw(err); // handle error console.log(results.length); // == urls.length }); 

尝试这个:

 var async = require("async"); var request = require("request"); var show_file = function (file_number,cb) { //..Sync ops var file_index = Math.round(Math.random() * 495) + 1; var pinnacle_file_index = 'http://localhost:3001/generate?file='+file_index.toString(); //request instance from Request npm Module //..Async op --> this should make async.each asynchronous request(pinnacle_file_index, function (error, response, body) { if(error) return cb(error); var object_key = "file_" + file_number.toString(); pinnacle_data[object_key] = JSON.parse(body); return cb(); }); }; async.each( lookup_list, show_file, function (err) { if(err){ console.log("Error",err); }else{ console.log("Its ok"); console.log(pinnacle_data); } });