Tag: node amqp

为什么node-amqp出现问题时会“closures”事件?

(我正在使用node-amqp和rabbitmq服务器。) 我想猜测为什么我有一个closures事件,如果出现错误。 例如,如果我尝试打开连接到队列(具有错误的参数),我收到一个错误事件。 这是完美的。 但是,在发生任何错误之后,我将收到一个closures的连接(在这种情况下,可能是因为closures了失败的套接字到队列)。 之后,自动重新连接,我收到(初始)准备好的事件。 问题: connection.on('ready', function() { do_a_lot_of_things }).on(error, function(error){ solve_the_problem }); 如果出现错误,我收到错误,但是“准备就绪”事件,它将重新do_a_lot_of_things。 我的方法错了吗? 最好的祝福

使用amqplib控制Node.JS,控制RabbitMQ使用者的使用率

我的应用程序使用RabbitMQ队列来存储消息,然后我有一个工作人员消费这些消息,并将其插入到数据库中。 目的不是强调工作负载高峰期的数据库。 我遇到的问题是,在那些高峰期,队列的发布率真的很高,工作人员每秒钟开始接收的消息比它能处理的时间多,直到它崩溃。 有什么办法来控制消费率,所以我可以确保工人收不到消息比消耗更快? 信息并不重要,所以我不介意他们有多less时间入队,直到工作人员能够处理。 我使用的Node.JS amqplib,这是我使用的工人代码: open.then(function(conn) { var ok = conn.createChannel(); ok = ok.then(function(ch) { ch.assertQueue(q); ch.consume(q, function(msg) { if (msg !== null) { message = JSON.parse(msg.content.toString()); processMessage(message); } }, {noAck: true}); }); return ok; }).then(null, console.warn);

如何发送一个消息给所有的订阅者,除了发布者也是同一个rabbitMQ队列上的一个侦听器

我有一个由nodeJS服务器安装的rabbitMQ。 我使用rabbit.js库与兔子进行交互,到目前为止,我对此感到满意。 我在扇出模式的同一个队列中有多个用户,每个节点都是一个订阅者,也是一个发布者,这对我很有好处,并且工作正常,因为在很多情况下,我想通知所有服务器一些发生的更新在这些节点之一(这也是出版商…) 我偶然发现一个情况,我需要发送队列中的所有侦听器的消息,除了发送它的人(这也是同一队列中的侦听器)。 我不知道谁在监听(可能有一个,可能有几百万),所以我不能通过一些白名单的路由规则将它路由到一些特定的节点。 它必须是某种排除通配符路由规则(一些黑名单),例如,发送这个消息给每个人听谁不对应我自己的唯一ID … 可以使用rabbit.js来完成吗? 它可以甚至在rabbitmq以某种方式完成? 我不太了解那只兔子,所以要对我温柔:) 顺便说一句,如果你知道如何使用rabbit.js,甚至更好… 编辑:: 根据Derick Bailey的要求,这是我需要这个的原因 我有一个系统,其中有许多负载平衡的nodeJS服务器作为web服务运行。 他们是完全透明的。 他们都不知道哪些其他节点存在。 我想保持这种方式,因为这种分离使我更容易通过添加和删除其他“并行”节点来更好地扩展。 这些节点中的每一个都有自己的内存本地caching服务。 我偶然发现了一个单个节点更新某个实体的情况。 现在我需要使这个节点可以通知所有其他并行节点(可能在caching中具有相同的实体)使其无效。 问题在于发送消息的节点(更新节点)也会收到消息,因为他也是一个监听者。 所以我希望他以某种方式排除自己的特定消息的接收者列表…因此需要一些路由黑名单。 (他知道自己,所以除了自己的身份证之外,我可以让他通往所有人……但即使有人确实在听另一端的话,他也不知道……所以它不能成为白名单) 希望我的需要现在更清楚。 我已经想到了解决我的问题,但它需要额外的发展在我身边,我想通过使用兔子的当前能力(如果可能的话),以避免它我可以只添加一个唯一的ID的内容的消息。 那么发送节点可以认识到这个消息来自他并忽略这个消息。 但正如你可以明白,这可能会变得棘手,因为我需要考虑更多的陷阱和其他边缘情况下可能会失败… 如果有人能告诉我如何使用一些兔子现有的configuration,我会很高兴听到如何:)

