Node.js AMQP扇出

我正在尝试构build一个分布式系统,我正在使用postwait / node-amqp 。 作为新手,我发现文档是非常无益的。

现在我的计划图:

Client -> Gate -> Q -> EatProcessor -> WalkProcessor -> Logger 

让我们说客户发送Start EatingGate收到它,并将其添加到Q正在eating的消息只有EatProcessor应该从Q获得消息。

所以让我们说EatProcessor:

 connection.queue('Q', {autoDelete: false}, function(queue){ queue.subscribe(function(msg){ // recv message }); }); 

那么我如何:

  • 让食物处理器只从Q中得到食物
  • 让步行处理器只从Q中获取走路的东西
  • logging器将从Q中获取所有内容,并只logging日志。

我正在阅读,也许门口应该是一个扇子? 但是我没有在文档中看到如何做扇出。

如果一个粉丝是否意味着每个人都会得到这个信息? 让我们说我还没有写logging器,我会写在function,但是当我写它时,我想直接听Q而不改变任何代码在Gate

好的,首先使用交stream。 这样你就可以同时发布到更多的队列。 我不知道用Node.js的RabbitMQ,但我认为这样的事情应该工作:

 // define a queue for each type connection.queue('eat-q', {autoDelete: false}, function(eat_queue) { // ... connection.queue('walk-q', {autoDelete: false}, function(eat_queue) { // ... connection.queue('log-q', {autoDelete: false}, function(log_queue) { // ... }); }); }); 

然后定义交换并进行绑定:

 connection.exchange('my-exchange', function(exchange) { eat_queue.bind('my-exchange', 'eat', function() { // ... }); walk_queue.bind('my-exchange', 'walk', function() { // ... }); log_queue.bind('my-exchange', 'walk', function() { // ... }); log_queue.bind('my-exchange', 'eat', function() { // ... }); }); 

最后你可以发布到交stream:

 exchange.publish('eat', 'my message', {}, function() { // ... }); 

该消息应该以eat-qlog-q结尾。

请注意,您应该在这里使用某种types的stream程库,如caolan的async.js或kriskowal的q,因为最终会导致大量的callback。 我更喜欢第二个。