在Node.js中执行并行处理的最佳方法

我正在尝试编写一个小型节点应用程序来search和parsing文件系统上的大量文件。 为了加快search,我们正试图使用​​某种地图缩小。 该计划将是以下简化的情况:

  • networking请求带有search查询
  • 开始3个进程,每个进程分配1000个(不同的)文件
  • 一旦一个进程完成,它将返回结果返回到主线程
  • 一旦所有进程完成,主线程将继续返回组合结果作为JSON结果

我有这个问题是: 这是可以在节点? 推荐的方法是什么?

我一直在摆弄,但没有进一步,然后下面的例子使用过程 :

发起者:

function Worker() { return child_process.fork("myProcess.js); } for(var i = 0; i < require('os').cpus().length; i++){ var process = new Worker(); process.send(workItems.slice(i * itemsPerProcess, (i+1) * itemsPerProcess)); } 

myProcess.js

 process.on('message', function(msg) { var valuesToReturn = []; // Do file reading here //How would I return valuesToReturn? process.exit(0); } 

很less有意见:

  • 我知道进程的数量应取决于服务器上的CPU数量
  • 我也知道文件系统中的速度限制。 在将它移动到数据库或Lucene实例之前,请考虑它的一个概念certificate:-)

应该是可行的。 举一个简单的例子:

 // parent.js var child_process = require('child_process'); var numchild = require('os').cpus().length; var done = 0; for (var i = 0; i < numchild; i++){ var child = child_process.fork('./child'); child.send((i + 1) * 1000); child.on('message', function(message) { console.log('[parent] received message from child:', message); done++; if (done === numchild) { console.log('[parent] received all results'); ... } }); } // child.js process.on('message', function(message) { console.log('[child] received message from server:', message); setTimeout(function() { process.send({ child : process.pid, result : message + 1 }); process.disconnect(); }, (0.5 + Math.random()) * 5000); }); 

所以父进程产生了X个subprocess,并向它们传递一个消息。 它还安装一个事件处理程序来侦听从子项发回的任何消息(例如结果)。

subprocess等待来自父进程的消息,并开始处理(在这种情况下,它只是启动一个随机超时的计时器来模拟正在进行的一些工作)。 一旦完成,它将结果发送回父进程,并使用process.disconnect()从父进程断开连接(基本上停止subprocess)。

父进程跟踪启动的subprocess的数量,以及已经发回结果的数量。 当这些数字相等时,父进程从subprocess收到所有结果,因此它可以合并所有结果并返回JSON结果。

对于这样的分布式问题,我已经使用了zmq,它工作得很好。 我会给你一个类似的问题,我遇到了,并试图通过进程解决(但失败),然后转向zmq。

使用bcrypt或昂贵的散列algorithm是明智的,但它会阻止节点进程大约0.5秒。 我们必须卸载这个到不同的服务器,作为一个快速修复,我基本上使用了你所做的。 运行一个subprocess并发送消息给它并让它响应。 我们发现的唯一的问题是,无论什么原因,当我们的subprocess完全没有工作时,我们的subprocess会把一个完整的核心关联起来(我还没有想出为什么发生这种情况,我们跑了一个踪迹,似乎epoll在标准输出上失败/ stdinstream,这也只会发生在我们的Linux机器上,并且可以在OSX上正常工作。)

编辑:

核心的固定是在https://github.com/joyent/libuv/commit/12210fe固定,并与https://github.com/joyent/node/issues/5504有关,所以如果你遇到问题而你使用的是centos + kernel v2.6.32:更新节点,或者更新你的内核!

不pipe我使用child_process.fork()的问题如何,我总是使用一个漂亮的模式

客户:

 var child_process = require('child_process'); function FileParser() { this.__callbackById = []; this.__callbackIdIncrement = 0; this.__process = child_process.fork('./child'); this.__process.on('message', this.handleMessage.bind(this)); } FileParser.prototype.handleMessage = function handleMessage(message) { var error = message.error; var result = message.result; var callbackId = message.callbackId; var callback = this.__callbackById[callbackId]; if (! callback) { return; } callback(error, result); delete this.__callbackById[callbackId]; }; FileParser.prototype.parse = function parse(data, callback) { this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000; this.__callbackById[this.__callbackIdIncrement] = callback; this.__process.send({ data: data, // optionally you could pass in the path of the file, and open it in the child process. callbackId: this.__callbackIdIncrement }); }; module.exports = FileParser; 

subprocess:

 process.on('message', function(message) { var callbackId = message.callbackId; var data = message.data; function respond(error, response) { process.send({ callbackId: callbackId, error: error, result: response }); } // parse data.. respond(undefined, "computed data"); }); 

我们还需要一个模式来同步不同的进程,当每个进程完成任务时,它会响应我们,并且我们将为每个进程完成计数,然后在命中时调用信号量的callback我们想要的数量。

 function Semaphore(wait, callback) { this.callback = callback; this.wait = wait; this.counted = 0; } Semaphore.prototype.signal = function signal() { this.counted++; if (this.counted >= this.wait) { this.callback(); } } module.exports = Semaphore; 

这里有一个将所有上述模式联系在一起的用例:

 var FileParser = require('./FileParser'); var Semaphore = require('./Semaphore'); var arrFileParsers = []; for(var i = 0; i < require('os').cpus().length; i++){ var fileParser = new FileParser(); arrFileParsers.push(fileParser); } function getFiles() { return ["file", "file"]; } var arrResults = []; function onAllFilesParsed() { console.log('all results completed', JSON.stringify(arrResults)); } var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed); arrFileParsers.forEach(function(fileParser) { var arrFiles = getFiles(); // you need to decide how to split the files into 1k chunks fileParser.parse(arrFiles, function (error, result) { arrResults.push(result); lock.signal(); }); }); 

最后,我使用了http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern ,客户端使用的是nodejs zmq客户端,而工人/经纪人则使用C语言编写。这使我们可以扩展这跨多台机器,而不是只有一个本地机器与subprocess。