AMQP使用Node.js,我如何发布/订阅?

我正在做一个类来操作这里提供的Node-AMQP模块: https : //github.com/postwait/node-amqp

但是我不能使用这个发布/订阅:

var Queue = require('./Queue.js'); var queue = new Queue(); queue.addTaskToQueue('salut', 5); queue.subscribeTaskQueue('salut'); 

这里是我正在使用的类(我在CoffeeScript中给出代码,在Node.js中给那些不知道CoffeeScript的人):

谢谢你的帮助。

在CoffeeScript中:

 amqp = require('amqp') class Queue constructor: (ip = 'localhost') -> @ip = ip @receivedObject @connection = amqp.createConnection({ host: @ip }) @queue subscribeTaskQueue: (queueToSubscribe) -> self = @ self.connection.on('ready', -> q = self.connection.queue(queueToSubscribe) q.bind('#') q.subscribe({ ack: true }, (message) -> self.receivedObject = message console.log(self.receivedObject) ) ) addTaskToQueue: (queue, objectToSend) -> @queue = @connection.queue("salut", { durable: true }) @connection.publish(queue, objectToSend) module.exports = Queue 

在Node.js中

 (function() { var Queue, amqp; amqp = require('amqp'); Queue = (function() { function Queue(ip) { if (ip == null) { ip = 'localhost'; } this.ip = ip; this.receivedObject; this.connection = amqp.createConnection({ host: this.ip }); this.queue; } Queue.prototype.subscribeTaskQueue = function(queueToSubscribe) { var self; self = this; return self.connection.on('ready', function() { var q; q = self.connection.queue(queueToSubscribe); q.bind('#'); return q.subscribe({ ack: true }, function(message) { self.receivedObject = message; return console.log(self.receivedObject); }); }); }; Queue.prototype.addTaskToQueue = function(queue, objectToSend) { this.queue = this.connection.queue("salut", { durable: true }); return this.connection.publish(queue, objectToSend); }; return Queue; })(); module.exports = Queue; }).call(this); 

我做了你所说的话,我做了一些修改。

这是我的课Queue.coffee:

 amqp = require('amqp') class Queue constructor: (ip = "localhost", queueName = "salut") -> @ip = ip @receivedObject = "test" @connection = amqp.createConnection({ host: 'localhost' }) @queueName = queueName subscribeTaskQueue: () -> @connection.on('ready', -> q = @connection.queue(@queueName) q.bind('#') q.subscribe({ ack: true }, (message) -> @receivedObject = message console.log(@receivedObject) ) ) addTaskToQueue: (objectToSend = "hello") -> @connection.publish(@queueName, objectToSend) queue = new Queue("localhost", "salut") queue.connection.on 'ready', -> queue.addTaskToQueue 'salut', 5 queue.subscribeTaskQueue 'salut' 

这里是rabbituser.coffee:

 Queue = require('./Queue.js') queue = new Queue("localhost", "salut") queue.addTaskToQueue("hello") queue.subscribeTaskQueue() 

当我做的命令:节点rabbituser.js

我得到:

 node.js:134 throw e; // process.nextTick error, or 'error' event on first tick ^ TypeError: Cannot read property '' of undefined at Connection.exchange (/home/armand/node_modules/amqp/amqp.js:1242:21) at Connection.publish (/home/armand/node_modules/amqp/amqp.js:1258:60) at Queue.addTaskToQueue (/home/armand/Desktop/RockSolidProject/coucheAMQP/Queue.js:36:30) at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:5:9) at Object.<anonymous> (/home/armand/Desktop/RockSolidProject/coucheAMQP/rabbituser.js:7:4) at Module._compile (module.js:402:26) at Object..js (module.js:408:10) at Module.load (module.js:334:31) at Function._load (module.js:293:12) at Array.<anonymous> (module.js:421:10) 

我不确定你遇到的错误是什么,但我会指出一些事情:

  1. 在你的构造函数中,当你写@queue@queue它什么也不做。 在JavaScript中,每个对象都是一个散列,并且可以随时附加属性; 如果@queue@queue最初在Queue实例中未定义,那么您不必在构造函数中定义它们。

  2. 是否有可能是在连接存在之前(即在self.connection.on 'ready'callback之前)在addTaskToQueue调用@connection.queue

也许如果你改变你的代码

 queue = new Queue() queue.connection.on 'ready', -> queue.addTaskToQueue 'salut', 5 queue.subscribeTaskQueue 'salut' 

如果这不能解决你的问题,你能否描述一下你遇到的错误以及你遇到的错误?

为了使用amqp作为发布/订阅者,现在可以使用rabbitmq-nodejs-client