Socket.io,集群,expression和同步事件

我有一个很大的问题。 我尝试将我的node.JS项目实际上在单核上运行到集群的多核。

有了websockets,在这一刻,我没有事件的问题,但对于xhr轮询或jsonp轮询,我有群集模式下的socket.io大问题。

这是我的服务器configuration:

00 generic.js

'use strict'; var http = require('http'), os = require('os'), cluster = require('cluster'); module.exports = function(done) { var app = this.express, port = process.env.PORT || 3000, address = '0.0.0.0'; if(this.env == 'test'){ port = 3030; } var self = this; var size = os.cpus().length; if (cluster.isMaster) { console.info('Creating HTTP server cluster with %d workers', size); for (var i = 0; i < size; ++i) { console.log('spawning worker process %d', (i + 1)); cluster.fork(); } cluster.on('fork', function(worker) { console.log('worker %s spawned', worker.id); }); cluster.on('online', function(worker) { console.log('worker %s online', worker.id); }); cluster.on('listening', function(worker, addr) { console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port); }); cluster.on('disconnect', function(worker) { console.log('worker %s disconnected', worker.id); }); cluster.on('exit', function(worker, code, signal) { console.log('worker %s died (%s)', worker.id, signal || code); if (!worker.suicide) { console.log('restarting worker'); cluster.fork(); } }); } else { http.createServer(app).listen(port, address, function() { var addr = this.address(); console.log('listening on %s:%d', addr.address, addr.port); self.server = this; done(); }); } }; 

