运行NodeJS事件循环/等待subprocess完成

我首先对这个问题进行了一般性描述,然后详细介绍了为什么通常的方法不起作用。 如果你想阅读这些抽象的解释继续。 最后,我解释了更大的问题和具体的应用,所以如果你想阅读,跳转到“实际应用”。

我正在使用一个node.jssubprocess来做一些计算密集型的工作。 父进程是工作的,但是在执行的某个时刻,它会到达一个点,在这个点上它必须有来自subprocess的信息才能继续。 因此,我正在寻找一种方法来等待subprocess完成。

我目前的设置看起来有点像这样:

importantDataCalculator = fork("./runtime"); importantDataCalculator.on("message", function (msg) { if (msg.type === "result") { importantData = msg.data; } else if (msg.type === "error") { importantData = null; } else { throw new Error("Unknown message from dataGenerator!"); } }); 

和其他地方

 function getImportantData() { while (importantData === undefined) { // wait for the importantDataGenerator to finish } if (importantData === null) { throw new Error("Data could not be generated."); } else { // we should have a proper data now return importantData; } } 

所以当父进程启动时,它会执行第一个代码,产生一个subprocess来计算数据,然后继续执行它自己的工作。 当时间到了,它需要subprocess的结果继续调用getImportantData() 。 所以这个想法是getImportantData()阻塞,直到数据被计算。

但是,我用的方式是行不通的。 我认为这是由于我阻止使用while循环执行事件循环。 而且由于事件循环没有执行,subprocess没有消息可以被接收,因此while循环的条件不能改变,使其成为一个无限循环。

当然,我并不想使用这种while循环。 我宁愿做的是告诉node.js“执行事件循环的一个迭代,然后回到我身边”。 我会重复执行这个操作,直到收到我需要的数据,然后通过从getter返回而离开的地方继续执行。

我意识到,他构成了多次重新进入同一个函数的危险,但是我希望在事件循环中使用这个模块,除了等待subprocess的这个消息,并发送其他消息来报告进程,所以这不应该是一个问题。

有没有办法在Node.js中只执行一个事件循环迭代? 还是有另一种方法来实现类似的东西? 还是有一个完全不同的方法来实现我在这里要做的?

到目前为止我唯一能想到的解决scheme就是改变计算方式,以便引入另一个过程。 在这种情况下,将会有计算重要数据的过程,计算不需要重要数据的数据位的过程,以及这两个过程的父进程,它们只是等待来自两个subprocess的数据,当他们到达时的碎片。 由于它本身不需要做任何计算密集的工作,因此它可以等待来自事件循环(=消息)的事件并对它们作出反应,根据需要转发组合的数据并存储不能被组合的数据片段。 然而,这引入了另一个进程,甚至更多的进程间通信,这引入了更多的开销,这是我想避免的。

编辑

我发现需要更多的细节。

父进程(我们称之为进程1)本身是由另一个进程(进程0)产生的进程,用于执行一些计算密集型工作。 实际上,它只是执行一些我没有控制权的代码,所以我不能使它asynchronous工作。 我所能做的(并已完成的)是使定期执行的代码调用一个函数来报告进度并提供部分结果。 这个进度报告然后通过IPC发送回原始进程。

但在极less数情况下,部分结果是不正确的,所以他们必须修改。 要做到这一点,我需要一些数据,我可以从正常计算独立计算。 但是,这个计算可能需要几秒钟的时间。 因此,我开始另一个过程(过程2)来进行这个计算,并通过IPC消息将结果提供给过程1。 现在过程1和2愉快地计算那里的东西,希望在过程1需要它之前完成由过程2计算的校正数据。 但有时需要纠正过程1的早期结果之一,在这种情况下,我必须等待过程2完成计算。 阻塞进程1的事件循环在理论上不是问题,因为主进程(进程0)不会受到它的影响。 唯一的问题是,通过阻止进程1中代码的进一步执行,我也阻塞了事件循环,从而阻止它从进程2接收结果。

所以我需要以某种方式暂停进程1中的代码的进一步执行而不阻塞事件循环。 我希望有一个像process.runEventLoopIteration这样的调用,执行事件循环的迭代,然后返回。

然后我会改变这样的代码:

 function getImportantData() { while (importantData === undefined) { process.runEventLoopIteration(); } if (importantData === null) { throw new Error("Data could not be generated."); } else { // we should have a proper data now return importantData; } } 

从而执行事件循环直到我收到必要的数据,但不继续执行调用getImportantData()的代码。

基本上我在stream程1中做的是这样的:

 function callback(partialDataMessage) { if (partialDataMessage.needsCorrection) { getImportantData(); // use data to correct message process.send(correctedMessage); // send corrected result to main process } else { process.send(partialDataMessage); // send unmodified result to main process } } function executeCode(code) { run(code, callback); // the callback will be called from time to time when the code produces new data // this call is synchronous, run is blocking until the calculation is finished // so if we reach this point we are done // the only way to pause the execution of the code is to NOT return from the callback } 

实际应用/实现/问题

我需要以下应用程序的这种行为。 如果你有一个更好的方法来实现这个随意的build议。

我想执行任意代码,并通知它改变什么variables,调用什么函数,发生什么exception等等。我还需要这些事件在代码中的位置,以便能够显示在UI原始代码。

为了达到这个目的,我将代码插入到代码中并插入callback。 然后我执行代码,将执行包装在try-catch块中。 每当用一些关于执行的数据调用callback(例如variables变化)时,我会向主进程发送一条消息,告诉它有关更改。 这样,用户在运行时就会收到有关代码执行的通知。 这些callback生成的事件的位置信息在检测期间被添加到callback调用中,所以这不是问题。

出现exception时,会出现exception。 我也想通知用户在testing代码中的exception。 因此,我将代码的执行封装在try-catch中,并且捕获执行中的任何exception并将其发送到用户界面。 但错误的位置是不正确的。 由node.js创build的Error对象具有完整的调用堆栈,因此它知道它发生的位置。 但是这个位置如果相对于testing代码,所以我不能使用这个位置信息,直接显示原始代码旁边的错误。 我需要将工具代码中的这个位置转换为原始代码中的一个位置。 为此,在testing代码之后,我计算一个源代码映射 ,将代码化代码中的位置映射到原始代码中的位置。 但是,这个计算可能需要几秒钟的时间。 所以,我想,我会开始一个subprocess来计算源地图,而检测代码的执行已经开始。 然后,当发生exception时,我检查源地图是否已经被计算出来,如果没有,我就等待计算完成,以便能够更正位置。

由于要执行和观察的代码可以是完全任意的,所以我不能简单地将其重写为asynchronous。 我只知道它调用了提供的callback函数,因为我安装了代码。 我也不能只存储消息并返回继续执行代码,在下一次调用中检查源映射是否已经完成,因为继续执行代码也会阻塞事件循环,从而阻止计算出的源从执行过程中收到的地图。 或者,如果它被接收,那么只有在执行的代码完全完成后,这可能是相当晚或从不(如果要执行的代码包含无限循环)。 但在我收到sourceMap之前,我无法发送有关执行状态的进一步更新。 结合起来,这意味着我只能在代码执行完成之后才能发送正确的进度消息(可能永远不会),这完全破坏了程序的目的(使程序员可以观察代码的作用)执行)。

暂时放弃对事件循环的控制可以解决这个问题。 但是,这似乎不可能。 我的另一个想法是引入第三个进程来控制执行进程和sourceMapGeneration进程。 它从执行过程接收进度消息,并且如果任何消息需要更正,则等待sourceMapGeneration过程。 由于进程是独立的,所以控制进程可以存储接收到的消息并等待sourceMapGeneration进程,同时执行进程继续执行,并且一旦它接收到源地图,它就校正消息并将其全部发送出去。

但是,这不仅需要另一个进程(开销),还意味着我必须在进程之间再次传输代码,并且由于代码可能有数千行本身可能需要一些时间,所以我想将其移动尽可能less。

我希望这解释,为什么我不能和不使用通常的“asynchronouscallback”方法。

添加第三个 (:))的解决scheme后,你澄清了你寻求什么行为,我build议使用光纤 。

纤维让你可以在nodejs中做协同例程 。 协程是允许多个进入/退出点的function。 这意味着你将能够控制并恢复它,只要你愿意。

这是官方文档中的一个sleep函数,就是睡眠一段时间并执行操作。

 function sleep(ms) { var fiber = Fiber.current; setTimeout(function() { fiber.run(); }, ms); Fiber.yield(); } Fiber(function() { console.log('wait... ' + new Date); sleep(1000); console.log('ok... ' + new Date); }).run(); console.log('back in main'); 

您可以将等待资源的代码放入函数中,使其产生并在任务完成时再次运行。

例如,从这个问题调整你的例子:

 var pausedExecution, importantData; function getImportantData() { while (importantData === undefined) { pausedExecution = Fiber.current; Fiber.yield(); pausedExecution = undefined; } if (importantData === null) { throw new Error("Data could not be generated."); } else { // we should have proper data now return importantData; } } function callback(partialDataMessage) { if (partialDataMessage.needsCorrection) { var theData = getImportantData(); // use data to correct message process.send(correctedMessage); // send corrected result to main process } else { process.send(partialDataMessage); // send unmodified result to main process } } function executeCode(code) { // setup child process to calculate the data importantDataCalculator = fork("./runtime"); importantDataCalculator.on("message", function (msg) { if (msg.type === "result") { importantData = msg.data; } else if (msg.type === "error") { importantData = null; } else { throw new Error("Unknown message from dataGenerator!"); } if (pausedExecution) { // execution is waiting for the data pausedExecution.run(); } }); // wrap the execution of the code in a Fiber, so it can be paused Fiber(function () { runCodeWithCallback(code, callback); // the callback will be called from time to time when the code produces new data // this callback is synchronous and blocking, // but it will yield control to the event loop if it has to wait for the child-process to finish }).run(); } 

祝你好运! 我总是说用三种方法解决一个问题比用同样的方法解决三个问题要好。 我很高兴能够为你工作。 诚然,这是一个非常有趣的问题。

asynchronous编程的规则是,一旦你input了asynchronous代码,你就必须继续使用asynchronous代码。 虽然可以通过setImmediate或类似的方法反复调用该函数,但仍然存在尝试从asynchronous进程return的问题。

在不了解程序的情况下,我无法确切地告诉你应该如何构build它,但是从一个涉及asynchronous代码的进程中“返回”数据的方式总的来说就是传回一个callback。 也许这会让你走上正轨:

 function getImportantData(callback) { importantDataCalculator = fork("./runtime"); importantDataCalculator.on("message", function (msg) { if (msg.type === "result") { callback(null, msg.data); } else if (msg.type === "error") { callback(new Error("Data could not be generated.")); } else { callback(new Error("Unknown message from sourceMapGenerator!")); } }); } 

然后你可以像这样使用这个函数:

 getImportantData(function(error, data) { if (error) { // handle the error somehow } else { // `data` is the data from the forked process } }); 

我在其中一个屏幕video“ asynchronous思考”中对此进行了更详细的讨论。

你遇到的情况是一个非常普遍的情况,熟练的程序员经常以nodej开始。

你是对的 你不能这样做你试图(循环)。

node.js中的主要过程是单线程的, 并且阻塞了事件循环。

解决这个问题的最简单方法是这样的:

 function getImportantData() { if(importantData === undefined){ // not set yet setImmediate(getImportantData); // try again on the next event loop cycle return; //stop this attempt } if (importantData === null) { throw new Error("Data could not be generated."); } else { // we should have a proper data now return importantData; } } 

我们正在做的是,函数正在使用setImmediate重新尝试在事件循环的下一次迭代中处理数据。

这引入了一个新的问题,但你的函数返回一个值。 由于它没有准备好,所以你返回的值是未定义的。 所以你必须 被动地编码。 你需要告诉你的代码数据到达该怎么做。

这通常在具有callback的节点中完成

 function getImportantData(err,whenDone) { if(importantData === undefined){ // not set yet setImmediate(getImportantData.bind(null,whenDone)); // try again on the next event loop cycle return; //stop this attempt } if (importantData === null) { err("Data could not be generated."); } else { // we should have a proper data now whenDone(importantData); } } 

这可以用以下方式

 getImportantData(function(err){ throw new Error(err); // error handling function callback }, function(data){ //this is whenDone in our case //perform actions on the important data }) 

你的问题(更新)非常有趣,它似乎与我asynchronous捕获exception的问题密切相关。 (Brandon和Ihad也跟我讨论过这个小小的世界)

看到这个问题如何asynchronous捕获exception。 关键的概念是,您可以使用(假设nodejs 0.8+) nodejs域来约束exception的范围。

这将允许您轻松获取exception的位置,因为您可以用atry/catch包围asynchronous块。 我认为这应该解决更大的问题。

你可以在链接的问题中find相关的代码。 用法是这样的:

 atry(function() { setTimeout(function(){ throw "something"; },1000); }).catch(function(err){ console.log("caught "+err); }); 

由于您可以访问atry的范围, atry您可以在那里获取堆栈跟踪,从而可以跳过更复杂的源地图使用情况。

祝你好运!