amqp.node不会检测到连接丢失

我们有一个运行socket.io服务器的nod​​e.js脚本,它的客户端使用RabbitMQ队列中的消息。 我们最近已经迁移到Amazon AWS,RabbitMQ现在是两台机器(冗余实例)的集群。 AMQP连接会不时丢失(这是一个限制,从具有冗余VM的高可用性环境到达,我们必须应付它),如果尝试重新连接,DNS将select要连接的实例它是一个具有数据复制的集群,因此连接哪个实例并不重要)。

问题是重新连接的尝试从来没有做过; 一段时间后,当连接丢失时,amqp.node显然没有注意到连接已经丢失。 此外,消费者停止接收消息,并且socket.io服务器停止接受新的连接。

我们在RabbitMQ URL处设置了55秒的心跳超时(不要与socket.io心跳超时相混淆),并使用amqp.node的callbackAPI检查“错误”和“closures”事件,但显然从未发出。 队列期望消耗的消息被消除。 我们希望节点脚本检测丢失的连接并自行完成,因此环境将自动启动新的进程并重新build立连接。

这里是代码,也许我们正在做一些错误的amqp.nodecallbackAPI或其他东西。

var express = require('express'); app = express(); var http = require('http'); var serverio = http.createServer(app); var io = require('socket.io').listen(serverio, { log: false }); var socket; var allcli = []; var red, blue, green, magenta, reset; red = '\033[31m'; blue = '\033[34m'; green = '\033[32m'; magenta = '\033[35m'; orange = '\033[43m'; reset = '\033[0m'; var queue = 'ha.atualizacao_mobile'; var urlRabbit = 'amqp://login:password@host?heartbeat=55' // Amazon var amqp = require('amqplib/callback_api'); var debug = true; console.log("Original Socket.IO heartbeat interval: " + io.get('heartbeat interval') + " seconds."); io.set('heartbeat interval', 10 * 60); console.log("Hearbeat interval changed to " + io.get('heartbeat interval') + " seconds to reduce battery consumption in the mobile clients."); console.log("Original Socket.IO heartbeat timeout: " + io.get('heartbeat timeout') + " seconds."); io.set('heartbeat timeout', 11 * 60); console.log("Heartbeat timeout set to " + io.get('heartbeat timeout') + " seconds."); io.sockets.on('connection', function(socket){ socket.on('error', function (exc) { console.log(orange+"Ignoring exception: " + exc + reset); }); socket.on('send-indice', function (data) { // Some business logic }); socket.on('disconnect', function () { // Some business logic }); }); function updatecli(data){ // Some business logic } amqp.connect(urlRabbit, null, function(err, conn) { if (err !== null) { return console.log("Error creating connection: " + err); } conn.on('error', function(err) { console.log("Generated event 'error': " + err); }); conn.on('close', function() { console.log("Connection closed."); process.exit(); }); processRabbitConnection(conn, function() { conn.close(); }); }); function processRabbitConnection(conn, finalize) { conn.createChannel(function(err, channel) { if (err != null) { console.log("Error creating channel: " + err); return finalize(); } channel.assertQueue(queue, null, function(err, ok) { if (err !== null) { console.log("Error asserting queue " + queue + ": " + err); return finalize(); } channel.consume(queue, function (msg) { if (msg !== null) { try { var dataObj = JSON.parse(msg.content); if (debug == true) { //console.log(dataObj); } updatecli(dataObj); } catch(err) { console.log("Error in JSON: " + err); } channel.ack(msg); } }, null, function(err, ok) { if (err !== null) { console.log("Error consuming message: " + err); return finalize(); } }); }); }); } serverio.listen(9128, function () { console.log('Server: Socket IO Online - Port: 9128 - ' + new Date()); }); 

显然这个问题已经解决了。 接近60秒的心跳是问题。 它与RabbitMQ负载平衡器冲突,每隔1分钟左右检查数据是否通过连接(如果没有数据通过,则断开连接)。 AMQP连接停止接收消息,库显然不会对此作出反应。 为了避免这种情况,需要较低的心跳(例如30秒)。