amqp.node不会检测到连接丢失

我们有一个运行socket.io服务器的nod​​e.js脚本,它的客户端使用RabbitMQ队列中的消息。 我们最近已经迁移到Amazon AWS,RabbitMQ现在是两台机器(冗余实例)的集群。 AMQP连接会不时丢失(这是一个限制,从具有冗余VM的高可用性环境到达,我们必须应付它),如果尝试重新连接,DNS将select要连接的实例它是一个具有数据复制的集群,因此连接哪个实例并不重要)。 问题是重新连接的尝试从来没有做过; 一段时间后,当连接丢失时,amqp.node显然没有注意到连接已经丢失。 此外,消费者停止接收消息,并且socket.io服务器停止接受新的连接。 我们在RabbitMQ URL处设置了55秒的心跳超时(不要与socket.io心跳超时相混淆),并使用amqp.node的callbackAPI检查“错误”和“closures”事件,但显然从未发出。 队列期望消耗的消息被消除。 我们希望节点脚本检测丢失的连接并自行完成,因此环境将自动启动新的进程并重新build立连接。 这里是代码,也许我们正在做一些错误的amqp.nodecallbackAPI或其他东西。 var express = require('express'); app = express(); var http = require('http'); var serverio = http.createServer(app); var io = require('socket.io').listen(serverio, { log: false }); var socket; var allcli = []; var red, blue, green, magenta, reset; red = '\033[31m'; blue = '\033[34m'; green = […]

RabbitMQ和node-amqp:在确认模式下交换不确认 – 为什么?

我正在编写一个依赖于RabbitMQ的Node.js应用程序。 我使用node-amqp作为连接到RabbitMQ的select库。 一旦build立了与RabbitMQ的连接,我要做的第一件事是创build一个交换: var options = { autoDelete: false, confirm: true, durable: true, type: 'direct' }; connection.exchange('myExchange', options, function (myExchange) { // … }); 这完美的作品。 正如你所看到的,我使用confirm: true来创build交换,因此我期望交换在之后处于确认模式。 现在,一旦我尝试发布消息,就会出现问题: var options = {}; myExchange.publish('', { data: 'foobar' }, options, function () { // … }); 问题是publish函数的callback从来没有被调用 – 虽然这个消息已经成功发布了(正如我在RabbitMQ的webpipe理工具中看到的那样)。 我以错误的方式了解确认模式吗? 这是node-amqp的错误吗? 任何帮助,将不胜感激 :-)

node-amqp不能发送消息给RabbitMQ

我正在使用rabbitmq-tutorials ,ruby版本正常,但是node.js版本不能发送消息。 我不知道什么是错的。 var amqp = require('amqp'); var amqp_hacks = require('./amqp-hacks'); var connection = amqp.createConnection({host: 'localhost'}); connection.on('ready', function(){ connection.publish('hello_node', 'Hello World!'); console.log(" [x] Sent 'Hello World!'"); amqp_hacks.safeEndConnection(connection); }); 在运行node send.js ,运行进程node recv.js不能node recv.js任何东西。 而rabbitmqctl list_queues不显示hello_node队列。

Node-amqp – X尝试后拒绝消息

我如何实现几个可configuration的requeue尝试后拒绝消息的机制? 换句话说,如果我正在订阅一个队列,我希望保证相同的消息不会再多出现X次。 我的代码示例: q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) { try{ doSomething(data); } catch(e) { message.reject(true); } }