Node.js:一次接收太多的UDP消息,丢失它们

我的节点服务器在一秒钟内收到大约400个UDP消息 ,这一切都能正常工作 ,而且我能够处理所有400个消息。

但是,当我开始在一秒钟内收到大约700个UDP消息时, 我失去了2-20个消息,并且它们永远不会被parsing:(

我在这里想过一些select

  1. 创build一个所有套接字消息的队列 ,然后逐个消费,尽pipe我不知道如何实现这个
    • 无法弄清楚如何实施
  2. Node / Express / dgram套接字find一个设置 ,我可以增加内存大小/缓冲区大小,类似的东西
    • 我找不到像这样的设置,虽然:(
  3. 使用不同的UDP接收器 ,停止使用节点的套接字UDP接收器
    • 没有find其他接收器

这是我的UDP发件人的样子:

var dgram = require("dgram"); var udpserver = dgram.createSocket("udp4"); var seatStateStore = require("./SeatStateStore"); udpserver.on("message", function (msg, rinfo) { seatStateStore.parseMessage(msg.toString()); }); 

有人有主意吗? 我找不出3个选项中的任何一个:/有人可以帮我吗?

节点v0.10.29

Express v3.14.0

===============================

更新/解决scheme

这里是我最终使用的代码(略微修改@RoyHB的解决scheme):

 var dgram = require("dgram"); var udpserver = dgram.createSocket("udp4"); var seatStateStore = require("./SeatStateStore"); var Dequeue = require('dequeue'); var FIFO = new Dequeue(); fetcher(); udpserver.on("message", function (msg, rinfo) { FIFO.push(msg.toString()); }); udpserver.bind(43278); function fetcher () { while (FIFO.length > 0) { var msg = FIFO.shift(); seatStateStore.parseMessage(msg); } setImmediate(fetcher); //make this function continuously run } 

有一个名为node-dequeue的NPM模块。 我用了很多类似的情况给你。

基本上,

  1. 您的程序将收到的消息推送到队列的末尾。
  2. 一个间隔计时器会周期性地激活另一个方法或函数(一个队列提取器),它检查队列中是否有消息,如果是,则提取一个或多个消息并对其进行处理。
  3. 另外(也许更好)没有计时器用于计划队列提取。 而是使用节点process.nextTick方法。

或者,也许最好是,你可以使用节点process.nextTick连续检查队列的消息。

理想情况下,seatStateStore.parseMessage将创build一个新对象来asynchronous处理一条消息,以便parseMessage在没有延迟的情况下返回,而实际的消息处理在后台继续。 (请参阅示例代码的底部)

我没有testing下面的代码,这是为了说明,而不是运行

 var FIFO = require ('dequeue'); var seatStateStore = require("./SeatStateStore"); var dgram = require("dgram"); setInterval(fetcher, 1); var udpserver = dgram.createSocket("udp4"); udpserver.on("message", function (msg, rinfo) { FIFO.push(msg); } ); function fetcher () { while (FIFO.length > 0) { var msg = FIFO.shift(); seatStateStore.parseMessage(msg); } } 

**或(可能更好)**

 var FIFO = require ('dequeue'); var seatStateStore = require("./SeatStateStore"); var dgram = require("dgram"); fetcher(); var udpserver = dgram.createSocket("udp4"); udpserver.on("message", function (msg, rinfo) { FIFO.push(msg); } ); function fetcher () { while (FIFO.length > 0) { var msg = FIFO.shift(); seatStateStore.parseMessage(msg); process.nextTick(fetcher); } } 

seatStateProcessor.parseMessage的大纲:

 seatStateProcessor.parseMessage = function (msg) { proc = new asyncProcHandler(msg, function (err) { if (err) { //handle the error } }); }