Node.js:一次接收太多的UDP消息,丢失它们
我的节点服务器在一秒钟内收到大约400个UDP消息 ,这一切都能正常工作 ,而且我能够处理所有400个消息。
但是,当我开始在一秒钟内收到大约700个UDP消息时, 我失去了2-20个消息,并且它们永远不会被parsing:(
我在这里想过一些select :
- 创build一个所有套接字消息的队列 ,然后逐个消费,尽pipe我不知道如何实现这个
- 无法弄清楚如何实施
- 在Node / Express / dgram套接字find一个设置 ,我可以增加内存大小/缓冲区大小,类似的东西
- 我找不到像这样的设置,虽然:(
- 使用不同的UDP接收器 ,停止使用节点的套接字UDP接收器
- 没有find其他接收器
这是我的UDP发件人的样子:
var dgram = require("dgram"); var udpserver = dgram.createSocket("udp4"); var seatStateStore = require("./SeatStateStore"); udpserver.on("message", function (msg, rinfo) { seatStateStore.parseMessage(msg.toString()); });
有人有主意吗? 我找不出3个选项中的任何一个:/有人可以帮我吗?
节点v0.10.29
Express v3.14.0
===============================
更新/解决scheme
这里是我最终使用的代码(略微修改@RoyHB的解决scheme):
var dgram = require("dgram"); var udpserver = dgram.createSocket("udp4"); var seatStateStore = require("./SeatStateStore"); var Dequeue = require('dequeue'); var FIFO = new Dequeue(); fetcher(); udpserver.on("message", function (msg, rinfo) { FIFO.push(msg.toString()); }); udpserver.bind(43278); function fetcher () { while (FIFO.length > 0) { var msg = FIFO.shift(); seatStateStore.parseMessage(msg); } setImmediate(fetcher); //make this function continuously run }
有一个名为node-dequeue的NPM模块。 我用了很多类似的情况给你。
基本上,
- 您的程序将收到的消息推送到队列的末尾。
- 一个间隔计时器会周期性地激活另一个方法或函数(一个队列提取器),它检查队列中是否有消息,如果是,则提取一个或多个消息并对其进行处理。
- 另外(也许更好)没有计时器用于计划队列提取。 而是使用节点process.nextTick方法。
或者,也许最好是,你可以使用节点process.nextTick连续检查队列的消息。
理想情况下,seatStateStore.parseMessage将创build一个新对象来asynchronous处理一条消息,以便parseMessage在没有延迟的情况下返回,而实际的消息处理在后台继续。 (请参阅示例代码的底部)
我没有testing下面的代码,这是为了说明,而不是运行
var FIFO = require ('dequeue'); var seatStateStore = require("./SeatStateStore"); var dgram = require("dgram"); setInterval(fetcher, 1); var udpserver = dgram.createSocket("udp4"); udpserver.on("message", function (msg, rinfo) { FIFO.push(msg); } ); function fetcher () { while (FIFO.length > 0) { var msg = FIFO.shift(); seatStateStore.parseMessage(msg); } }
**或(可能更好)**
var FIFO = require ('dequeue'); var seatStateStore = require("./SeatStateStore"); var dgram = require("dgram"); fetcher(); var udpserver = dgram.createSocket("udp4"); udpserver.on("message", function (msg, rinfo) { FIFO.push(msg); } ); function fetcher () { while (FIFO.length > 0) { var msg = FIFO.shift(); seatStateStore.parseMessage(msg); process.nextTick(fetcher); } }
seatStateProcessor.parseMessage的大纲:
seatStateProcessor.parseMessage = function (msg) { proc = new asyncProcHandler(msg, function (err) { if (err) { //handle the error } }); }