在RabbitMQ和节点上使用AMQP.Node不会消耗死字母消息

我想在我的工作人员中经过一段时间后收到一条消息。 发现所谓的死信交stream后,我决定和Node和RabbitMQ一起去。

该消息似乎被送到DeadExchange中的队列中,但消费者在WorkExchange中的WorkQueue中经过的时间之后永远不会收到消息。 bindQueue是closures的,或者死信不起作用?

我现在尝试了很多不同的值。 有人能指出我错过了什么吗?

var amqp = require('amqplib'); var url = 'amqp://dev.rabbitmq.com'; amqp.connect(url).then(function(conn) { //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to. return conn.createChannel().then(function(ch) { return ch.assertExchange('WorkExchange', 'direct').then(function() { return ch.assertQueue('WorkQueue', { autoDelete: false, durable: true }) }).then(function() { return ch.bindQueue('WorkQueue', 'WorkExchange', ''); }).then(function() { console.log('Waiting for consume.'); return ch.consume('WorkQueue', function(msg) { console.log('Received message.'); console.log(msg.content.toString()); ch.ack(msg); }); }); }) }).then(function() { //Now send a test message to DeadExchange to a random (unique) queue. return amqp.connect(url).then(function(conn) { return conn.createChannel(); }).then(function(ch) { return ch.assertExchange('DeadExchange', 'direct').then(function() { return ch.assertQueue('', { arguments: { 'x-dead-letter-exchange': 'WorkExchange', 'x-message-ttl': 2000, 'x-expires': 10000 } }) }).then(function(ok) { console.log('Sending delayed message'); return ch.sendToQueue(ok.queue, new Buffer(':)')); }); }) }).then(null, function(error) { console.log('error\'ed') console.log(error); console.log(error.stack); }); 

我正在使用amqp.node( https://github.com/squaremo/amqp.node ),它是在npm amqplib。 尽pipenode-amqp( https://github.com/postwait/node-amqp )似乎更受欢迎,但它并没有实现完整的协议,并且在重新连接方面还存在一些悬而未决的问题。

dev.rabbitmq.com正在运行RabbitMQ 3.1.3。

这是一个工作代码。当消息在DeadExchange中花费超过ttl时,它将被推送到WorkExchange。 成功的关键是定义正确的路由密钥。 您希望发送到ttl后的交换队列应该以路由键(note:not default)为界,而'x-dead-letter-routing-key'属性值应与该路由键相匹配。

 var amqp = require('amqplib'); var url = 'amqp://localhost'; amqp.connect(url).then(function(conn) { //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to. return conn.createChannel().then(function(ch) { return ch.assertExchange('WorkExchange', 'direct').then(function() { return ch.assertQueue('WorkQueue', { autoDelete: false, durable: true }) }).then(function() { return ch.bindQueue('WorkQueue', 'WorkExchange', 'rk1'); }).then(function() { console.log('Waiting for consume.'); return ch.consume('WorkQueue', function(msg) { console.log('Received message.'); console.log(msg.content.toString()); ch.ack(msg); }); }); }) }).then(function() { //Now send a test message to DeadExchange to DEQ queue. return amqp.connect(url).then(function(conn) { return conn.createChannel(); }).then(function(ch) { return ch.assertExchange('DeadExchange', 'direct').then(function() { return ch.assertQueue('DEQ', { arguments: { 'x-dead-letter-exchange': 'WorkExchange', 'x-dead-letter-routing-key': 'rk1', 'x-message-ttl': 15000, 'x-expires': 100000 } }) }).then(function() { return ch.bindQueue('DEQ', 'DeadExchange', ''); }).then(function() { console.log('Sending delayed message'); return ch.publish('DeadExchange', '', new Buffer("Over the Hills and Far Away!")); }); }) }).then(null, function(error) { console.log('error\'ed') console.log(error); console.log(error.stack); }); 

AMQP.Node中的Channel#assertQueue中存在一个错误,请参阅https://github.com/squaremo/amqp.node/commit/3749c66b448875d2df374e6a89946c0bdd0cb918 。 修正是在GitHub上,但不是在npm中。

Interesting Posts