Socket.io 1.0.6,nodejs CPU超过100%

我正在创建一个Web服务器来管理对实时数据的订阅。 这些数据通过TCP流式传输到nodejs,然后再流式传输给使用Socket.io通过websockets订阅特定主题的客户端。

这是工作流程。

客户端发送特定主题的订阅请求。 Nodejs应用程序将这个订阅代理到Java应用程序,后者随后开始通过TCP将数据推送到nodejs。 Java应用程序会建立9个不同的套接字连接,每个可订阅的新闻类型(类别)对应一个套接字。

值得一提的是,这些数据可能非常庞大:客户端首次订阅时会收到历史数据(高达45MB),之后的每次推送只包含一个小的增量。

我遇到的问题是,在初始推送期间nodejs应用程序的CPU使用率超过了100%。 起初,我认为问题出在解析发送给我的数据上。 经过几天的调试,我发现注释掉socket.emit(data.topic, data)之后CPU会下降到1%-3%。 这使我相信,我遇到的问题与socket.io有关。

我在这里误用了socket.io吗? 有什么替代方案可以推送不同的主题/频道? 我的代码有什么问题吗?

感谢您的时间!

 //needed for REST subscribe services
var app = require('express')()
    //needed for HTTP server
  , server = require('http').createServer(app)
    //WebSocket library (TODO see how to implement this)
    //, sockjs = require('sockjs')
    //backend subscriptions
  , subUtil = require('./subscriptions/subscriptionManager')
  , bodyParser = require('body-parser')
    //Using Socket.io for now
  , io = require('socket.io').listen(server)
  , config = require('./settings/config')
  , tcpServer = require('./servers/tcpServer')
    //logger
  , logger = require('./logging/logger');

//Parser used for parsing JSON out of HTTP body
app.use(bodyParser.json());

//REST endpoints: each one hands the request to the subscription manager.
app.post('/subscribe/worldnews', subUtil.subWorldNews);
//NOTE(review): `subSoirtNews` looks like a typo for `subSportNews`; the
//subscription manager is not visible from here, so the name is left as-is.
app.post('/subscribe/sportnews', subUtil.subSoirtNews);
app.post('/subscribe/sciencenews', subUtil.subScienceNews);
app.post('/subscribe/townnews', subUtil.subTownNews);
app.post('/subscribe/countrynews', subUtil.subCountryNews);
app.post('/subscribe/regionnews', subUtil.subRegionNews);
app.post('/subscribe/weathernews', subUtil.subWeatherNews);
app.post('/subscribe/hotnews', subUtil.subHotNews);
app.post('/subscribe/relatednews', subUtil.subRelatedNews);
app.post('/unsubscribe', subUtil.unsubscribe);

//Error handler. Express only routes an error to error-handling middleware
//registered AFTER the middleware/route that raised it, so this must come
//after the routes above to see their errors.
app.use(function(err, req, res, next) {
    logger.log.error(err.stack);
    //res.send(status, body) is deprecated; set the status explicitly.
    res.status(500).send('Something broke!');
});

//Start tcp server
tcpServer.startServer();

//Set max number of listeners, 100 users can connect
//(every connected socket adds one 'incoming' listener below)
tcpServer.setMaxListeners(100);

//Create Socket.IO connection
io.sockets.on('connection', function (socket) {
    //Token identifying this client's subscriptions. The original code used
    //an undefined `token` variable in the disconnect handler, which throws
    //a ReferenceError on every disconnect.
    //NOTE(review): assumes the client passes its token as a `token` query
    //parameter in the Socket.io handshake - TODO confirm against the client.
    var handshakeQuery = (socket.handshake && socket.handshake.query) || {};
    var token = handshakeQuery.token;

    //handling incoming backend data
    //NOTE(review): every connected socket receives every backend message,
    //whatever its topic; the payload is emitted once per connected socket.
    var incomingHandler = function(data) {
        //send data to the user
        socket.emit(data.topic, data);
    };

    //Data is coming from backend
    tcpServer.on('incoming', incomingHandler);

    //Client has disconnected
    socket.on('disconnect', function () {
        logger.log.info('User has disconnected');

        //remove listener so the closed socket no longer gets backend data
        tcpServer.removeListener('incoming', incomingHandler);

        if (token === undefined || token === null) {
            logger.log.info('No token known for disconnected user; skipping unsubscribeAll');
            return;
        }

        //unsubscribe all topics with this token
        subUtil.unsubscribeAll(token);
    });
});

