计算跨水平服务器的socket.io用户

我有多个使用redisstore水平缩放的socket.io服务器。 我已经有效地设置了房间,并且能够成功地通过服务器等向房间进行广播。现在,我正在尝试构build一个状态页面,而我没有弄清楚的是如何简单地统计连接的用户数量所有服务器。

io.sockets.clients('room')和io.sockets.sockets将只告诉你在一台服务器上连接的客户端数量,而不是连接到同一个RedisStore的所有服务器数量。

build议?

谢谢。

当用户连接到聊天室时,可以自动增加RedisStore中的用户计数器。 当用户断开连接时,您将减小该值。 这样Redis保持用户数量,并且可以被所有服务器访问。

参见INCR和DECR

SET userCount = "0" 

当用户连接时:

 INCR userCount 

当用户断开连接时:

 DECR userCount 

这是我如何使用Redis脚本解决它。 它需要版本2.6或更高版本,所以现在很可能仍然需要编译自己的实例。

每次进程启动时,我都会生成一个新的UUID并将其保留在全局范围内。 我可以使用pid,但这感觉更安全一些。

 # Pardon my coffeescript processId = require('node-uuid').v4() 

当用户连接(socket.io连接事件)时,我然后将该用户的id推入基于该processId的用户列表中。 我也将该键的失效时间设置为30秒。

 RedisClient.lpush "process:#{processId}", user._id RedisClient.expire "process:#{processId}", 30 

当用户断开连接(断开事件)时,我将其删除并更新到期。

 RedisClient.lrem "process:#{processId}", 1, user._id RedisClient.expire "process:#{processId}", 30 

我还设置了一个运行在30秒间隔的function,以实质上“ping”该键,使其停留在那里。 所以如果这个过程意外死亡,所有这些用户会话将基本消失。

 setInterval -> RedisClient.expire "process:#{processId}", 30 , 30 * 1000 

现在的魔法。 Redis 2.6包含LUA脚本,它本质上提供了一种存储过程的function。 这真的很快,并不是处理器密集(他们把它比作“几乎”运行的C代码)。

我的存储过程基本上循环遍历所有进程列表,并创build一个user:user_id密钥,其总数为当前login名。 这意味着,如果他们使用两个浏览器login等,它仍然允许我使用逻辑来判断他们是否完全断开连接,或只是其中一个会话。

我在所有进程中每15秒运行一次该函数,并且在连接/断开连接事件之后运行。 这意味着我的用户数很可能会精确到秒,并且从不错误超过15到30秒。

生成该redis函数的代码如下所示:

 def = require("promised-io/promise").Deferred reconcileSha = -> reconcileFunction = " local keys_to_remove = redis.call('KEYS', 'user:*') for i=1, #keys_to_remove do redis.call('DEL', keys_to_remove[i]) end local processes = redis.call('KEYS', 'process:*') for i=1, #processes do local users_in_process = redis.call('LRANGE', processes[i], 0, -1) for j=1, #users_in_process do redis.call('INCR', 'user:' .. users_in_process[j]) end end " dfd = new def() RedisClient.script 'load', reconcileFunction, (err, res) -> dfd.resolve(res) dfd.promise 

然后我可以在以后的脚本中使用它:

 reconcileSha().then (sha) -> RedisClient.evalsha sha, 0, (err, res) -> # do stuff 

我所做的最后一件事是尝试并处理一些closures事件,以确保该进程尝试最好不要依赖redis超时,并且实际上正常closures。

 gracefulShutdown = (callback) -> console.log "shutdown" reconcileSha().then (sha) -> RedisClient.del("process:#{processId}") RedisClient.evalsha sha, 0, (err, res) -> callback() if callback? # For ctrl-c process.once 'SIGINT', -> gracefulShutdown -> process.kill(process.pid, 'SIGINT') # For nodemon process.once 'SIGUSR2', -> gracefulShutdown -> process.kill(process.pid, 'SIGUSR2') 

到目前为止,它一直在努力工作。

我还想做的一件事是让redis函数返回任何已经改变其值的键。 这样,如果某个特定用户的计数已经发生变化,而没有任何服务器主动知道(例如某个进程已经死亡),我实际上可以发送一个事件。 现在,我必须依靠轮询用户:*值再次知道它已更改。 它的作品,但可能会更好…

我解决了这个问题,让每个服务器周期性地在redis中设置一个用户数,并包含他们自己的pid:

每做setex userCount:<pid> <interval+10> <count>

那么状态服务器可以查询这些密钥中的每一个,然后获取每个密钥的值:

对于每个keys userCount* do total + = get <key>

所以如果一台服务器崩溃或者正在关机,那么在时间间隔+10之后,它的计数将会从redis中退出

对于这个丑陋的伪代码感到抱歉。 🙂

您可以使用散列键来存储值。

当用户连接到服务器1时,您可以在名为“userCounts”的密钥上设置名为“srv1”的字段。 只要将值覆盖到当前使用HSET的计数值即可。 无需增加/减less。 只需设置socket.io已知的当前值即可。

 HSET userCounts srv1 "5" 

当另一个用户连接到不同的服务器设置不同的字段。

 HSET userCounts srv2 "10" 

然后,任何服务器都可以通过返回“userCounts”中的所有字段并使用HVALS将它们添加到一起来返回值列表来获得总数。

 HVALS userCounts 

当服务器崩溃时,您需要运行一个脚本来响应从userCounts或HSET将该服务器字段删除为“0”的崩溃。

你可以看看永远自动重启服务器。