Node.js AMQP扇出
我正在尝试构build一个分布式系统,我正在使用postwait / node-amqp 。 作为新手,我发现文档是非常无益的。
现在我的计划图:
Client -> Gate -> Q -> EatProcessor -> WalkProcessor -> Logger
让我们说客户发送Start Eating
。 Gate
收到它,并将其添加到Q
正在eating
的消息只有EatProcessor
应该从Q
获得消息。
所以让我们说EatProcessor:
connection.queue('Q', {autoDelete: false}, function(queue){ queue.subscribe(function(msg){ // recv message }); });
那么我如何:
- 让食物处理器只从Q中得到食物
- 让步行处理器只从Q中获取走路的东西
- logging器将从Q中获取所有内容,并只logging日志。
我正在阅读,也许门口应该是一个扇子? 但是我没有在文档中看到如何做扇出。
如果一个粉丝是否意味着每个人都会得到这个信息? 让我们说我还没有写logging器,我会写在function,但是当我写它时,我想直接听Q
而不改变任何代码在Gate
。
好的,首先使用交stream。 这样你就可以同时发布到更多的队列。 我不知道用Node.js的RabbitMQ,但我认为这样的事情应该工作:
// define a queue for each type connection.queue('eat-q', {autoDelete: false}, function(eat_queue) { // ... connection.queue('walk-q', {autoDelete: false}, function(eat_queue) { // ... connection.queue('log-q', {autoDelete: false}, function(log_queue) { // ... }); }); });
然后定义交换并进行绑定:
connection.exchange('my-exchange', function(exchange) { eat_queue.bind('my-exchange', 'eat', function() { // ... }); walk_queue.bind('my-exchange', 'walk', function() { // ... }); log_queue.bind('my-exchange', 'walk', function() { // ... }); log_queue.bind('my-exchange', 'eat', function() { // ... }); });
最后你可以发布到交stream:
exchange.publish('eat', 'my message', {}, function() { // ... });
该消息应该以eat-q
和log-q
结尾。
请注意,您应该在这里使用某种types的stream程库,如caolan的async.js或kriskowal的q,因为最终会导致大量的callback。 我更喜欢第二个。