bramqp&node.js:取消使用者时的错误(basic.cancel)

首先,我是rabbitmq和bramqp的新手。 我知道这可能是一个愚蠢的问题,但我一直在拉我的头发,因为这个问题在消除队列中的消费者。 我search了整个互联网,发现没有bramqp如何做basic.cancel代码的例子。

这是我的代码:

var bramqp = require('bramqp'); var net = require('net'); var async = require('async'); var queueName = 'testQueue'; var consumerTag = 'testConsumer'; var exchangeName = 'testExchange'; var socket = net.connect({ port : 5672 }); bramqp.initialize(socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function(error, handle){ async.series([ function(seriesCallback) { handle.openAMQPCommunication('guest', 'guest', true, seriesCallback); }, function(seriesCallback) { handle.exchange.declare( 1 /*short reserved-1*/, exchangeName /*exchange-name exchange*/, 'fanout' /*shortstr type*/, false /*bit passive*/, true /*bit durable*/, false /*bit auto-delete*/, false /*bit internal*/, false /*no-wait no-wait*/, {} /*table arguments*/ ); handle.once('exchange.declare-ok', function(channel, method, data) { console.log('exchange declared'); seriesCallback(); }); }, function(seriesCallback) { handle.basic.qos( 1 /*long prefetch-size*/, 0 /*short prefetch-count*/, 1 /*...*/, false /*bit global*/ ); handle.once('basic.qos-ok', function(channel, method, data) { console.log('qos accepted'); seriesCallback(); }); }, function(seriesCallback) { handle.queue.declare( 1 /*short reserved-1*/, queueName /*queue-name queue*/, false /*bit passive*/, true /*bit durable*/, false /*bit exclusive*/, false /*bit auto-delete*/, false /*no-wait no-wait*/, {} /*table arguments*/ ); handle.once('queue.declare-ok', function(channel, method, data) { console.log('queue declared'); seriesCallback(); }); }, function(seriesCallback) { handle.queue.bind( 1 /*short reserved-1*/, queueName /*exchange-name destination*/, exchangeName /*exchange-name source*/, null /*shortstr routing-key*/, false /*no-wait no-wait*/, {} /*table arguments*/ ); handle.once('queue.bind-ok', function(channel, method, data) { console.log('queue bound sucessfully'); seriesCallback(); }); }, function(seriesCallback) { handle.basic.consume(1 /*short reserved-1*/, queueName /*queue-name queue*/, consumerTag, false /*no-local no-local*/, false /*no-ack no-ack*/, false /*bit exclusive*/, false /*no-wait no-wait*/, {} /*table arguments*/ ); handle.once('basic.consume-ok', function (channel, method, data) { console.log('consuming from queue'); console.log(data); handle.on('basic.deliver', function (channel, method, data) { console.log('incoming message'); console.log(data); handle.once('content', function (channel, className, properties, content) { console.log('got a message:'); console.log(content.toString()); if (content.toString().indexOf("END_MESSAGE") > -1){ handle.basic.cancel(consumerTag, false); handle.once('basic.cancel-ok', function(channel, method, data) { console.log("consumer cancelled successfully"); seriesCallback(); }); res.json("END_MESSAGE"); } else{ console.log('acking'); handle.basic.ack(1, data['delivery-tag']); } console.log('with properties:'); console.log(properties); seriesCallback(); }); }); }); } ], function() { console.log('all done'); }); }); 

所以,我试图做的只是简单地停止消费者,当它检测到消息'END_MESSAGE'types。 我从上面的代码得到的是以下错误:

  events.js:72 throw er; // Unhandled 'error' event ^ TypeError: value is out of bounds at TypeError (<anonymous>) at checkInt (buffer.js:705:11) at Buffer.writeUInt16BE (buffer.js:730:5) req.on('error', function(e){ console.log('problem with request: ' + e.message); 

任何意见和build议将不胜感激。 谢谢!

看起来basic.cancel()至less在今天看起来有些不同。 我在文档中发现这个方法实际上吃了一个callback,就像这样:

 handle.basic.cancel(consumerTag, function(err) { if (err) throw err; }); 

https://www.npmjs.com/package/bramqp-wrapper – searchbasic.cancel

重要! callback并不保证你不会从队列中得到更多的消息。 看来只是为了接受取消而不是被队列服务器消化。