Tag: rabbitmq

RabbitMQ – 并行任务消耗

我使用RabbitMQ从symfony发送消息到nodeJs。我会说要求并行处理。 根据客户要处理的数据量,每个客户端的工作可能需要5分钟到70分钟才能完成。 客户可以随机发布工作要求。 假设5个客户完成各自的工作所需的时间如下所示 客户端1作业-65分钟客户端2作业 – 10分钟客户端3作业 – 5分钟客户端4作业 – 10分钟客户端5作业 – 20分钟正常队列有一个问题,因为客户端1的工作将花费65分钟完成,因此客户端2必须等待75分钟(65 + 10)完成一项需要10分钟的工作。 我知道,在RabbitMQ中,我可以创造许多工人来处理这项工作,但是根据工作所需的时间和工作的位置,客户可能需要很长时间才能完成工作。 所以我正在寻找创build每个客户端的dynamic消费者。 每个消费者都为每个客户单独完成这项工作。 Rabbitmq有可能吗? 如果是的话,怎样才能做到呢?

RabbitMQ多队列消费者(独立消费队列)

TL; DR 是否有任何(Node.JS)模式在一段时间(10秒)后停止使用队列,并closures连接,而不中断消息的处理还没有收到? 更长的版本 我们每天都会向客户发送数百万次推送通知。 生成消息后,我们将它们插入到RabbitMQ中,然后将它们发送给数百个消费者。 通常客户想要几乎同时发送大量的消息,并且如果这些消息进入同一队列,则一个客户的消息将具有另一个客户的消息的处理时间。 我们想要创build一个dynamic的多队列设置,其中一个广告系列的消息正在发送消息。 队列,客户dynamic订阅,处理上千条消息,然后切换到另一个队列。 我们的想法是,我们正在分享我们之间的资源,所以他们将在同一时间并行发送。 我们目前的解决scheme是生成的消息比我们发送出来的要慢,所以它们按照混合顺序插入到队列中。 这不是理想的解决scheme,因为很难find足够慢的速度,但速度并不慢。 我已经尝试创build一个简单的setTimeout,强制消费者在几秒钟后完成消费(Channel#cancel),然后订阅一个队列的消费者最less。 问题是这会中断一些消息的处理。 我已经尝试使用通道#来处理一堆消息(然后select一个不同的队列),但文档说,它是理想的只有几条消息,但不是数百万的消息,使用通道#消耗是好得多性能。 任何想法如何做得好?

打开握手期间,套接字突然closures:rabbitmq使用nodejs连接失败

Error: Socket closed abruptly during opening handshake at TLSSocket.endWhileOpening (/app/node_modules/amqplib/lib/connection.js:258:17) at emitNone (events.js:91:20) at TLSSocket.emit (events.js:185:7) at endReadableNT (_stream_readable.js:974:12) at _combinedTickCallback (internal/process/next_tick.js:80:11) at process._tickCallback (internal/process/next_tick.js:104:9)

Node.js amqplib何时closures连接

我正在使用amqplib在我的node.js服务器中传输消息。 我看到一个来自RabbitMQ官方网站的例子: var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var q = 'hello'; var msg = 'Hello World!'; ch.assertQueue(q, {durable: false}); // Note: on Node 6 Buffer.from(msg) should be used ch.sendToQueue(q, new Buffer(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); }); 在这种情况下,连接在超时function中closures。 我不认为这是一个可持续的方式来做到这一点。 但是, ch.sendToQueue没有callback函数,允许我在发送消息后closures连接。 closures连接有什么好处?

如何连接节点的amqplib与SSL连接?

我试图连接我的RabbitMQ服务器,它被迫使用SSL,并用用户和密码保护。 以前,我成功地从C#,PHP和Python应用程序连接到此服务器。 这是我的node.js代码: const amqp = require('amqplib'); const fs = require('fs'); const config = {…} const opts = { ca: [fs.readFileSync(config.certificatePath)] }; const url = `amqps://${config.username}:${config.password}@${config.hostname}:${config.port}`; const open = amqp.connect(url, opts); open.then(function(conn) { console.log('connected.'); }).then(null, console.warn); 我得到的错误是:无法获得本地发行者证书 1)错误是什么意思? 2)我需要做什么才能连接? 注意:在图书馆的ssl指南中写着“(必须)提供一个选项对象,它将被传递给tls.connect() 。 我无法理解这个指令,也没有tls.connect()使用的例子。 我尝试了一些变化,但失败了,也许会给这里的某个人提供线索。

AMQP使用Node.js,我如何发布/订阅?

我正在做一个类来操作这里提供的Node-AMQP模块: https : //github.com/postwait/node-amqp 但是我不能使用这个发布/订阅: var Queue = require('./Queue.js'); var queue = new Queue(); queue.addTaskToQueue('salut', 5); queue.subscribeTaskQueue('salut'); 这里是我正在使用的类(我在CoffeeScript中给出代码,在Node.js中给那些不知道CoffeeScript的人): 谢谢你的帮助。 在CoffeeScript中: amqp = require('amqp') class Queue constructor: (ip = 'localhost') -> @ip = ip @receivedObject @connection = amqp.createConnection({ host: @ip }) @queue subscribeTaskQueue: (queueToSubscribe) -> self = @ self.connection.on('ready', -> q = self.connection.queue(queueToSubscribe) q.bind('#') q.subscribe({ ack: […]

RabbitMQ + Node.JS连接问题,帧大小太大?

我一直在玩apache + php的RabbitMQ,想看看它和Node.JS的性能比较。 使用最新的node.js(0.4.12)和amqp模块(npm install amqp),我得到一个未知的exception连接,看起来像是导致错误的帧大小(amqp.js中的第181行),将错误从代码只是抛出另一个相关的分配太大的数组大小。 RabbitMQ服务器只是默认configuration的“apt-get install rabbitmq-server”。 这在PHP中工作就好了。 Starting … AMQP URL: amqp://localhost events.js:47 throw new Error("Uncaught, unspecified 'error' event."); ^ Error: Uncaught, unspecified 'error' event. at Connection.emit (events.js:47:15) at AMQPParser.onError (/nodeJS_stuff/node_modules/amqp/amqp.js:839:12) at AMQPParser.throwError (/nodeJS_stuff/node_modules/amqp/amqp.js:145:25) at AMQPParser.execute (/nodeJS_stuff/node_modules/amqp/amqp.js:181:18) at Connection.<anonymous> (/nodeJS_stuff/node_modules/amqp/amqp.js:851:12) at Connection.emit (events.js:64:17) at Connection._onReadable (net.js:672:14) at IOWatcher.onReadable [as callback] (net.js:177:10) 有没有遇到过这个? […]

NodeJS AMQP客户端无法连接

我疯了最近两天,我不能连接在持久交换和持久队列的NodeJS客户端。 所以PHP代码创build并发送消息: <?php $connection = new AMQPConnection(array( 'host' => 'localhost', 'vhost' => 'bvh', 'port' => 5672, 'login' => 'bizneus', 'password' => 'lozinkus' )); //$connection = new AMQPConnection(); $connection->connect(); if (!$connection->isConnected()) { die('Not connected :(' . PHP_EOL); } // Open Channel $channel = new AMQPChannel($connection); // Declare exchange $exchange = new AMQPExchange($channel); $exchange->setName('biznea_e_1'); $exchange->setType('fanout'); $exchange->setFlags(AMQP_DURABLE); $exchange->declare(); […]

使用nodeJS移除RabbitMQ中的使用者

我试图让我的听众在我想要立即删除我的消费者之后只收听1条消息。 我怎么能做到这一点。 这里是代码。 queueListener:function(Queue,timeOut){ var deferred=sails.promise.defer(),timer,data; sails.amqp.connect('amqp://localhost', function(err, conn) { conn.createConfirmChannel(function(err, ch) { if(err){ conn.close(); deferred.reject(err); }else{ ch.assertQueue(Queue, {durable: true}); ch.prefetch(1); ch.consume(Queue,function(msg){ data=msg.content.toString(); clearTimeout(timer); ch.ack(msg); setTimeout(function(){ conn.close(); deferred.resolve(data); },0); },{noAck: false}); } }); timer=setTimeout(function(){ conn.close(); deferred.reject(new Error("Nothing in the Queue.")); },timeOut-5); }); return deferred.promise; } 在上面的Queue中是它将要监听的队列,timeOut代表了我的监听器将要监听的时间。 如果它侦听一条消息,我想停止listen.And进一步侦听,我将下一次调用函数queueListner 。 虽然我做了conn.close()但在用户界面,它仍然显示消费者。

延迟工作与RabokuMQ在Heroku上的优缺点是什么?

我想在Heroku上编写一个Node.js UDP服务器,并计划将数据排队到一个Rails实例(dyno)以供它处理? 使用Delayed Job vs RabbitMQ有什么优点和缺点? 谢谢,Chirag