Node.js TCP服务器传入缓冲区

我有两个节点进程相互讲话。 我将把它们称为[节点服务器][节点发送者][节点发送者]不断处理信息并通过TCP连接将消息写入[节点服务器] 。 然后[节点服务器]回写一个状态消息。

[节点发件人]的示例:

var message = "Test Message"; [Node Sender].connection.write(message); 

[节点服务器]的示例:

 [Node Server].socket.on("data", function(p_data) { this.write("OK"); // Do some work with p_data } 

这工作没有问题, p_data总是包含“testing消息”,当发送超过5毫秒的任何东西。 但是,如果我加速[节点发送者]写入每毫秒,则p_data偶尔会以“Test MessageTest MessageTes”之类的方式结束

我明白[Node Sender]中的缓冲区可能比写入命令发送的速度快。 有没有办法强制发送消息一对一的比例,而仍然保持asynchronous?

我当然可以在我的消息中添加一个终止符,并在[Node Server]中填充一个缓冲区,但是我想确保没有明显的缺失。

不,你不会错过任何东西,是的,你需要添加某种forms的终止消息。

这里有两个基本问题:

  1. TCP协议是面向stream的,而不是面向消息的; 它对于什么可能构成“信息”没有内在的知识。

  2. 由node.jsnetworking库触发的数据事件表明有一些数据已经到达,但不知道消息可能包含什么,也不能表明它已经收到任何特定的数据。

因此,通过发送消息的速度比Node能够处理更快,套接字recv缓冲区将填充多个“消息”。

这个问题的典型解决scheme是添加行终止,可以在https://github.com/baudehlo/Haraka/blob/master/connection.js的32-34行find:

 self.client.on('data', function (data) { self.process_data(data); }); 

和行110-134:

 Connection.prototype.process_data = function (data) { if (this.disconnected) { logger.logwarn("data after disconnect from " + this.remote_ip); return; } this.current_data += data; this._process_data(); }; Connection.prototype._process_data = function() { var results; while (results = line_regexp.exec(this.current_data)) { var this_line = results[1]; if (this.state === 'pause') { this.early_talker = 1; var self = this; // If you talk early, we're going to give you a delay setTimeout(function() { self._process_data() }, this.early_talker_delay); break; } this.current_data = this.current_data.slice(this_line.length); this.process_line(this_line); } }; 

您需要累积传入的缓冲区数据以获取完整的消息。 请参考下面的例子。 这个服务器需要4字节的头和消息体的数据。 头是无符号整数,这意味着身体的总长度和正文是带有分隔符“|”的string数据。 请注意,这个“标题和消息”可能一次没有被收到。 所以我们需要累积传入的数据,直到获得完整的数据。 还有可能一次收到多个“标题和消息”。 重点是我们需要数据积累。

 var SERVER_PORT = 8124; var TCP_DELIMITER = '|'; var packetHeaderLen = 4; // 32 bit integer --> 4 var server = net.createServer( function(c) { var accumulatingBuffer = new Buffer(0); var totalPacketLen = -1; var accumulatingLen = 0; var recvedThisTimeLen= 0; var remoteAddress = c.remoteAddress; var address= c.address(); var remotePort= c.remotePort; var remoteIpPort = remoteAddress +":"+ remotePort; console.log('-------------------------------'+remoteAddress); console.log('remoteIpPort='+ remoteIpPort); c.on('data', function(data) { console.log('received data length :' + data.length ); console.log('data='+ data); recvedThisTimeLen = data.length; console.log('recvedThisTimeLen='+ recvedThisTimeLen); //accumulate incoming data var tmpBuffer = new Buffer( accumulatingLen + recvedThisTimeLen ); accumulatingBuffer.copy(tmpBuffer); data.copy ( tmpBuffer, accumulatingLen ); // offset for accumulating accumulatingBuffer = tmpBuffer; tmpBuffer = null; accumulatingLen += recvedThisTimeLen ; console.log('accumulatingBuffer = ' + accumulatingBuffer ); console.log('accumulatingLen =' + accumulatingLen ); if( recvedThisTimeLen < packetHeaderLen ) { console.log('need to get more data(less than header-length received) -> wait..'); return; } else if( recvedThisTimeLen == packetHeaderLen ) { console.log('need to get more data(only header-info is available) -> wait..'); return; } else { console.log('before-totalPacketLen=' + totalPacketLen ); //a packet info is available.. if( totalPacketLen < 0 ) { totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; console.log('totalPacketLen=' + totalPacketLen ); } } //while=> //in case of the accumulatingBuffer has multiple 'header and message'. while( accumulatingLen >= totalPacketLen + packetHeaderLen ) { console.log( 'accumulatingBuffer= ' + accumulatingBuffer ); var aPacketBufExceptHeader = new Buffer( totalPacketLen ); // a whole packet is available... console.log( 'aPacketBufExceptHeader len= ' + aPacketBufExceptHeader.length ); accumulatingBuffer.copy( aPacketBufExceptHeader, 0, packetHeaderLen, accumulatingBuffer.length); // //////////////////////////////////////////////////////////////////// //process one packet data var stringData = aPacketBufExceptHeader.toString(); var usage = stringData.substring(0,stringData.indexOf(TCP_DELIMITER)); console.log("usage: " + usage); //call handler (serverFunctions [usage])(c, remoteIpPort, stringData.substring(1+stringData.indexOf(TCP_DELIMITER))); //////////////////////////////////////////////////////////////////// //rebuild buffer var newBufRebuild = new Buffer( accumulatingBuffer.length ); newBufRebuild.fill(); accumulatingBuffer.copy( newBufRebuild, 0, totalPacketLen + packetHeaderLen, accumulatingBuffer.length ); //init accumulatingLen -= (totalPacketLen +4) ; accumulatingBuffer = newBufRebuild; newBufRebuild = null; totalPacketLen = -1; console.log( 'Init: accumulatingBuffer= ' + accumulatingBuffer ); console.log( ' accumulatingLen = ' + accumulatingLen ); if( accumulatingLen <= packetHeaderLen ) { return; } else { totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; console.log('totalPacketLen=' + totalPacketLen ); } } }); ... }); 

整个例子请参考下面。

https://github.com/jeremyko/nodeChatServer

希望这个帮助。