node.js只强制一个线程执行代码

当我启动我的应用程序node app.js ,运行的进程只有1个线程。 不过运行的时间越长,进程的线程就越多。 问题是,当我想执行这样的特定types的代码:

 var io = require('socket.io')(process.env.PORT); 

它失败了,因为信号是从多个线程发送的,因此代码没有成功执行。

简单的testing,如果这样做:

 var io = require('socket.io')(9001); var io = require('socket.io')(9002); var io = require('socket.io')(9003); var io = require('socket.io')(9004); 

它工作正常,但是这个代码:

 var cPort = 9001; setInterval(function() { var io = require('socket.io')(cPort); cPort++; }, 1000 * 60 * 2); // 1 sec * 60 seconds * 2 = 2 minutes interval 

不会被执行,因为2分钟后节点将有许multithreading,他们都试图执行代码 – 因此,你会看到error: address in use

所以,尽pipe运行同一个文件的multithreading进程,我如何强制节点执行此代码只有一次?

06.11.2017编辑—-

澄清问题:

我的意思是在这个问题上,我没有资源的问题,如果我一次启动所有的服务器(例如40个服务器),他们都成功启动,并无限期地工作。 如果我只启动一个服务器,然后运行需要时自动启动的代码,则会发生问题。 那时我总是看到address in use错误,显然地址在代码执行的时候没有使用。 目前我必须在周末手工启动更多的服务器,当有更多的人在一周中的其他几天使用服务和更less的服务器,我想创build一个自动化的系统,启动和closures服务器的人口。

这是服务器的代码开始:

 var cp = require('child_process'), servers = [], per_server = config.per_server, check_servers = function(callback) { for(var i = 0; i < servers.length; i++) { callback(i, servers[i]); } }; this.add_server = function(port) { var server = { port: port, load: 0, process: cp.fork(__dirname + '/../server_instance.js', [], { env: { port: port } }) }; server.process.on('message', function(message) { server.load = message.load; }); servers.push(server); }; this.find_server = function() { var min = Infinity, port = false; check_servers(function(index, details) { if(details.load < min) { min = details.load; port = details.port; } }); return port; }; 

现在,如果我执行controller.add_server() 40行,它将正确启动40个服务器,但如果我这样做:

 var start_port = 3185; setInterval(function() { var min = Infinity; check_servers(function(index, details) { if(details.load < min) { min = details.load; } }); if(min > config.per_server) { controller.add_server(start_port); start_port++; } }, 5000); 

我在第二,第三或第四个服务器创build地址已被使用,随机出现错误。

07.11.2017编辑—-

正如build议我尝试以下库进行端口扫描/发现者:

  • portfinder
  • 端口扫描工具
  • 扫描端口

只有使用第一个我能够启动至less2个服务器,这是我使用的代码:

 setInterval(function() { var min = Infinity; check_servers(function(index, details) { if(details.load < min) { min = details.load; } }); if(min > per_server) { _self.add_server(); } }, 5000); var portfinder = require('portfinder'); portfinder.basePort = 3185; this.add_server = function() { portfinder.getPortPromise() .then((port) => { console.log('port found', port); var server = { port: port, load: 0, process: cp.fork(__dirname + '/../server_instance.js', [], { env: { port: port } }) }; server.process.on('message', function(message) { server.load = message.load; }); servers.push(server); }) .catch((err) => { console.log('error happened'); }); }; 

经过多次testing,看起来像我可以启动2台服务器,然后随机,第三次或第四次尝试崩溃。 其清楚的问题是,随着端口的查找,这个库只告诉我,我知道什么端口打开,然后我再仔细检查,脚本将尝试启动服务器手动netstat -anp | grep PORT netstat -anp | grep PORT命令。

所以清楚的是,问题并不在于find打开的端口,从结果来看,它看起来像节点试图从单个命令多次启动服务器。

跟进编辑—-

