同时完成儿童程序和承诺决议

tl; dr:我写的一个函数创build了几个subprocess,当他们在消息中提交他们的数据时,这个subprocess解决了这个承诺。 虽然函数将所有这些承诺包装在一个Promise中。所有的函数都会突然返回,并且promise也不会解决也不会拒绝,即使所有的过程都完成了没有错误。 任何想法为什么发生这种情况?

为了加速数据收集过程,我有一个父进程获取一些input数据(在SQL数据库中查询的date是准确的),并将它发送给大小相等的一些subprocess,等待subprocess完成处理他们的数据通过包装他们的结果在一个大的承诺。

虽然这适用于较小的数据集,但对于较大的数据集,父承诺只会返回到命令行 – 既不解决也不拒绝,甚至不能继续执行function。 看了几个日志之后,似乎所有的subprocess都正确地处理并发送了他们的数据,但是父进程并没有收到less数几个进程的结果。 错过的消息发生在数据处理结束的附近(当几个subprocess在大约同一时间完成并发送消息时)

缩写代码:

// main function function createArray(i,j) // returns an array of i empty arrays, each of length j function chunkify(a, n, balanced) // divides array a into n chunks (balancing them in size if true) returning an array of chunks function kidcollector(snaptimes,course) { var done = 1; var numchild = 10 const chunked = chunkify(snaptimes,numchild,true); // array of numchild promises to be resolved upon arrival of data var collectedPromises = _.times(numchild).map(i => { return new Promise((resolve, reject) => { var child = child_process.fork('./child.js'); // send chunk of data to each child child.send({ times:chunked[i], c:course }); child.on('error', (err) => { console.log('Child error.'); reject(err) }); child.on('message', function(m) { if (m.err) { console.log('Got error from '+ m.child, m.err); reject(m.err); } else { console.log('recieved data from ' + m.child + '! ' + done + ' out of ' + numchild); done++; resolve(m.data); } }); }); }) return Promise.all(collectedPromises) .then(results => { // compile all data into one array then return it }) .catch(err => { console.log("One of the kids messed up:", err); }) }; // child.js, a separate file const connString = // it's a secret! const client = new Client(connString); client.connect(); client.on('error', (err) => { console.error('Client error:', err.stack) }) process.on('exit', (err) => { if (err) console.log(process.pid + ' has recieved error:', err); client.end(() => console.log(process.pid + ' has disconnected on process end', err)); }) process.on('disconnect', (err) => { if (err) console.log(process.pid + ' has recieved error:', err); client.end(() => console.log(process.pid + ' has disconnected on process disconnect')); }) process.on('message', function(m) { collector(m.times,mc,process.pid) // async function which compiles data across SQL databases .then(async function(subdata) { console.log("all done"); await process.send({ child: process.pid, data: subdata }); await process.disconnect(); }) .catch(async function(err) { console.log("FAILED IN CHILD", err) await process.send({ child: process.pid, err: err }); await process.disconnect(); }) }); 

因此,在按照预期运行一段时间之后,在数据处理结束时,日志如下所示:

 all done // child says they're done recieved data from 5486! 5 out of 10 // parent has received their data 5486 has disconnected on process disconnect // child disconnects 5481 processing snaptime #35 at 2017-07-31T20:26:40.322Z // child is now processing a new time from their given array all done recieved data from 5478! 6 out of 10 5478 has disconnected on process disconnect 5483 processing snaptime #34 at 2017-07-31T20:26:51.065Z 5485 processing snaptime #35 at 2017-07-31T20:27:01.876Z all done // child says they're done 5477 has disconnected on process disconnect // child disconnects, but parent hasn't received data all done recieved data from 5481! 7 out of 10 // all good here 5481 has disconnected on process disconnect 5483 processing snaptime #35 at 2017-07-31T20:27:47.834Z all done 5485 has disconnected on process disconnect // didn't receive message here all done recieved data from 5483! 8 out of 10 5483 has disconnected on process disconnect hansy@Hansys-MacBook-Air ~/Documents/GitHub // and we're at the command line...? 

在promise.all()解决scheme上,代码应该logging运行时间,如果拒绝,它应该logging下其中一个孩子搞砸了,并且错误。

关于发生什么事情和/或如何解决这个问题的任何想法,尤其是因为它只发生在较大的数据集上? (我正在使用节点v8.0.0和10个subprocess)

你的问题似乎是process.send不会返回你可以await的承诺,而是(可选的)callback。 所以你的disconnect呼叫不会等待消息被发送。

当没有更多的事件在队列中被处理时,即使当承诺尚未结算,你的父进程也完成了。 你要听的东西是孩子过程的exit事件 ,不仅是error 事件 。 当你reject ,你要确保你的Promise.all将永远满足,不pipesubprocess做什么。

我build议

 // parent … new Promise((resolve, reject) => { const child = child_process.fork('./child.js'); child.on('error', reject); child.on('exit', reject); child.on('message', resolve); // should happen before exit child.send({ times: chunked[i], c: course }); }).then(function(m) { if (m.err) { console.log(`received error from #${i} (${m.child})`, m.err); throw m.err; } else { console.log(`received data from #${i} (${m.child})`); return m.data; } }, function(err) { console.log(`Got abort from #${i} (${m.child})`); throw err; }); 

 // child … process.on('message', function(m) { collector(m.times, mc, process.pid) // async function which compiles data across SQL databases .then(function(subdata) { console.log(process.pid+" done"); return { child: process.pid, data: subdata }; }, function(err) { console.log(process.pid+" FAILED:", err) return { child: process.pid, err: err }; }).then(function(data) { return new Promise(function(resolve, reject) { process.send(data, function(err) { if (err) reject(err); else resolve(); }); }); }).catch(function(err) { console.log(process.pid+" FAILED to send result", err) }).then(function() { process.disconnect(); }) });