RabbitMQ:如何限制消费率

我需要限制从rabbitmq队列中消费消息的速度。

我发现了很多build议,但是其中大多数提供使用预取选项。 但是这个选项并没有做我所需要的。 即使我将预取设置为1,速率也是大约6000个信息/秒。 这对消费者来说太多了。

我需要每秒限制大约70到200条消息。 这意味着每5-14ms消耗一个消息。 没有同时消息。

我正在使用Node.JS和amqp.node库。

实现令牌桶可能有助于: https : //en.wikipedia.org/wiki/Token_bucket

你可以写一个生产者,以一个固定的速率在消息上生成一个TTL的消息(也许会在一秒之后过期),或者只是设置一个最大队列大小等于你每秒的速率。 接收“正常队列”消息的消费者还必须接收“令牌桶队列”消息,以便有效地处理消息速率限制应用。

NodeJS + amqplib示例:

var queueName = 'my_token_bucket'; rabbitChannel.assertQueue(queueName, {durable: true, messageTtl: 1000, maxLength: bucket.ratePerSecond}); writeToken(); function writeToken() { rabbitChannel.sendToQueue(queueName, new Buffer(new Date().toISOString()), {persistent: true}); setTimeout(writeToken, 1000 / bucket.ratePerSecond); } 

我已经find了一个解决scheme。

我使用npm中的模块nanotimer来计算延迟。

然后我以纳秒计算delay = 1 / [message_per_second]。

然后我使用prefetch = 1的消息

然后我计算真正的延迟作为延迟 – [processing_message_time]

然后,我发送消息之前确实延迟超时=

它完美的作品。 谢谢大家

我不认为RabbitMQ可以为您提供这个function。

如果你只有一个消费者,那么整个事情都很简单,你只需要在消费者之间进行rest。

如果你有多个消费者,我build议你使用一些“共享内存”来保持这个速度。 例如,您可能有10个消费者使用消息。 为了保持70-200的消息速率,你会打电话给Redis,看你是否有资格处理消息。 如果是,则更新Redis,以向其他消费者显示当前正在处理一条消息。

如果您对消费者没有控制权,则执行选项1或2,并将消息发回给Rabbit。 这样,原始消费者将以期望的速度消费消息。

请参阅RabbitMQ文档中的 “公平发送”。

例如在有两个工人的情况下,当所有奇怪的信息都很重,甚至信息很轻时,一个工作人员就会一直很忙,另一个工作人员几乎没有任何工作。 那么,RabbitMQ不知道任何关于这个,并将仍然均匀地发送消息。

发生这种情况是因为RabbitMQ只在消息进入队列时调度消息。 它没有考虑消费者未确认消息的数量。 它只是盲目地把第n条消息分发给第n个消费者。

为了解决这个问题,我们可以使用值为1的预取方法。这告诉RabbitMQ一次不能给一个工作者多个消息。 或者换句话说,不要向工作人员发送新消息,直到处理并确认了前一个消息。 相反,它会将其分派给下一个还不忙的工作人员。