Socket.io,redis和redis消息队列

我正在使用基于以下代码的SocketIO和Redis,

var sub = redis.createClient(); var pub = redis.createClient(); sub.subscribe('chat'); io.use(socketHandshake({store: sessionStore, key:'jsessionid', secret:'secret', parser:cookieParser()})); io.on('connection', function (socket) { socket.on('chat', function (message) { // io.emit('chat', "hello world"); pub.publish('chat', "hello world"); }); sub.on('message', function (channel, message) { io.emit(channel, message); }); }); 

这是基本代码。 我已经修改了代码,以便如果有任何用户下线,在服务器端,我在RSMQ(Redis简单消息队列)中存储消息,当用户上线时,消息从队列中提取并发送给用户。我已经使用下面的代码来实现这一点。我已经将用户状态存储在一个数组中。

  var fs = require('fs') , http = require('http') , socketio = require('socket.io'); var redis = require('redis'); var store = redis.createClient(); var pub = redis.createClient(); var sub = redis.createClient(); RedisSMQ = require("rsmq"); rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} ); var active_users=[]; var inactive_users=[]; var user_status=[]; var channel_users=[]; var users_queue=[]; var socket_ids=[]; var cname,qn; var clients=[]; var server = http.createServer(function(req, res) { res.writeHead(200, { 'Content-type': 'text/html'}); res.end(fs.readFileSync(__dirname + '/index.html')); }).listen(9000, function() { console.log('Listening at: http://localhost:9000'); }); socketio.listen(server).on('connection', function (socket) { socket.on('login', function(data){ console.log('a user ' + data.userId + ' connected'+socket.id); //saving userId to array with socket ID active_users[socket.id] = data.userId; socket_ids[data.userId]=socket.id; clients[socket.id] = socket; user_status[data.userId]="online"; }); socket.on('message', function (msg) { console.log('Message Received: ', msg); socket.broadcast.emit('message', msg); }); socket.on('json', function (msg) { if(msg.channel_name=='UserState'){ rsmq.listQueues(function(err,resp){ //console.log("QUEUES LIST"+resp); }); if(msg.user_state=='active'){ store.hmset("active_users."+msg.sender_id,{"user":"online"}); user_status[msg.sender_id]="online";console.log(user_status); if(users_queue[msg.sender_id]!=undefined && users_queue[msg.sender_id].length>0){ console.log("USERS QUEUE:"+users_queue[msg.sender_id]['0']); for(var i=0;i<users_queue[msg.sender_id].length;i++){ cname=users_queue[msg.sender_id][i].split('_')[0];//get channel name from queue name qn=users_queue[msg.sender_id][i]; rsmq.getQueueAttributes({qname:users_queue[msg.sender_id][i]},function(err,resp){ console.log("RESP:"+resp.msgs); if(resp.msgs>0){ //if there are messages in queue...... for(var j=0;j<resp.msgs;j++){ rsmq.popMessage({qname:qn},function(err,resp){ console.log(resp); var sid=socket_ids[msg.sender_id]; console.log("SOCKETID:"+sid); //get socket.id for the user pub.publish(cname,resp.message); }); } } }); } } } else{ store.hmset("active_users."+msg.sender_id,{"status":"offline"}); user_status[msg.sender_id]="offline"; } } if(msg.channel_name=='ShareConversation'){ var channel=msg.conversations_data.conversation_id;//have to change to conversation_id or whatever channel..... sub.subscribe(channel); channel_users[channel]=[]; var m=msg.conversations_data.users.split(','); for(var i=0;i<m.length;i++){ channel_users[channel].push(m[i]); } for(var i=0;i<channel_users[channel].length;i++){ var q=channel_users[channel][i].split('@')[0].replace(/(^\s+|\s+$)/g, ''); var queue_name=channel+"_"+q;console.log(queue_name); var uname=channel_users[channel][i].replace(/(^\s+|\s+$)/g, ''); users_queue[uname]=[]; users_queue[uname].push(queue_name); rsmq.createQueue({qname:queue_name}, function (err, resp) { console.log(err); console.log(queue_name); if (resp===1) { console.log("queue created"); } }); } } socket.broadcast.emit('json', msg); }); sub.on('message', function (channel, message) { console.log("Message: " + message); for(var i=0;i<channel_users[channel].length;i++){ var c=channel_users[channel][i].replace(/(^\s+|\s+$)/g, '');console.log("channel_users:"+channel_users[channel][i]);console.log("USER STATE :"+ user_status[c]); if(user_status[c]=='offline'){ //send notification......... //put messages in queue....... var q=channel_users[channel][i].split('@')[0].replace(/(^\s+|\s+$)/g, ''); var queue_name=channel+"_"+q;console.log(queue_name); rsmq.sendMessage({qname:queue_name, message:message}, function (err, resp) { console.log(err); if (resp) { console.log("Message sent. ID:", resp); } }); } } socket.emit(channel, message); }); }); 

这是我的整个代码。 这里的问题是当用户下线时,消息被多次保存在队列中,当用户上线时,多次接收到消息,因为队列中存有重复的消息。 如何克服这一点,请帮助….