在Node.js中使用RabbitMQ headers exchange的示例

我一直在寻找在Node.js中使用RabbitMQ headers exchange的示例。如果有人能给我指出正确的方向,那就太好了。以下是我目前写出的代码:

发布者方法(创建发布者)

 /**
  * Register a publisher for `exchange`.
  *
  * Opens a confirm channel on the shared `amqp` connection promise, starts
  * asserting the exchange as durable, and stores both the channel and the
  * pending assert promise in the `publishers` map (defined outside this
  * block) under the exchange name.
  *
  * @param {string} exchange - Name of the exchange to publish to.
  * @param {string} type - Exchange type passed to `assertExchange`
  *   (e.g. 'headers').
  * @returns {undefined} The work is asynchronous; nothing is returned.
  */
 RabbitMQ.prototype.publisher = function(exchange, type) {
   console.log('New publisher, exchange: ' + exchange + ', type: ' + type);
   amqp.then(function(conn) {
     // Return the inner chain: otherwise a failure to open the channel never
     // reaches the trailing rejection handler and surfaces as an unhandled
     // rejection instead.
     return conn.createConfirmChannel().then(function(ch) {
       publishers[exchange] = {
         // Deliberately stored un-awaited: publish() chains on this promise.
         assert: ch.assertExchange(exchange, type, {durable: true}),
         ch: ch
       };
     });
   }, function(err) {
     // Only reached when the connection promise itself rejected.
     console.error("[AMQP]", err.message);
     // NOTE(review): `self` and `URI` are not defined in this block; they are
     // presumably provided by the enclosing module - confirm.
     return setTimeout(function() { self.connect(URI); }, 1000);
   }).then(null, console.log);
 };

发布方法

 /**
  * Publish a persistent message to `exchange` once its assert has completed.
  *
  * Failed publishes are pushed onto `offlinePubQueue` (defined outside this
  * block) as `[exchange, routingKey, content, headers]`. The headers are kept
  * as a fourth element so a replayed message can still be routed by a headers
  * exchange; consumers reading only the first three elements are unaffected.
  *
  * @param {string} exchange - Exchange name previously registered via
  *   `publisher()`.
  * @param {string} routingKey - Routing key passed to `ch.publish`.
  * @param {string|Buffer|Array} content - Message body; converted with
  *   `Buffer.from`.
  * @param {Object} [headers] - Message headers, sent as `options.headers`.
  * @returns {undefined}
  */
 RabbitMQ.prototype.publish = function(exchange, routingKey, content, headers) {
   function queueForRetry() {
     offlinePubQueue.push([exchange, routingKey, content, headers]);
   }

   function reportFailure(e) {
     console.error("[AMQP] publish", (e && e.message) || e);
     queueForRetry();
   }

   try {
     publishers[exchange].assert.then(function() {
       // Buffer.from replaces the deprecated `new Buffer(content)`.
       publishers[exchange].ch.publish(exchange, routingKey, Buffer.from(content), {
         persistent: true,
         headers: headers
       }, function(err) {
         if (err) {
           console.error("[AMQP] publish", err);
           queueForRetry();
           publishers[exchange].ch.connection.close();
         }
       });
     }).then(null, reportFailure); // try/catch cannot see a rejected assert or a throw inside the callback
   } catch (e) {
     // Synchronous failure, e.g. no publisher registered for this exchange.
     reportFailure(e);
   }
 };

消费方法(创建消费者)

 /**
  * Create a consumer: assert the exchange, declare an exclusive
  * server-named queue, bind it to the exchange and start consuming.
  *
  * Each step returns its promise so the steps run strictly in order, the
  * queue name reported by `assertQueue` is the one actually bound and
  * consumed, and any failure reaches the trailing `console.warn` handler.
  *
  * @param {string} exchange - Exchange to bind to (asserted as durable).
  * @param {string} type - Exchange type, e.g. 'headers'.
  * @param {string} routingKey - Binding pattern passed to `bindQueue`.
  * @param {function(Object, Object)} cb - Called as `cb(msg, ch)` for every
  *   delivery; acknowledgements are the callback's job (`noAck: false`).
  * @param {Object} [args] - Optional binding arguments forwarded as the
  *   fourth argument of `bindQueue`. For a headers exchange this is where the
  *   header values to match and the 'x-match' entry go.
  * @returns {undefined}
  */
 RabbitMQ.prototype.consumer = function(exchange, type, routingKey, cb, args) {
   amqp.then(function(conn) {
     return conn.createChannel().then(function(ch) {
       return ch.assertExchange(exchange, type, {durable: true})
         .then(function() {
           // '' asks the broker to generate the queue name.
           return ch.assertQueue('', {exclusive: true});
         })
         .then(function(qok) {
           var queue = qok.queue;
           return ch.bindQueue(queue, exchange, routingKey, args).then(function() {
             // Hand the generated name on to the consume step.
             return queue;
           });
         })
         .then(function(queue) {
           return ch.consume(queue, function(msg) {
             cb(msg, ch);
           }, {noAck: false});
         })
         .then(function() {
           console.log(' [*] Waiting for logs. To exit press CTRL+C.');
         });
     });
   }).then(null, console.warn);
 };