//HTTP server listening
server.listen(config.httpPort);
logger.log.info('TCP connection on port ' + config.tcpPort + '; HTTP connection on the port ' + config.httpPort);

/ ************* 以下是TCP服务器模块 ************************* /

 //import utils because it is easier to do inheritance
var util = require('util')
    //import core events
  , EventEmitter = require('events').EventEmitter
  , config = require('../settings/config')
    //needed for TCP connection
  , net = require('net')
  , JsonSocket = require('json-socket')
  , logger = require('../logging/logger');

/**
 * TCP server that receives length-prefixed JSON messages from the backend
 * and re-emits each decoded message as an 'incoming' event.
 *
 * Wire format, as parsed below: `<decimal length>#<JSON text>`, where the
 * length is compared against the length of the decoded string.
 */
var TcpServer = function() {
    EventEmitter.call(this);
};

//inherit from events
util.inherits(TcpServer, EventEmitter);

/**
 * Creates the TCP server and starts listening on `config.tcpPort`.
 *
 * @param {*} incomingData - unused; kept so existing callers keep working.
 */
TcpServer.prototype.startServer = function(incomingData) {
    //keep a reference to this emitter for use inside the socket callbacks
    var self = this;

    var tcpServer = net.createServer(function(sock) {
        logger.log.info('backend connected');

        //Per-connection framing state.
        var contentLength = null; //length of the message being read, or null while reading the header
        var buffer = '';          //received text not yet consumed

        //Decode in the stream so a multi-byte UTF-8 character split across
        //two TCP chunks is not corrupted (per-chunk toString() breaks it).
        sock.setEncoding('utf8');

        //Handle socket error
        sock.on('error', function(err) {
            logger.log.error('There was a socket error :( \r\n' + err.stack);
        });

        sock.on('data', function(chunk) {
            logger.log.info('Incoming data');
            buffer += chunk;
            drainBuffer();
        });

        //Parses one complete payload and emits it; failures are logged so
        //that one bad message does not stop the rest from being processed.
        function dispatch(payload) {
            var message;
            try {
                message = JSON.parse(payload);
            } catch (e) {
                var err = new Error('Could not parse JSON: ' + e.message + '\nRequest data: ' + payload);
                err.code = 'E_INVALID_JSON';
                logger.log.error(err);
                return;
            }

            try {
                self.emit('incoming', message || {});
            } catch (e) {
                //an 'incoming' listener threw; log it and keep the connection alive
                logger.log.error(e);
            }
        }

        //Consumes as many complete messages as the buffer holds. Iterative
        //on purpose: many small messages in one chunk must not grow the stack.
        function drainBuffer() {
            while (true) {
                if (contentLength === null) {
                    var separatorIndex = buffer.indexOf('#');
                    //No '#': the buffer may end in the middle of the length header.
                    if (separatorIndex === -1) {
                        return;
                    }

                    var rawContentLength = buffer.substring(0, separatorIndex);
                    var parsedLength = parseInt(rawContentLength, 10);
                    if (isNaN(parsedLength) || parsedLength < 0) {
                        //Capture the buffer BEFORE clearing it so the log shows the bad data.
                        var rejected = buffer;
                        buffer = '';
                        var lengthErr = new Error('Invalid content length supplied (' + rawContentLength + ') in: ' + rejected);
                        lengthErr.code = 'E_INVALID_CONTENT_LENGTH';
                        logger.log.error(lengthErr);
                        return;
                    }

                    contentLength = parsedLength;
                    buffer = buffer.substring(separatorIndex + 1);
                }

                //Message body is still incomplete; wait for more data.
                if (buffer.length < contentLength) {
                    return;
                }

                var payload = buffer.substring(0, contentLength);
                buffer = buffer.substring(contentLength);
                contentLength = null;
                dispatch(payload);
            }
        }
    });

    //Handle server error
    tcpServer.on('error', function(err) {
        logger.log.error('There was a connection err \r\n' + err.stack);
    });

    //listen for backend data on port
    tcpServer.listen(config.tcpPort);
};

module.exports = new TcpServer();