node.js套接字exception读取ETIMEDOUT – 如何正确捕获它? 怎么写超时?

我有一个代理服务器,pipe理一堆客户端,并与另一个http服务器通话。 消息被来回发送并导向给客户。 客户端可以做超时,服务器有一个心跳function(每n秒重复一次),发送心跳到clientId映射到套接字连接的所有客户端。

当心跳尝试与不再连接的客户端进行通信,但其套接字仍处于活动状态时,会收到“读取ETIMEDOUT”exception。 我试着暂时将套接字连接的超时设置为2000ms,理论上我的套接字事件处理程序会超时(事件处理程序在tcp服务器部分),但是这并没有发生。 它需要几次心跳死亡。

部分问题肯定是我对如何构造node.js代码缺乏了解,所以如果您有任何build议,我会非常感激他们。

另一个问题是,是否可以单独处理读写超时,或者至less将其分开。 我真正想做的是让我的心跳function成为tcp服务器的一部分,只发送一个心跳(如果在n秒内没有听到客户端的声音),只发送一次心跳。 如果我们超时,那么我们杀死socket,否则我们再等一等。

谢谢!

>>$ node --harmony-weakmaps server.js Heartbeat: Sat Feb 18 2012 08:34:40 GMT+0000 (UTC) { sending keep_alive to id:00:00:00:00:00:10 socket:[object Object] } socket:received data: {"id":"00:00:00:00:00:10","m":"keep_alive","success":"true"} Heartbeat: Sat Feb 18 2012 08:35:40 GMT+0000 (UTC) { sending keep_alive to id:00:00:00:00:00:10 socket:[object Object] } socket:received data: {"id":"00:00:00:00:00:10","m":"keep_alive","success":"true"} node.js:201 throw e; // process.nextTick error, or 'error' event on first tick ^ Error: read ETIMEDOUT at errnoException (net.js:642:11) at TCP.onread (net.js:375:20) 

