Async.eachSeries:已经调用callback

这是一个有点长和杂乱的代码,但请忍受我,因为我需要完成这个。

我正在尝试为每个用户更新一个json对象。 我想循环迭代等待asynchronous过程结束,以避免竞争条件。 然而,这导致了一个callback地狱,现在我不能决定什么是正确的地方为每个callback被返回。

我在嵌套async.eachSeries上提到了这个答案,并试图根据它来构造我的代码。 但是,这仍然行不通。 该代码在callback1()提供了已经调用的错误。

async.eachOfSeries(res, function (value, camp, callback3) { let _id = res[camp]._id; let arr = res[camp].campaignID; async.eachOfSeries(arr, function2, function (err) { callback3(); }) function function2(value1, i, callback2) { let users = arr[i].users; let id = arr[i].id; let loop = Math.ceil(users / 1000); let limit = 0, offset = 0; for (let j = 0; j < loop; j++) { if (users > 1000) { limit = 1000; users -= limit; } else { limit = users; } console.log(limit + " limit " + offset + " offset"); var start = Date.now(); while (Date.now() < start + 100) {} const request = mailjet .get("messagesentstatistics") .request({ "CampaignID": id, "AllMessages": true, "Limit": limit, "Offset": offset }) request .then((result) => { let data = result.body.Data; var loop = 0; async.eachOfSeries(data, function1, function (err) { console.log("function"); callback2(); }) console.log("oooooo"); }) .catch((err) => { console.log(err); }) offset += limit; } function function1(value2, val, callback1) { console.log(data +" data"); let jsonObj = data[val]; let email = jsonObj.ToEmail; jsonObj['retailer'] = res[camp].retailer; jsonObj['summary'] = 'f'; let tempObj = {}; tempObj[id] = jsonObj; let options = { new: true }; let campId = id; User.addCampaignResponse(email, campId, tempObj, options, function (err, results) { if (err) { throw err; } else { console.log("aasd"); Campaign.updateResponse(_id, function (err, results2) { if (err) throw err; else { console.log("asdasaadas"); callback1(); } }) // console.log(results); } }) } } }, function (err) { callback(undefined, "doneeeeee"); }) 

有没有比这更好的方法? 我能否在某处使用瀑布? 我可以更改callback位置以避免错误吗?


编辑:简化的代码

 function function2(value1, i, callback2) { // ... const request = mailjet .get("messagesentstatistics") .request({ // ... }); request .then((result) => { // ... async.eachOfSeries(data, function1, function (err) { callback2(); }); }) .catch((err) => { // ... }); } 

 function function1(value2, val, callback1) { // ... User.addCampaignResponse(email, campId, tempObj, options, function (err, results) { if (err) { throw err; } else { Campaign.updateResponse(_id, function (err, results2) { if (err) throw err; else callback1(); }); } }); } 

 async.eachOfSeries(res, function (value, camp, callback3) { // ... async.eachOfSeries(arr, function2, function (err) { callback3(); }); }, function (err) { callback(undefined, "doneeeeee"); }); 

我会这样做。

我们使用if (err) return callback(err); 停止当前的asynchronousfunction并将错误发送到更高级别。

 async.eachSeries(res, function (r, callback1) { let _id = r._id; let arr = r.campaignID; async.eachSeries(arr, function firstLevel (a, callback2) { let users = a.users; let id = a.id; let loop = Math.ceil(users / 1000); let limit = 0, offset = 0; // for loop is synchronous whereas mailjet is asynchronous -> usually bad idea to mix those two // instead try async.timesSeries() async.timesSeries(loop, function getSentMessages (n, callback3) { if (users > 1000) { limit = 1000; users -= limit; } else { limit = users; } console.log(n, limit, "limit", offset, "offset"); var start = Date.now(); while (Date.now() < start + 100) {} // this does nothing... // async.js doesn't flow well with Promises so request your resource with a callback function mailjet .get("messagesentstatistics") .request({ CampaignID: id, AllMessages: true, Limit: limit, Offset: offset }) .request(function (err, result, body) { // stop everything if an error occurred; send the error back up if (err) return callback3(err); let data = result.body.Data; var loop = 0; async.eachSeries(data, secondLevel (jsonObj, callback4) { let email = jsonObj.ToEmail; jsonObj.retailer = r.retailer; jsonObj.summary = 'f'; let tempObj = {}; tempObj[id] = jsonObj; let options = { new: true }; let campId = id; User.addCampaignResponse(email, campId, tempObj, options, function (err, results) { // stop everything if an error occurred; send the error back up if (err) return callback4(err); console.log("added campaign response"); Campaign.updateResponse(_id, function (err, results2) { // stop everything if an error occurred; send the error back up if (err) return callback4(err); console.log("updated campaign response"); callback4(); }); }) }, callback3); }); // end of mailjet offset += limit; }, callback2); // end of async.timesSeries }, callback1); // end of async.eachOfSeries }, function (err) { // if an error occurs anywhere, it should back here if (err) { console.log(err); return; } console.log("doneeeeee"); }); 

另外使用有意义的variables和函数名称总是更好。

这是简化的更正版本


 function secondLevel(value2, val, callback) { // ... User.addCampaignResponse(email, campId, tempObj, options, function (err, results) { if (err) { throw err; } Campaign.updateResponse(_id, function (err, results2) { // throw the error to the highest level if there is one if (err) throw err; // Finish the eachOfSeries if all is ok callback(); }); }); } 

 function firstLevel(value1, i, callback) { // ... mailjet .get("messagesentstatistics") .request({ // ... }) .then((result) => { // ... async.eachOfSeries(data, secondLevel, function (err) { // throw the error to the highest level if there is one if (err) throw err; // Finish the eachOfSeries if all is ok callback(); }); }) .catch((err) => { // throw the error to the highest level throw err; }); } 

  // // Start of our program // async.eachOfSeries(res, function (value, camp, callback) { // ... async.eachOfSeries(arr, firstLevel, function (err) { // throw the error to the highest level if there is one if (!err) throw err; // Finish the eachOfSeries if all is ok callback(); }); }, function (err) { if (err) { // Here we re done with an error // ... return; } // We did it, no, error // ... });