amqplib – 即使调用channel.ack(msg)

我正在编写一个模块,充当amqplib的包装。 动机是,我们有一个现有的和明确的交换/队列/绑定设置,我只是想公开的消费方法,以允许消费传入的数据。

为此,我的模块需要一个forms为callback(channel, msg)的callback参数。 在模块中,build立交换和队列和绑定之后,我有以下几点

 module.exports = function (options, callback) { /* connection, exchange and queue set up here */ // consume messages from primary queue ok = ok.then(function() { var q = opts.pq; console.log('Subscribing to', q); return ch.consume(q, function (message) { callback(ch, message); }); }); return ok; }; 

在callback中,我正在处理消息,并在成功时调用channel.ack(msg)

一切正常运行,因为没有错误,但RabbitMQpipe理控制台显示所有处理的消息为Unacked。 如果我杀了我的应用程序,那么Unacked消息就会回到队列中(谢天谢地)。

为什么我的信息不被查看? 我在做错了吗? 我应该在呼叫中提供allUpTo参数为真ack

Env细节

 node -v v0.8.26 npm ls ... amqplib@0.1.1 

为了完整,用我发现的问题来回答我自己的问题。 这实际上是由于node-amqp和amqp.node之间的默认设置差异,以及我的客户中的一个稍微有缺陷的实现。

订阅队列时,默认情况下node-amqp使用prefetch=1 ,这意味着任何时候只有一条消息正在传输。 另一个将不会被传递,直到前一个被承认。 然而,amqp.node默认为prefetch=0 ,这意味着所有的消息尽可能快地发送给消费者,并且当消费者完成每个消息时,他们可以在将来某个时刻单独进行确认。

这是我在pipe理控制台中看到的,这引起了警报并导致了这个问题。 这个讨论可以在这个github问题中看到。

ASIDE – 考虑到上面的信息,我让我的消费者运行,并让它在消息准备好的时候发出信息。 这在我的实施中抛出了一个单独的问题。 考虑到我们正在传递的消息(即,一次),消息处理程序正在轰炸数据库,请求新的连接。 结果,数据库服务器超载,最终消费者死亡。 为了解决这个问题,我只是像以前一样切换到prefetch=1 ,这意味着从上面的消耗诺言方法如下

 module.exports = function (options, callback) { /* connection, exchange and queue set up here */ // consume messages from primary queue ok = ok.then(function() { var q = opts.pq; console.log('Subscribing to', q); ch.prefetch(1); // <-- only get 1 message at a time return ch.consume(q, function (message) { callback(ch, message); }, { noAck: false }); }); return ok; }; 

值得注意的是,如果有人面临类似的情况。