添加server_instance.js代码:

 var io = require('socket.io')(process.env.port), connections_current = 0, connections_made = 0, connections_dropped = 0; io.on('connection', function(socket) { connections_current++; connections_made++; // ... service logic here, not relevant (like query db, send data to users etc) socket.on('disconnect', function() { connections_current--; connections_dropped++; }); }); setInterval(function() { process.send({ load: connections_current }); }, 5000); 

08.11.2017编辑—-

我正在testing许多解决scheme来解决这个问题,我观察到了这种情况:

  • 在mac osx上,我可以产生最多3000个连接到服务器的本地testing。 错误没有发生,节点有1 process6 threads的路由器文件。 有了3000个连接,我甚至可以生成200台服务器。

  • 在Linux上的服务器testingDebian,我生成2 mln连接到服务器。 错误总是发生在第3或第4个服务器实例上,当我连接所有的人员节点时,对于路由器文件的10 threads for every process都有6 processes10 threads for every process

这显然是问题的根源,我拥有的能力越多,节点产生的进程越多,尝试启动新服务器时越早会重叠。

最好的解决scheme是在主进程中生成端口号,然后将它们传递给工作进程,以便不相交。

你也可以检查端口是否在使用中,并使用npm模块(如test-port-provider)来获得空闲端口。

您可以使用portfinder软件包来发现系统中可用的networking端口(它开始从端口8000发现)。 用法很简单,如下所示:

 const http = require('http'); const portfinder = require('portfinder'); const pid = process.pid; portfinder.getPort((err, port) => { if (err) throw err; http.createServer((req, res) => { res.end(`Response from server ${pid}.\n`); }).listen(port, () => { console.log(`Server ${pid} running on port ${port}...`); }); }); 

** 编辑 **

似乎相同的端口是从portfinder返回多次,因此EADDRINUSE错误被抛出。 我的怀疑是,当portfinder试图find一个新的端口(因此返回相同的端口)时,端口还没有听,但这似乎与一个简单的for循环启动多个服务器似乎工作正常的事实相矛盾:

 for (let i = 0; i < max_number_of_servers; ++i) { this.add_server(); } 

对代码的简单修正可能是每次调用add_server时增加portfinder的基地址:

 portfinder.basePort = 8000; this.add_server = function() { portfinder.getPortPromise() .then((port) => { portfinder.basePort += 1; var server = { port: port, load: 0, process: cp.fork('server_instance.js', [], { env: { port: port } }) }; server.process.on('message', function(message) { server.load = message.load; console.log("message"); }); servers.push(server); }) .catch((err) => { console.log(err); }); }; 

这段代码似乎工作正常,至less在我的机器上。
无论如何,我build议你考虑一个不同的实现。 如果您发现在最高stream量情况下需要N台服务器正确处理所有请求,则不需要创build较less数量的服务器,然后根据当前stream量dynamic更改,原因如下:

  • 一个新的过程是一个昂贵的操作,可能需要一段时间才能启动和运行。
  • 在高stream量的情况下,您的所有服务器已经准备好,无需额外延迟地为请求提供服务
  • 在低/中等stream量的情况下,您的服务器将不会超负荷,但您在恢复能力和可用性方面会有所收益(如果服务器进程崩溃,无论出于何种原因,还有许多其他服务器可能会提供请求,新的服务器进程需要一些时间)。

您可以使用本地群集模块轻松构build具有自动负载平衡和容错function的stream程分布式服务器应用程序。 默认情况下,clusteer模块执行循环algorithm来在工作人员之间分配传入请求,从而免费获得负载平衡!
一个可能的简单实现(仅用于testing,我已经使用了不同的端口查找程序包 ):

 // main.js const cluster = require('cluster'); const getPort = require('get-port'); const max_servers = 40; // master process if (cluster.isMaster) { for (let i = 0; i < max_servers; ++i) { getPort().then(port => { cluster.fork({port: port}); }) } // detect exit event on workers cluster.on("exit", (worker, errCode) => { console.log(worker); // start new worker in case of crashes if (errCode != 0 && !worker.suicide) { console.log("Worker-server crashed. Starting new worker..."); getPort().then(port => { cluster.fork({port: port}); }) } }); } // worker process --> start server else { require('./server_instance.js'); // [2] } 
 // server_instance.js const http = require("http"); const pid = process.pid; let port = process.env.port; console.log(`Starting server on process ${pid} running on port ${port}...`); let io = require('socket.io')(process.env.port), connections_current = 0, connections_made = 0, connections_dropped = 0; io.on('connection', function(socket) { console.log(`Socket.io on process ${pid} running on port ${port}...`); connections_current++; connections_made++; // ... service logic here, not relevant (like query db, send data to users etc) socket.on('disconnect', function() { connections_current--; connections_dropped++; }); });