节点js TCP服务器,socket.on('data') – 数据缓冲区包含高负载的垃圾数据

我使用节点js的networking服务器并使用socket.on('data')函数接收数据。 parsingTCP消息我使用parsing缓冲区方法。 这将使用前4个字节作为TCP消息的长度,以便我可以从TCPstream中读取并形成单独的命令。总结高负载情况下,会有一些垃圾数据作为TCPstream的一部分返回,问题。

function onConnect(client) { var accumulatingBuffer = new Buffer(0); var totalPacketLen = -1; var accumulatingLen = 0; var recvedThisTimeLen = 0; client.on('data', function (data) { parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen); }); } 

这里是parsebuffer方法。

 function parseBuffer(client, data, accumulatingBuffer, totalPacketLen, accumulatingLen, recvedThisTimeLen) { recvedThisTimeLen = Buffer.byteLength(data); var tmpBuffer = new Buffer(accumulatingLen + recvedThisTimeLen); accumulatingBuffer.copy(tmpBuffer); data.copy(tmpBuffer, accumulatingLen); // offset for accumulating accumulatingBuffer = tmpBuffer; tmpBuffer = null; accumulatingLen = accumulatingLen + recvedThisTimeLen; if (accumulatingLen < PACKETHEADERLEN) { return; } else if (accumulatingLen === PACKETHEADERLEN) { packetHeaderLen return; } else { //a packet info is available.. if (totalPacketLen < 0) { totalPacketLen = accumulatingBuffer.readUInt32BE(0); } } while (accumulatingLen >= totalPacketLen + PACKETHEADERLEN) { var aPacketBufExceptHeader = new Buffer(totalPacketLen); // a whole packet is available... accumulatingBuffer.copy(aPacketBufExceptHeader, 0, PACKETHEADERLEN, PACKETHEADERLEN + totalPacketLen); //////////////////////////////////////////////////////////////////// //process packet data var stringData = aPacketBufExceptHeader.toString(); try { var JSONObject = JSON.parse(stringData); handler(client, JSONObject); var newBufRebuild = new Buffer(accumulatingBuffer.length - (totalPacketLen + PACKETHEADERLEN)); // we can reduce size of allocatin accumulatingBuffer.copy(newBufRebuild, 0, totalPacketLen + PACKETHEADERLEN, accumulatingBuffer.length); //init accumulatingLen = accumulatingLen - (totalPacketLen + PACKETHEADERLEN); //totalPacketLen+4 accumulatingBuffer = newBufRebuild; newBufRebuild = null; totalPacketLen = -1; //For a case in which multiple packets are transmitted at once. if (accumulatingLen <= PACKETHEADERLEN) { //need to get more data -> wait.. return; } else { totalPacketLen = accumulatingBuffer.readUInt32BE(0); } } catch (ex) { console.log(ex + ' unable to process data'); return; } } } 

一切都很好,直到有很高的模拟负载使用一堆客户端快速发送消息。 在ParseBuffer方法中,第一行“data.length”返回的时间比TCP数据的长度长。 这会导致读取垃圾的代码为UInt32BE,这会导致总包长度(它告诉下一个包长度)非常高的值。 这导致丢失的消息。 我错过了什么 请帮忙。

当你在你的parseBuffer()函数中这样做时:

 accumulatingBuffer = tmpBuffer; 

这只是将tmpBuffer分配给名为accumulatingBuffer的函数参数。 它不会改变onConnect()方法中的accumulatingBuffervariables。 因此,当你得到一个局部缓冲区时,你将失去累积的部分。 传递给parseBuffer()的其他参数也是同样的问题。 在parseBuffer()分配给它们不会改变onConnect()中的同名variables。

有可能更简单的方法来写这个,但保持相同的结构最简单的方法是不通过单个variables,而是传递一个单一的对象,具有这些variables作为对象的属性。 然后,当您分配给属性时,您可以从onConnect()获取这些新值。

总体结构如下所示:

 function onConnect(client) { var args = {}; args.accumulatingBuffer = new Buffer(0); args.totalPacketLen = -1; args.accumulatingLen = 0; args.recvedThisTimeLen = 0; client.on('data', function (data) { parseBuffer(client, data, args); }); } 

然后,在parseBuffer()进行相应的更改,以作为args对象的属性来访问参数。 由于对象是通过指针传递的,所以当你从parseBuffer中分配给args对象的属性时,这些对象将在onConnect方法的args对象中可见。


仅供参考,我没有在函数的其他地方跟踪整个逻辑,所以也可能有其他错误。 这段代码看起来相当复杂,有很多的缓冲区拷贝,它正在尝试执行相当常见的任务。 这也是以前可能写过很多次的代码,甚至可能存在于一些预build的库中。