Node.js EventEmitter事件不共享事件循环

也许潜在的问题是,我使用的节点卡夫卡模块如何实现的东西,但也许不是,所以我们走了…

使用节点kafa库,我遇到了订阅consumer.on('message')事件的问题。 该库正在使用标准events模块,所以我认为这个问题可能是足够通用的。

我的实际代码结构庞大而复杂,所以这里是一个基本布局的伪示例来突出我的问题。 (注:这个代码片段没有经过testing,所以我可能在这里有错误,但是这里的语法没有问题)

 var messageCount = 0; var queryCount = 0; // Getting messages via some event Emitter consumer.on('message', function(message) { message++; console.log('Message #' + message); // Making a database call for each message mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) { queryCount++; console.log('Query #' + queryCount); }); }) 

我在这里看到的是,当我启动我的服务器时,有十万左右积压的消息,卡夫卡会想要给我,它通过事件发射器。 所以我开始收到消息。 获取并logging所有消息大约需要15秒。

这是我期望看到的输出假设mysql查询是相当快的:

 Message #1 Message #2 Message #3 ... Message #500 Query #1 Message #501 Message #502 Query #2 ... and so on in some intermingled fashion 

我期望这是因为我的第一个mysql结果应该很快就绪,我期望结果(s)轮到事件循环处理响应。 我实际得到的是:

 Message #1 Message #2 ... Message #100000 Query #1 Query #2 ... Query #100000 

在mysql响应能够被处理之前,我正在收到每一条消息。 所以我的问题是,为什么? 为什么在所有消息事件完成之前我无法获得单个数据库结果?

另一个注意事项:我在节点kafka和mysql.query()在我的代码中的.emit('message')设置了一个中断点,我正在打开它们。 因此,在进入我的活动用户之前,似乎所有的10万个发射器都没有叠加起来。 所以我就这个问题进行了第一个假设。

想法和知识将不胜感激:)

node-kafka驱动程序使用相当宽松的缓冲区大小(1M),这意味着它将从卡夫卡获得将放入缓冲区的尽可能多的消息。 如果服务器积压,并根据消息大小,这可能意味着(数十)数千个消息进入一个请求。

由于EventEmitter是同步的(它不使用Node事件循环),这意味着驱动程序将向其侦听器发出数十万个事件,并且由于它是同步的,所以它不会屈服于Node事件循环,直到所有的消息已经交付。

我不认为你可以解决事件交付的洪水,但我不认为具体事件交付是有问题的。 更可能的问题是为每个事件启动一个asynchronous操作(在这种情况下是一个MySQL查询),这可能会用查询来泛滥数据库。

可能的解决方法是使用队列,而不是直接从事件处理程序执行查询。 例如,通过使用async.queue ,可以限制并发(asynchronous)任务的数量。 队列的“worker”部分将执行MySQL查询,在事件处理程序中,您只需将消息推送到队列中。