03-socket.io.js

 "use strict"; var _ = require('underscore'), socketio = require('socket.io'), locomotive = require('locomotive'), RedisStore = require("socket.io/lib/stores/redis"), redis = require("socket.io/node_modules/redis"), v1 = require(__dirname + '/../app/socket.io/v1'), sockets = require(__dirname + '/../../app/socket/socket'), config = require(__dirname + '/../app/global'), cluster = require('cluster'); module.exports = function () { if (!cluster.isMaster) { this.io = socketio.listen(this.server); var pub = redis.createClient(), sub = redis.createClient(), client = redis.createClient(); this.io.enable('browser client minification'); // send minified client this.io.enable('browser client etag'); // apply etag caching logic based on version number this.io.enable('browser client gzip'); // gzip the file this.io.set("store", new RedisStore({ redisPub : pub, redisSub : sub, redisClient : client })); this.io.set('log level', 2); this.io.set('transports', [ 'websocket', 'jsonp-polling' ]); this.io.set('close timeout', 24*60*60); this.io.set('heartbeat timeout', 24*60*60); this.io.sockets.on('connection', function (socket) { console.log('connected with ' + this.io.transports[socket.id].name); // partie v1 @deprecated v1.events(socket); // partie v1.1 refaite _.each(sockets['1.1'], function(Mod) { var mod = new Mod(); mod.launch({ socket : socket, io : this.io }); }, this); }.bind(this)); } }; 

通过轮询,客户端不时地与不同于发起的监听者的进程连接。 同样,通信服务器向客户端发出。

有一点search,我发现有必要通过一个商店socket.io共享数据连接。 所以我build立了RedisStore的socket.io文件,但即使如此,我发现自己的事件没有安全到达,我仍然得到这个错误消息:

 warn: client not handshaken client should reconnect 

编辑

现在,警告错误不会被调用。 我将redisStore更改为socket.io-clusterhub但现在事件并不总是被调用。 有时,如果投票请求被另一名工作人员捕获,而不是开始收听者,所以没有任何反应。 这是新的configuration:

 'use strict'; var http = require('http'), locomotive = require('locomotive'), os = require('os'), cluster = require('cluster'), config = require(__dirname + '/../app/global'), _ = require('underscore'), socketio = require('socket.io'), v1 = require(__dirname + '/../app/socket.io/v1'), sockets = require(__dirname + '/../../app/socket/socket'); module.exports = function(done) { var app = this.express, port = process.env.PORT || 3000, address = '0.0.0.0'; if(this.env == 'test'){ port = 3030; } var self = this; var size = os.cpus().length; this.clusterStore = new (require('socket.io-clusterhub')); if (cluster.isMaster) { for (var i = 0; i < size; ++i) { console.log('spawning worker process %d', (i + 1)); cluster.fork(); } cluster.on('fork', function(worker) { console.log('worker %s spawned', worker.id); }); cluster.on('online', function(worker) { console.log('worker %s online', worker.id); }); cluster.on('listening', function(worker, addr) { console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port); }); cluster.on('disconnect', function(worker) { console.log('worker %s disconnected', worker.id); }); cluster.on('exit', function(worker, code, signal) { console.log('worker %s died (%s)', worker.id, signal || code); if (!worker.suicide) { console.log('restarting worker'); cluster.fork(); } }); } else { var server = http.createServer(app); this.io = socketio.listen(server); this.io.configure(function() { this.io.enable('browser client minification'); // send minified client this.io.enable('browser client etag'); // apply etag caching logic based on version number this.io.enable('browser client gzip'); // gzip the file this.io.set('store', this.clusterStore); this.io.set('log level', 2); this.io.set('transports', [ 'websocket', 'jsonp-polling' ]); //this.io.set('close timeout', 24*60*60); //this.io.set('heartbeat timeout', 24*60*60); }.bind(this)); this.io.sockets.on('connection', function (socket) { console.log('connected with ' + this.io.transports[socket.id].name); console.log('connected to worker: ' + cluster.worker.id); // partie v1 @deprecated v1.events(socket); // partie v1.1 refaite _.each(sockets['1.1'], function(Mod) { var mod = new Mod(); mod.launch({ socket : socket, io : this.io }); }, this); }.bind(this)); server.listen(port, address, function() { var addr = this.address(); console.log('listening on %s:%d', addr.address, addr.port); self.server = this; done(); }); } }; 

从这个来源: http : //socket.io/docs/using-multiple-nodes/

如果您计划在不同进程或机器之间分配连接负载,则必须确保与特定会话ID相关联的请求连接到源自它们的进程。

这是由于某些传输,如XHR轮询或JSONP轮询,依靠在“套接字”的生命周期内发出几个请求。

每次将连接路由到同一个工作者:

粘会议

在socket.io文档中,这是每次将请求路由到同一个工作者的推荐方法。

https://github.com/indutny/sticky-session

与群集一起使用socket.io的一个简单的高性能的方法。

Socket.io正在执行多个请求来执行握手并build立与客户端的连接。 对于一个集群,这些请求可能会到达不同的工作者,这将破坏握手协议。

 var sticky = require('sticky-sesion'); sticky(function() { // This code will be executed only in slave workers var http = require('http'), io = require('socket.io'); var server = http.createServer(function(req, res) { // .... }); io.listen(server); return server; }).listen(3000, function() { console.log('server started on 3000 port'); }); 

在节点之间传递消息:

socket.io,Redis的

这是在socket.io文档中推荐的在工作人员之间共享消息的方式。

https://github.com/automattic/socket.io-redis

通过使用socket.io-redis适配器运行socket.io,您可以在不同的进程或服务器上运行多个socket.io实例,这些进程或服务器可以相互广播和发送事件。

socket.io-redis是这样使用的:

 var io = require('socket.io')(3000); var redis = require('socket.io-redis'); io.adapter(redis({ host: 'localhost', port: 6379 })); 

我想你不是使用socket.io v1.0.0。 你可能想更新你的版本,以获得更多的稳定性。

你可以在http://socket.io/docs/migrating-from-0-9/查看他们的迁移指&#x5357;

使用时,socket.io文档中缺less一个步骤

 var io = require('socket.io')(3000); var redis = require('socket.io-redis'); io.adapter(redis({ host: 'localhost', port: 6379 })); 

您需要告诉客户端,您要使用“websockets”作为唯一的传输forms,否则将无法正常工作…所以对于客户端上的构造函数

 io.connect(yourURL , { transports : ['websocket']}); 

看到我在这里回答类似的问题(我的答案可能更适合这个线程): https : //stackoverflow.com/a/30791006/4127352

下面的代码适用于我,这是创build群集的socket.io,我将config.clusterSticky设置为true以激活兼容性群集和socket.io

 'use strict'; /* var cl = console.log; console.log = function(){ console.trace(); cl.apply(console,arguments); }; */ var cluster = require('cluster'), config = require('./config/all'), deferred = require('q').defer(), express = require('express'), app = express(), http = require('http'), sticky = require('socketio-sticky-session'), io = require('socket.io'); // Code to run if we're in the master process or if we are not in debug mode/ running tests if ((cluster.isMaster) && (process.execArgv.indexOf('--debug') < 0) && (process.env.NODE_ENV !== 'test') && (process.env.NODE_ENV !== 'development') && (process.execArgv.indexOf('--singleProcess') < 0) && (!config.clusterSticky)) { console.log('for real!'); // Count the machine's CPUs var cpuCount = process.env.CPU_COUNT || require('os').cpus().length; // Create a worker for each CPU for (var i = 0; i < cpuCount; i += 1) { console.log('forking ', i); cluster.fork(); } // Listen for dying workers cluster.on('exit', function (worker) { // Replace the dead worker, we're not sentimental console.log('Worker ' + worker.id + ' died :('); cluster.fork(); }); // Code to run if we're in a worker process } else { var port = config.http.port; var workerId = 0; if (!cluster.isMaster) { workerId = cluster.worker.id; } var server = http.createServer(app); io.listen(server); //TODO routes etc (core) server.on('listening', function () { console.log('Slave app started on port ' + port + ' (' + process.env.NODE_ENV + ') cluster.worker.id:', workerId); }); if(config.clusterSticky && (process.env.NODE_ENV !== 'test') && (process.env.NODE_ENV !== 'development')) { sticky(server).listen(port); } else { server.listen(port); } deferred.resolve(server); } module.exports = deferred.promise;