NodeJS |集群:如何将数据从主数据发送到所有或单个子/员工?

我从节点工作(股票)脚本

var cluster = require('cluster'); var http = require('http'); var numReqs = 0; if (cluster.isMaster) { // Fork workers. for (var i = 0; i < 2; i++) { var worker = cluster.fork(); worker.on('message', function(msg) { if (msg.cmd && msg.cmd == 'notifyRequest') { numReqs++; } }); } setInterval(function() { console.log("numReqs =", numReqs); }, 1000); } else { // Worker processes have a http server. http.Server(function(req, res) { res.writeHead(200); res.end("hello world\n"); // Send message to master process process.send({ cmd: 'notifyRequest' }); }).listen(8000); } 

在上面的脚本中,我可以轻松地将数据从工作人员发送到主进程。 但是,如何将数据从主人发送给工人? 用例子,如果可能的话。

因为cluster.fork是在child_process.fork的基础上实现的,所以你可以使用worker.send({ msg: 'test' })从worker发送消息给worker, process.send({ msg: 'test' }); 。 你会收到这样的消息: worker.on('message', callback) (从worker到master)和process.on('message', callback); (从主人到工人)。

这里是我的完整示例,可以通过浏览http:// localhost:8000 /来testing它,然后工作人员将向主服务器发送消息,主服务器将回复:

 var cluster = require('cluster'); var http = require('http'); var numReqs = 0; var worker; if (cluster.isMaster) { // Fork workers. for (var i = 0; i < 2; i++) { worker = cluster.fork(); worker.on('message', function(msg) { // we only want to intercept messages that have a chat property if (msg.chat) { console.log('Worker to master: ', msg.chat); worker.send({ chat: 'Ok worker, Master got the message! Over and out!' }); } }); } } else { process.on('message', function(msg) { // we only want to intercept messages that have a chat property if (msg.chat) { console.log('Master to worker: ', msg.chat); } }); // Worker processes have a http server. http.Server(function(req, res) { res.writeHead(200); res.end("hello world\n"); // Send message to master process process.send({ chat: 'Hey master, I got a new request!' }); }).listen(8000); } 

我发现这个线程,同时寻找一种方式发送消息到所有subprocess,并感谢能够找出感谢数组的意见。 只是想说明使用这种方法向所有儿童进程发送消息的潜在解决scheme。

 var cluster = require('cluster'); var http = require('http'); var numReqs = 0; var workers = []; if (cluster.isMaster) { // Broadcast a message to all workers var broadcast = function() { for (var i in workers) { var worker = workers[i]; worker.send({ cmd: 'broadcast', numReqs: numReqs }); } } // Fork workers. for (var i = 0; i < 2; i++) { var worker = cluster.fork(); worker.on('message', function(msg) { if (msg.cmd) { switch (msg.cmd) { case 'notifyRequest': numReqs++; break; case 'broadcast': broadcast(); break; } }); // Add the worker to an array of known workers workers.push(worker); } setInterval(function() { console.log("numReqs =", numReqs); }, 1000); } else { // React to messages received from master process.on('message', function(msg) { switch(msg.cmd) { case 'broadcast': if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs); break; } }); // Worker processes have a http server. http.Server(function(req, res) { res.writeHead(200); res.end("hello world\n"); // Send message to master process process.send({ cmd: 'notifyRequest' }); process.send({ cmd: 'broadcast' }); }).listen(8000); } 

以下是我如何实现类似问题的解决scheme。 通过挂接到cluster.on('fork') ,您可以将消息处理程序附加到工作人员,因为它们是分叉的(而不是将其存储在数组中),这具有处理工人死亡或断开连接的情况以及新工人分叉。

这段代码会向主人发送一条消息。

 if (cluster.isMaster) { for (var i = 0; i < require('os').cpus.length; i++) { cluster.fork(); } cluster.on('disconnect', function(worker) { cluster.fork(); } // When a new worker process is forked, attach the handler // This handles cases where new worker processes are forked // on disconnect/exit, as above. cluster.on('fork', function(worker) { worker.on('message', messageRelay); } var messageRelay = function(msg) { Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send(msg); }); }; } else { process.on('message', messageHandler); var messageHandler = function messageHandler(msg) { // Worker received message--do something }; } 

你应该能够从主人发送消息给工人这样的:

 worker.send({message:'hello'}) 

因为“cluster.fork是在child_process.fork之上实现的”(cluster.fork是在child_process.fork之上实现的)

我理解你的目的是向集群中的所有节点工作进程进行广播,尽pipe你不能发送套接字组件,但是有一个解决方法可以提供服务。 我会用一个例子来解释一下:

步骤1:当客户端操作需要广播时:

 Child.js (Process that has been forked) : socket.on("BROADCAST_TO_ALL_WORKERS", function (data) { process.send({cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message}); }) 

第2步:在集群创build方面

 Server.js (Place where cluster forking happens): if (cluster.isMaster) { for (var i = 0; i < numCPUs; i++) { 

var worker = cluster.fork();

  worker.on('message', function (data) { if (data.cmd === "BROADCAST_TO_ALL_WORKERS") { console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id); Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message}); }); } }); } cluster.on('exit', function (worker, code, signal) { var newWorker = cluster.fork(); newWorker.on('message', function (data) { console.log(data); if (data.cmd === "BROADCAST_TO_ALL_WORKERS") { console.log(data.cmd,data); Object.keys(cluster.workers).forEach(function(id) { cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message}); }); } }); }); } else { //Node Js App Entry require("./Child.js"); } 

第3步:在subprocess中广播 –

– >把它放在Child.js的io.on(“连接”)之前

 process.on("message", function(data){ if(data.cmd === "BROADCAST_TO_WORKER"){ io.sockets.emit("SERVER_MESSAGE", { message: data.message, reload: data.reload, player_id : data.player_id }); } }); 

我希望这有帮助。 请让我知道是否需要更多的澄清。