我可以屈服于一个subprocess,并在Node.js中返回响应吗?

总之,我遇到了一个问题,即对我的Node.js服务器的多个并行GET请求导致服务器“堵塞”并挂起,从而导致客户端超时(503,服务不可用)。

经过大量的性能分析,我意识到这是一个CPU问题。 具体请求(我们称之为GET /foo )通过HTTP查询来自多个服务的数据,然后进行大量计算,并将结果返回给客户端,如下所示:

  1. 客户端请求GET /foo
  2. /foo控制器通过来自多个其他服务的HTTP查询数据
  3. /foo控制器然后对数据做一堆迭代来为客户端编译一些输出

步骤3需要大约2秒钟才能完成。 但是,如果我将2个请求并行发送到/foo ,则每个客户端将在大约4秒内收到响应。 当我在使用更多内核的集群中运行应用程序时,请求运行速度要快得多,但不是我想要的。

好像我有几个select在这里:

  1. 预先计算响应(理想情况是现在要避免这种情况,因为它需要整个“caching失效”scheme),或者
  2. /foo将CPU阻塞计算asynchronous发送到另一个进程(使用Heroku,所以这将是另一个testing),然后我可以使用websocket或其他东西将结果推送到客户端(对于我的情况也是非常复杂的),要么
  3. 以某种方式屈服于请求中的subprocess并将结果返回给客户端

会喜欢做选项3.像这样的东西:

 get('/foo', function*(request) { // I/O, so not blocking the event loop (I think) let data = yield getData(request) // make this happen in a different process let response = yield doSomeHeavyProcessing(data) return response }) 

我上面省略了很多实现细节,但是如果有必要知道的话,我正在使用Koa和Node.js 6。

理想情况下, doSomeHeavyProcessing将在一些单独的进程中执行CPU密集型计算,并且在完成时仍然以“同步”方式将结果发送回请求客户端。

一直试图围绕儿童进程,networking工作者,纤维等,并一直在做一些基本的“你好世界”,使他们基本上做到了这一点,但无济于事。 如果需要可以发布更多的细节。

以下是您可以尝试的一些方法:

1.以小块拆分块计算并使用setImmediate将下一块工作放在事件队列的末尾。 所以计算不再阻塞,其他请求可以被处理。

2.微软最近发布了napajs 。 正如他们的自述文件所述

随着它的发展,我们发现在CPU绑定的任务中补充Node.js是有用的,能够在多个V8隔离中执行JavaScript并在它们之间进行通信。

我没有尝试过,但看起来非常有希望:

 var napa = require('napajs'); var zone1 = napa.zone.create('zone1', { workers: 4 }); get('/foo', function*(request) { let data = yield getData(request) let response = yield zone1.execute(doSomeHeavyProcessing, [data]) return response }) 

3.如果以上所有内容都不够用,而且需要将负载分散到多台机器上,则可能无法避免使用某种消息队列将工作分配给不同的服务器。 在这种情况下,检查ZeroMQ 。 从节点使用非常容易,您可以使用它来实现任何types的分布式消息传递模式。

您可以使用附加包装的subprocess ,以方便。

worker.js – 这个模块将运行在一个单独的过程中,并将做繁重的工作

 const crypto = require('crypto'); function doHeavyWork(data) { return crypto.pbkdf2Sync(data, 'salt', 100000, 64, 'sha512'); } process.on('message', (message) => { const result = doHeavyWork(message.data); process.send({ id: message.id, result }); }); 

client.jssubprocess的一个方便(但是原始的)包装器

 const cp = require('child_process'); let worker; const resolves = new Map(); module.exports = { init(moduleName, errorCallback) { worker = cp.fork(moduleName); worker.on('error', errorCallback); worker.on('message', (message) => { const resolve = resolves.get(message.id); resolves.delete(message.id); if (!resolve) { errorCallback(new Error(`Got response from worker with unknown id: ${message.id}`)); return; } resolve(message.result); }); console.log(`Service PID: ${process.pid}, Worker PID: ${worker.pid}`); }, doHeavyWorkRemotly(data) { const id = `${Date.now()}${Math.random()}`; return new Promise((resolve) => { worker.send({ id, data }); resolves.set(id, resolve); }); } } 

我使用fork()来利用额外的通信渠道,正如文档中所述。

此外,我保存所有提交给工作进程请求( const resolves = new Map(); )的logging,并parsingPromises( resolve(message.result); )只有当工作进程返回特定请求的响应( const resolve = resolves.get(message.id); )。

run.js – 一个启动模块,它使用co来“执行”生成器。

 const co = require('co'); const client = require('./client'); function errorCallback(error) { console.log('Got an unexpected error!'); console.log(error); } client.init('./worker.js', errorCallback); function* run() { while(true) { yield client.doHeavyWorkRemotly('mydata'); } } co(run); 

要testing它只需运行node run.js ,它将打印

服务PID:XXXX,工人PID:XXXX

然后看一下CPU利用率,工作进程可能会占用CPU的100%左右,而服务将会相当闲置。