触发超时的心跳function:

 console.log("Starting heartbeat"); var beat_period = 60; setInterval(function() { if(Object.keys(id2socket).length != 0){ console.log("Heartbeat: " + new Date()); //for (var key in id2socket) { // console.log("\t"+key+"->"+id2socket[key]); //} console.log("{"); for(var id in id2socket) { var socket = id2socket[id]; // don't want sockets to time out socket.setTimeout(2000); // for heartbeat, set the timeout try { console.log("\tsending keep_alive to id:"+id+" socket:"+id2socket[id]); socket.write('{"m":"keep_alive"}\r\n'); } catch(Error) { console.log("Heartbeat:Cannot find id:"+id); removeSocketFromMap(id,socket); // TODO: send message to API } socket.setTimeout(0); // no timeout } console.log("}"); } }, beat_period * 1000); 

server.js:

 // Launch Instructions // node --harmony-weakmaps server.js var net = require('net'); // tcp-server var http = require("http"); // http-server var querystring = require('querystring'); // Map of sockets to clients var id2socket = new Object; var socket2id = new WeakMap; // allows us to use object as key to hash // Test for client: // {"id":"123","m":"add"} // establishes connection and puts client into id2socket map // {"id":"123","m":"test"} // sends a message through to API // HTTP:POST outbound function // http://www.theroamingcoder.com/node/111 function postOut(dataToPost){ try{ console.log("postOut msg:"+JSON.stringify(dataToPost)); } catch (Error) { console.log("postOut error:"+Error); } var post_domain = '127.0.0.1'; var post_port = 80; var post_path = '/cgi-bin/index3.py'; var post_data = querystring.stringify({ 'act' : 'testjson', 'json' : JSON.stringify(dataToPost) }); console.log("post_data:"+post_data); var post_options = { host: post_domain, port: post_port, path: post_path, method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', 'Content-Length': post_data.length } }; var post_req = http.request(post_options, function(res) { res.setEncoding('utf8'); res.on('data', function (chunk) { console.log('Response:data: ' + chunk); }); }); // Handle various issues post_req.on('error', function(error) { console.log('ERROR' + error.message); // If you need to go on even if there is an error add below line //getSomething(i + 1); }); post_req.on("response", function (response) { console.log("Response:response:"+response); }); // write parameters to post body post_req.write(post_data); post_req.end(); } function removeSocketFromMap(id,socket){ console.log("removeSocketFromMap socket:"+socket+" id:"+id); delete id2socket[id]; socket2id.delete(socket); //TODO: print map??? console.log("socketmap {"); for (var key in id2socket) { console.log("\t"+key+"->"+id2socket[key]); } console.log("}"); } // Setup a tcp server var server_plug = net.createServer( function(socket) { // Event handlers socket.addListener("connect", function(conn) { console.log("socket:connection from: " + socket.remoteAddress + ":" + socket.remotePort + " id:"+socket.id ); }); socket.addListener("data", function(data) { console.log("socket:received data: " + data); var request = null; try { request = JSON.parse(data); } catch (SyntaxError) { console.log('Invalid JSON:' + data); socket.write('{"success":"false","response":"invalid JSON"}\r\n'); } if(request!=null){ response = request; // set up the response we send back to the client if(request.m=="keep_alive"){ // HACK for keep alive // Do nothing } else if(request.m !== undefined && request['id'] !== undefined){ // hack on 'id', id is js obj property if(request.m == 'connect_device' || request.m == 'add'){ console.log("associating uid " + request['id'] + " with socket " + socket); id2socket[request['id']] = socket; socket2id.set(socket, request['id']); } postOut(request); socket.write(JSON.stringify(response)+"\r\n"); } else if(request['id'] !== undefined){ postOut(request); socket.write(JSON.stringify(response)+"\r\n"); } else { response['content'] = "JSON doesn't contain m or id params"; socket.write(JSON.stringify(response)+"\r\n"); } } else { console.log("null request"); } }); socket.on('end', function() { id = socket2id.get(socket); console.log("socket:disconnect by id " + id); removeSocketFromMap(id,socket); socket.destroy(); }); socket.on('timeout', function() { id = socket2id.get(socket); console.log('socket:timeout by id ' + id); removeSocketFromMap(id,socket); socket.destroy(); }); // handle uncaught exceptions socket.on('uncaughtException', function(err) { id = socket2id.get(socket); console.log('socket:uncaughtException by id ' + id); removeSocketFromMap(id,socket); socket.destroy(); }); } ); server_plug.on('error', function (error) { console.log('server_plug:Error: ' + error); }); // Setup http server var server_http = http.createServer( // Function to handle http:post requests, need two parts to it // http://jnjnjn.com/113/node-js-for-noobs-grabbing-post-content/ function onRequest(request, response) { request.setEncoding("utf8"); request.content = ''; request.on('error', function(err){ console.log("server_http:error: "+err); }) request.addListener("data", function(chunk) { request.content += chunk; }); request.addListener("end", function() { console.log("server_http:request_received"); try { var json = querystring.parse(request.content); console.log("server_http:received_post {"); for(var foo in json){ console.log("\t"+foo+"->"+json[foo]); } console.log("}"); // Send json message content to socket if(json['json']!=null && json['id']!=null){ id = json['id']; try { var socket = id2socket[id]; socket.write(json['json']+"\r\n"); } catch (Error) { console.log("Cannot find socket with id "+id); } finally { // respond to the incoming http request response.end(); // TODO: This should really be in socket.read! } } } catch(Error) { console.log("JSON parse error: "+Error) } }); request.on('end', function () { console.log("http_request:end"); }); request.on('close', function () { console.log("http_request:close"); }); } ); server_http.on('error', function (error) { console.log('server_http:Error: ' + error); }); // Heartbeat function console.log("Starting heartbeat"); var beat_period = 60; setInterval(function() { if(Object.keys(id2socket).length != 0){ console.log("Heartbeat: " + new Date()); //for (var key in id2socket) { // console.log("\t"+key+"->"+id2socket[key]); //} console.log("{"); for(var id in id2socket) { var socket = id2socket[id]; // don't want sockets to time out socket.setTimeout(2000); // for heartbeat, set the timeout try { console.log("\tsending keep_alive to id:"+id+" socket:"+id2socket[id]); socket.write('{"m":"keep_alive"}\r\n'); } catch(Error) { console.log("Heartbeat:Cannot find id:"+id); removeSocketFromMap(id,socket); // TODO: send message to API } socket.setTimeout(0); // no timeout } console.log("}"); } }, beat_period * 1000); // Fire up the servers //var HOST = '127.0.0.1'; // just local incoming connections var HOST = '0.0.0.0'; // allows access to all external IPs var PORT = 5280; var PORT2 = 9001; // accept tcp-ip connections server_plug.listen(PORT, HOST); console.log("TCP server listening on "+HOST+":"+PORT); // accept posts server_http.listen(PORT2); console.log("HTTP server listening on "+HOST+":"+PORT2); 

编辑:

我应该使用.on(event,callback)vs .onlistener(event,callback)吗?

更新:

这不起作用,我把tcp_server中的东西改成了心跳中的所有add_listener。 仍然没有抓住错误,吹了,说我加了太多的听众。

首先,如果你的结构没有对代码的上下文有更多的理解,那么它有点难说。

尝试添加

 socket.on('error', function() { id = socket2id.get(socket); console.log('socket:timeout by id ' + id); removeSocketFromMap(id,socket); socket.destroy(); } 

到net.CreateServer中的匿名函数。 ETIMEDOUT是一个系统调用错误,而node.js只是报告它。 这可能不是由一个典型的“超时”造成的。 你说它是由Hearbeat写的引起的,但是看起来像TCP.read是起源。 它可能是一个半封闭的sockets。

对于ETIMEDOUTexception问题,您是否尝试在stream程本身上监听uncaughtException?

 process.on('uncaughtException', function (err) { console.log('Caught exception: ' + err); }); 

请参阅这里的文档: http : //nodejs.org/docs/latest/api/process.html#event_uncaughtException_