上面的例子可以正常工作,但我不知道如何改用headers exchange。我很确定需要修改绑定方法,但一直没能找到任何完成这项工作的实例。

任何帮助将不胜感激!

我在为amqplib寻找同样的答案时偶然发现了这个问题。不幸的是,和你一样,我发现现有的文档都很欠缺。仔细查看源代码、稍微阅读了一下协议并尝试了几种组合之后,我终于成功了。关键在于:发布时把headers放进选项对象的headers字段,绑定时把要匹配的headers连同x-match一起作为参数传给bindQueue:

 // Publisher side (excerpt): the message headers go under the `headers` key of
 // the options object passed as the fourth argument to chan.publish; the
 // routing-key argument is an empty string.
 ... let opts = { headers: { 'asd': 'request', 'efg': 'test' }}; chan.publish(XCHANGE, '', Buffer.from(output), opts); ... 

 // Consumer side (excerpt): the header values to match and the 'x-match' entry
 // are passed flat (no `headers` wrapper) as the fourth argument to
 // chan.bindQueue; the binding-pattern argument is an empty string.
 // NOTE(review): RabbitMQ documents 'x-match': 'all' as requiring every listed
 // header to match ('any' needs only one) - confirm against the broker docs.
 ... let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' }; chan.bindQueue(q.queue, XCHANGE, '', opts); ... 

完整的可运行代码如下。下面的认证信息是假的,所以你必须使用自己的。我使用的是ES6、Node.js 6.5和amqplib。给header名称加上x-前缀和/或使用保留字作为header名称可能会有问题,但我不太确定(得去看RabbitMQ的源码才知道)。

emit.js:

 #!/usr/bin/env node
 // emit.js - publishes a timestamp message with fixed headers to a headers
 // exchange every 500 ms, until the process is stopped.
 const XCHANGE = 'headers-exchange';
 const Q = require('q');
 const Broker = require('amqplib');

 let scope = 'anonymous';

 process.on('uncaughtException', (exception) => {
   console.error(`"::ERROR:: Uncaught exception ${exception}`);
 });

 // Each CLI argument overwrites `scope`; the value is only logged here.
 process.argv.slice(2).forEach((arg) => {
   scope = arg;
   console.info('[*] Scope now set to ' + scope);
 });

 Q.spawn(function*() {
   let conn = yield Broker.connect('amqp://root:root@localhost');
   let chan = yield conn.createChannel();
   // Wait for the declaration so a failure (for example an existing exchange
   // with different settings) surfaces here instead of as a floating rejection.
   yield chan.assertExchange(XCHANGE, 'headers', { durable: false });

   for (;;) {
     let output = (new Date()).toString();
     // Message headers live under `headers`; the routing key is left empty.
     let opts = { headers: { 'asd': 'request', 'efg': 'test' }};
     chan.publish(XCHANGE, '', Buffer.from(output), opts);
     console.log(`[x] Published item "${output}" to <${XCHANGE} : ${JSON.stringify(opts)}>`);
     yield Q.delay(500);
   }
 });

receive.js:

 #!/usr/bin/env node
 // receive.js - declares an exclusive server-named queue, binds it to the
 // headers exchange with header-matching arguments, and acks every message
 // it receives.
 const Q = require('q');
 const Broker = require('amqplib');
 const uuid = require('node-uuid');
 // NOTE(review): Rx / rx-node are loaded but not referenced anywhere else in
 // this script; kept in case other code relies on them.
 const Rx = require('rx');
 Rx.Node = require('rx-node');

 const XCHANGE = 'headers-exchange';
 const WORKER_ID = uuid.v4();
 const WORKER_SHORT_ID = WORKER_ID.slice(0, 4);

 Q.spawn(function*() {
   let conn = yield Broker.connect('amqp://root:root@localhost');
   let chan = yield conn.createChannel();
   // Every broker call below is yielded so its failure surfaces in this
   // generator instead of becoming a floating rejection.
   yield chan.assertExchange(XCHANGE, 'headers', { durable: false });
   let q = yield chan.assertQueue('', { exclusive: true });

   // Binding arguments: header values to match plus the 'x-match' mode,
   // passed flat as the fourth argument of bindQueue.
   let opts = { 'asd': 'request', 'efg': 'test', 'x-match': 'all' };
   yield chan.bindQueue(q.queue, XCHANGE, '', opts);
   console.info('[*] Binding with ' + JSON.stringify(opts));
   console.log(`[*] Subscriber ${WORKER_ID} (${WORKER_SHORT_ID}) is online!`);

   yield chan.consume(q.queue, (msg) => {
     // amqplib invokes the callback with null when the broker cancels the
     // consumer; there is nothing to read or ack in that case.
     if (msg === null) {
       console.warn(`[!](${WORKER_SHORT_ID}) Consumer cancelled by the broker`);
       return;
     }
     console.info(`[x](${WORKER_SHORT_ID}) Received pub "${msg.content.toString()}"`);
     chan.ack(msg);
   });
 });