在Nodejs RabbitMQ服务器上的高性能

我正在build立一个分析系统,在同一时间内有100万用户在线。 我使用消息代理等RabbitMQ来降低服务器的容量

这是我的图

在这里输入图像说明

我的系统包括3个组件。

发行商服务器:(生产者)这个系统build立在nodejs上。 这个系统将消息发布到queue

RabbitMQ队列 :该系统存储publisher server发送的消息。 之后,打开一个连接以从subscriber server队列发送消息。

订户服务器(消费者) :该系统从queue接收消息

发布服务器源代码

 var amqp = require('amqplib/callback_api'); amqp.connect("amqp://localhost", function(error, connect) { if (error) { return callback(-1, null); } else { connect.createChannel(function(error, channel) { if (error) { return callback(-3, null); } else { var q = 'logs'; var msg = data; // object // convert msg object to buffer var new_msg = Buffer.from(JSON.stringify(msg), 'binary'); channel.assertExchange(q, 'fanout', { durable: false }); channel.publish(q, 'message_queues', new Buffer(new_msg)); console.log(" [x] Sent %s", new_msg); return callback(null, msg); } }); } }); 

创build与"fanout"专门交换"message_queues"向所有消费者发送广播

订户服务器源代码

 var amqp = require('amqplib/callback_api'); amqp.connect("amqp://localhost", function(error, connect) { if (error) { console.log('111'); } else { connect.createChannel(function(error, channel) { if (error) { console.log('1'); } else { var ex = 'logs'; channel.assertExchange(ex, 'fanout', { durable: false }); channel.assertQueue('message_queues', { exclusive: true }, function(err, q) { if (err) { console.log('123'); } else { console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); channel.bindQueue(q.queue, ex, 'message_queues'); channel.consume(q.queue, function(msg) { console.log(" [x] %s", msg.content.toString()); }, { noAck: true }); } }); } }); } }); 

"message_queues"交换接收messge

当我执行发送消息。 系统工作正常,但是我尝试了这个系统的基准testing性能(每秒约有1000个用户发送请求),那么系统有一些问题。 系统看起来像是过载/缓冲区溢出(或者某些事情不太好)。

我只是在2天前阅读了有关rabbitmq的文章。 我知道它的教程是一个基本的例子,所以我需要帮助build立系统在现实世界比..任何解决scheme和build议

希望我的问题是有道理的

你的问题是一般的。 也许你应该提供更多的细节来帮助确定瓶颈和帮助你。 所以,首先我想你应该检查兔子mq – 不pipe是不是瓶颈。 有很多事情可能会出错:

  1. 可以使用该消息的消费者数量太less(我假设你使用了一群消费者)

  2. networking太慢了

  3. 队列和消息在Rabbit MQ的太多节点之间被复制并且去做磁盘(它可能使用象这样的兔子mq)

  4. 消费者不能真正处理消息,并且不断地重新排队

所以,一般来说在你的testing中,你应该检查一下兔子mq,看看里面发生了什么。

一旦发生这种情况,一旦到达队列中的消息处于就绪状态,它将一直在那里,直到连接到队列的消费者中的一个不会尝试接收消息来处理

当一个消费者(兔子在他们之间循环)挑选消息进行处理时,如果消费者未能处理消息,状态将变为Unacknowledged ,则它将被兔子重新排队,以便另一个消费者有机会处理消息。

当然,如果消费者成功地处理消息,则消息从兔子mq服务器消失。

假设你已经安装了rabbit mq web ui(我强烈推荐它特别适合初学者),你可以直观地看到你的队列中发生了什么 – 你会看到有多less消息处于就绪状态,有多less未被确认。 这将有助于确定一个瓶颈。

例如 – 如果您看到只有一条消息通常处于未确认状态,则可能意味着消费者无法处理该消息并将其发回给兔子。 另一方面,新的消息总是从生产者到达,所以准备好的消息的数量将增加得非常快。也可以指出,你只使用一个消费者,一次只能处理一个消息。 所以你可以考虑在这里平行,通过在不同的线程中运行许多消费者,甚至集中你的应用程序(在兔子消费者可以驻留在不同的机器)

当然,如前所述,如果您有更具体的问题,希望这有助于提供更多的信息,请提供更多关于testing过程中发生的事情的信息