Node.js:执行多个asynchronous操作的最佳方式,然后做别的事情?

在下面的代码中,我试图一次性创build多个(大约10个)HTTP请求和RSS分析。

我在我需要访问和parsing结果的URI数组上使用标准的forEach构造。

码:

 var articles; feedsToFetch.forEach(function (feedUri) { feed(feedUri, function(err, feedArticles) { if (err) { throw err; } else { articles = articles.concat(feedArticles); } }); }); // Code I want to run once all feedUris have been visited 

我明白,一旦调用一个函数,我应该使用callback。 然而,在这个例子中,我能想到使用callback的唯一方法就是调用一个函数来计算它被调用了多less次,并且只有在调用与feedsToFetch.length相同的次数时才会继续。 。

所以我的问题是, 在node.js中处理这种情况的最好方法什么

最好没有任何forms的阻塞! (我仍然想要快速的速度)。 是承诺还是别的?

谢谢,丹尼

无堆栈解决scheme

承诺将被包含在下一个JavaScript版本中

stream行的Promise库为这个确切的用例提供了一个.all()方法(等待一堆asynchronous调用完成,然后执行其他操作)。 这是你场景的完美搭配

蓝鸟也有.map() ,它可以采取一个值的数组,并使用它来启动一个Promise链。

这是一个使用Bluebird .map()的例子:

 var Promise = require('bluebird'); var request = Promise.promisifyAll(require('request')); function processAllFeeds(feedsToFetch) { return Promise.map(feedsToFetch, function(feed){ // I renamed your 'feed' fn to 'processFeed' return processFeed(feed) }) .then(function(articles){ // 'articles' is now an array w/ results of all 'processFeed' calls // do something with all the results... }) .catch(function(e){ // feed server was down, etc }) } function processFeed(feed) { // use the promisified version of 'get' return request.getAsync(feed.url)... } 

还要注意,你不需要在这里使用闭包来累积结果。

蓝鸟API文档写得很好,有很多例子,所以它更容易拾取。

一旦我学会了诺言模式,它使生活变得更加容易。 我不能推荐它。

另外,这里有一篇关于使用承诺, async模块和其他处理asynchronous函数的不同方法的文章

希望这可以帮助!

没有必要的黑客

我build议使用asynchronous模块,因为它使这些事情更容易。

async提供async.eachSeries作为arr.forEach的asynchronousreplace,并允许您在done时传递donecallback函数。 它将处理每个项目,就像每个项目一样。 此外,它会方便地将错误引发到您的callback中,以便您不必在循环内部具有处理程序逻辑。 如果你想/需要并行处理,你可以使用async.each 。

async.eachSeries调用和callback之间没有阻塞

 async.eachSeries(feedsToFetch, function(feedUri, done) { // call your async function feed(feedUri, function(err, feedArticles) { // if there's an error, "bubble" it to the callback if (err) return done(err); // your operation here; articles = articles.concat(feedArticles); // this task is done done(); }); }, function(err) { // errors generated in the loop above will be accessible here if (err) throw err; // we're all done! console.log("all done!"); }); 

或者,您可以构build一个asynchronous操作数组并将其传递给async.series 。 系列将在一系列 (不平行)中处理您的结果,并在每个function完成时调用callback。 在async.eachSeries上使用这个的唯一原因是,如果你喜欢熟悉的arr.forEach语法。

 // create an array of async tasks var tasks = []; feedsToFetch.forEach(function (feedUri) { // add each task to the task array tasks.push(function() { // your operations feed(feedUri, function(err, feedArticles) { if (err) throw err; articles = articles.concat(feedArticles); }); }); }); // call async.series with the task array and callback async.series(tasks, function() { console.log("done !"); }); 

或者你可以滚动你自己™

也许你感觉更加雄心勃勃,或者你不想依赖async依赖。 也许你只是像我一样无聊。 无论如何,我故意复制了async.eachSeries的API,以便理解它是如何工作的。

一旦我们删除了这里的注释,我们只有9行代码可以被我们想asynchronous处理的任何数组重复使用! 它不会修改原始数组,可以将错误发送到迭代的“短路”,并且可以使用单独的callback。 它也将在空数组上工作。 相当于9行的function:)

 // void asyncForEach(Array arr, Function iterator, Function callback) // * iterator(item, done) - done can be called with an err to shortcut to callback // * callback(done) - done recieves error if an iterator sent one function asyncForEach(arr, iterator, callback) { // create a cloned queue of arr var queue = arr.slice(0); // create a recursive iterator function next(err) { // if there's an error, bubble to callback if (err) return callback(err); // if the queue is empty, call the callback with no error if (queue.length === 0) return callback(null); // call the callback with our task // we pass `next` here so the task can let us know when to move on to the next task iterator(queue.shift(), next); } // start the loop; next(); } 

现在让我们创build一个示例asynchronous函数来使用它。 我们setTimeout这里的延迟时间为500毫秒。

 // void sampleAsync(String uri, Function done) // * done receives message string after 500 ms function sampleAsync(uri, done) { // fake delay of 500 ms setTimeout(function() { // our operation // <= "foo" // => "async foo !" var message = ["async", uri, "!"].join(" "); // call done with our result done(message); }, 500); } 

好的,让我们看看他们是如何工作的!

 tasks = ["cat", "hat", "wat"]; asyncForEach(tasks, function(uri, done) { sampleAsync(uri, function(message) { console.log(message); done(); }); }, function() { console.log("done"); }); 

输出(每个输出之前500 ms延迟)

 async cat ! async hat ! async wat ! done 

使用URL列表的副本作为队列来跟踪到达使得它很简单:(所有的变化评论)

 var q=feedsToFetch.slice(); // dupe to censor upon url arrival (to track progress) feedsToFetch.forEach(function (feedUri) { feed(feedUri, function(err, feedArticles) { if (err) { throw err; } else { articles = articles.concat(feedArticles); } q.splice(q.indexOf(feedUri),1); //remove this url from list if(!q.length) done(); // if all urls have been removed, fire needy code }); }); function done(){ // Code I want to run once all feedUris have been visited } 

最后,这并不比承诺“更肮脏”,并为您提供重新加载未完成的url的机会(单独一个柜台不会告诉您哪一个失败)。 对于这个简单的并行下载任务,它实际上是将更多的代码添加到您的项目Promise中,而不是一个简单的队列,而Promise.all()并不是最直观的地方。 一旦进入了子查询,或者想要比火车故障更好的error handling,我强烈build议使用Promise,但是不需要火箭发射器来杀死一只松鼠。