在节点模块中使用群集

更新:即使这种情况是不现实的,根据意见,我仍然感兴趣的是如何可以编写一个模块,利用集群,而不是每次重新运行父进程。


我正在尝试编写一个名为mass-request的Node.js模块,通过将其分发到subprocess来加速大量的HTTP请求。

我的希望是,在外面,它是这样工作的。

 var mr = require("mass-request"), scraper = mr(); for (var i = 0; i < my_urls_to_visit.length; i += 1) { scraper.add(my_urls_to_visit[i], function(resp) { // do something with response } } 

为了开始,我为群发请求模块添加了一个框架。

 var cluster = require("cluster"), numCPUs = require("os").cpus().length; module.exports = function() { console.log("hello from mass-request!"); if (cluster.isMaster) { for (var i = 0; i < numCPUs; i += 1) { var worker = cluster.fork(); } return { add: function(url, cb) {} } } else { console.log("worker " + process.pid + " is born!"); } } 

然后我在testing脚本中testing它:

 var m = mr(); console.log("hello from test.js!", m); 

我希望看到“来自群众的请求! logging了四次(的确如此)。 令我惊奇的是,我还看到了四次来自test.js的“hello”。 显然我不明白cluster.fork()是如何工作的。 是否重新运行整个过程,而不仅仅是第一次调用它的function呢?

如果是这样的话,那么如何在一个模块中利用集群而不会使用那个混乱的多进程逻辑来使用那个模块呢?

我相信你要找的是setupMaster

从文档:

cluster.setupMaster([设置])

  • 设置对象
    • exec工作文件的string文件path。 (缺省值= process.argv [1])
    • args传递给worker的数组string参数。 (默认值= process.argv.slice(2))
    • silent布尔值是否将输出发送到父级的stdio。 (默认=假)

setupMaster用来改变默认的'fork'行为。 一旦被调用,设置将出现在cluster.settings中

通过使用exec属性,您可以让您的工作人员从不同的模块启动。

重要提示:正如文档所述,这只能被调用一次。 如果你依赖于你的模块的这种行为,那么调用者不能使用cluster或整个事情崩溃。

例如:

index.js

 var cluster = require("cluster"), path = require("path"), numCPUs = require("os").cpus().length; console.log("hello from mass-request!"); if (cluster.isMaster) { cluster.setupMaster({ exec: path.join(__dirname, 'worker.js') }); for (var i = 0; i < numCPUs; i += 1) { var worker = cluster.fork(); } return { add: function (url, cb) { } } } else { console.log("worker " + process.pid + " is born!"); } 

worker.js

 console.log("worker " + process.pid + " is born!"); 

产量

 node index.js hello from mass-request! worker 38821 is born! worker 38820 is born! worker 38822 is born! worker 38819 is born! 

虽然node.js的asynchronous特性确实令人敬畏,但它仍然在单个事件循环中的服务器上的单个线程中运行。 使用集群multithreading处理node.js应用程序,可以将应用程序的subprocess分离到各自的线程中,从而使您能够更好地使用多核服务器。 我已经build立了一个游戏服务器体系结构,它使用了cluster和zmq(ZeroMQ)来实现multithreading,并使这些进程能够轻松地通过各种渠道来回发送消息。 我已经将这个架构简化为下面的例子,希望能够帮助说明如何将multithreading的node.js放在一起。 我道歉,如果有点粗糙,那是几年前,当时我是相对较新的节点;)

理想情况下,你不想在一个脚本中嵌套主/子的所有东西,但我想这是让你复制/粘贴/运行的最简单的方法:)

就像你在你的评论中提到的那样,我给出了一个很好的集群例子,但是没有一个适合你的具体用例,就像派发所有东西一样。 我没有太多的时间,所以我调整了我的例子,以便能够很快地满足您的需求。 给这个镜头:

大众request.js

 var cluster = require('cluster'); var zmq = require('zmq'); module.exports = { _childId : null, _urls : [], _threadCount : 1, _readyThreads : 0, _callbacks : {}, zmqReceive : null, //the socket we receive on for this thread zmqMaster : null, //the socket to the master zmqChildren : {}, //an object storing the sockets for the children setThreads : function( threadCount ) { this._threadCount = threadCount; }, add : function( url , cb ) { this._urls.push( {url: url, cb : cb } ); }, run : function() { if( cluster.isMaster ) { this._masterThread(); } else { this._childThread(); } }, _masterThread : function() { console.log( 'Master Process Starting Up' ); this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://master.ipc' ); //bind handler for messages coming into this process using closure to allow us to access the massrequest object inside the callback ( function( massRequest ) { this.zmqReceive.on( 'message' , function( msg ) { msg = JSON.parse(msg); //was this an online notification? if( msg && msg.status == 'Online' ) { massRequest._threadReady(); return; //we're done } if( msg && msg.html ) { //this was a response from a child, call the callback for it massRequest._callbacks[ msg.sender ].call( massRequest , msg.html ); //send the child another URL massRequest._sendUrlToChild( msg.sender ); } } ); }).call( this , this ); //fork 4 child processes and set up the sending sockets for them for( var i=0; i < this._threadCount; ++i ) { //set up the sending socket this.zmqChildren[i] = zmq.socket('push').connect( 'ipc://child_' + i + '.ipc' ); //fork the process and pass it an id cluster.fork( { _childId:i } ); } }, _sendUrlToChild : function( child ) { //if there's no urls left, return (this would also be a good place to send a message to the child to exit gracefully) if( !this._urls.length ) return; //grab a url to process var item = this._urls.pop(); //set the callback for the child this._callbacks[child] = item.cb; this.zmqChildren[child].send( JSON.stringify( { url:item.url } ) ); }, _processUrls : function() { for( var i=0; i < this._threadCount; ++i ) { this._sendUrlToChild( i ); } }, _threadReady : function() { if( ++this._readyThreads >= this._threadCount ) { //all threads are ready, send out urls to start the mayhem console.log( 'All threads online, starting URL processing' ); this._processUrls(); } }, _childProcessUrl : function( url ) { console.log( 'Child Process ' + this.childId + ' Handling URL: ' + url ); //do something here to scrape your content however you see fit var html = 'HTML'; this.zmqMaster.send( JSON.stringify( { sender:this.childId, html:html } ) ); }, _childThread : function() { //get the child id that was passed from cluster this.childId = process.env._childId; console.log( 'Child Process ' + this.childId + ' Starting Up' ); //bind the pull socket to receive messages to this process this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://child_' + this.childId + '.ipc' ); //bind the push socket to send to the master this.zmqMaster = zmq.socket('push').connect('ipc://master.ipc'); //bind handler for messages coming into this process ( function( massRequest ) { this.zmqReceive.on( 'message' , function( msg ) { msg = JSON.parse(msg); console.log( 'Child ' + this.childId + ': ' + msg ); //handle the url if( msg && msg.url ) massRequest._childProcessUrl( msg.url ); } ); }).call( this , this ); //let the master know we're done setting up this.zmqMaster.send( JSON.stringify({sender:this.childId,status:'Online'}) ); }, } 

demo.js

 var mr = require( './mass-request.js' ); mr.setThreads( 4 ); mr.add( 'http://foo.com' , function( resp ) { console.log( 'http://foo.com is done' ); } ); mr.add( 'http://bar.com' , function( resp ) { console.log( 'http://bar.com is done' ); } ); mr.add( 'http://alpha.com' , function( resp ) { console.log( 'http://alpha.com is done' ); } ); mr.add( 'http://beta.com' , function( resp ) { console.log( 'http://beta.com is done' ); } ); mr.add( 'http://theta.com' , function( resp ) { console.log( 'http://theta.com is done' ); } ); mr.add( 'http://apples.com' , function( resp ) { console.log( 'http://apples.com is done' ); } ); mr.add( 'http://oranges.com' , function( resp ) { console.log( 'http://oranges.com is done' ); } ); mr.run(); 

把它们放在同一个文件夹中,运行node demo.js

我也应该指出,由于这个基础是从我使用[0MQ] [ http://zeromq.org/%5D的其他项目中抽取的,所以需要将其与%5Bnode.js模块一起安装%5D [ https://github.com/JustinTulloss/zeromq.node%5D npm install zmq ,显然是集群模块。 当然,您可以将ZMQ部件replace为任何其他您需要的进程间通信方法。 这恰好是我熟悉和使用的一个。

简要概述:主线程AKA调用run()方法的脚本将启动X个子项(可通过调用setThreads来设置)。 这些孩子在完成初始化时通过ZeroMQ套接字向主线程报告。 一旦所有的线程都准备好了,主脚本会把这些url分派给这些孩子,这样他们就可以运行并获取HTML。 他们将HTML返回给主机,并将其传递给该URL的相应​​callback函数,然后将另一个URL分发给子脚本。 虽然这不是一个完美的解决scheme,但callback函数仍然会在主(主)线程中遇到瓶颈,因为您不能轻易将它们移动到另一个线程。 这些callback可能包含闭包/variables/等,如果没有某种对象共享机制,可能无法在父线程之外正常工作。

任何人,如果你在这里启动我的小演示,你会看到4个线程“处理”url(为简单起见,它们实际上并没有加载url)。

希望有帮助;)