将大数组传递给节点subprocess

我想要在大型arrays上执行复杂的CPU密集型工作。 理想情况下,我想把这个传递给subprocess。

var spawn = require('child_process').spawn; // dataAsNumbers is a large 2D array var child = spawn(process.execPath, ['/child_process_scripts/getStatistics', dataAsNumbers]); child.stdout.on('data', function(data){ console.log('from child: ', data.toString()); }); 

但是当我这样做,节点给出的错误:

产卵E2BIG

我遇到了这篇文章

因此,将数据传输给subprocess似乎是要走的路。 我的代码现在是:

 var spawn = require('child_process').spawn; console.log('creating child........................'); var options = { stdio: [null, null, null, 'pipe'] }; var args = [ '/getStatistics' ]; var child = spawn(process.execPath, args, options); var pipe = child.stdio[3]; pipe.write(Buffer('awesome')); child.stdout.on('data', function(data){ console.log('from child: ', data.toString()); }); 

然后在getStatistics.js中:

 console.log('im inside child'); process.stdin.on('data', function(data) { console.log('data is ', data); process.exit(0); }); 

process.stdin.on的callback没有达到。 如何在我的子脚本中接收stream?

编辑

我不得不放弃缓冲方法。 现在我发送数组作为消息:

 var cp = require('child_process'); var child = cp.fork('/getStatistics.js'); child.send({ dataAsNumbers: dataAsNumbers }); 

但这只有在dataAsNumbers的长度低于20,000时才有效,否则超时。

有了这么大量的数据,我会考虑使用共享内存,而不是将数据复制到subprocess中(这是使用pipe道或传递消息时发生的情况)。 这样可以节省内存,缩短父进程的CPU时间,并且不太可能遇到一些限制。

shm-typed-array是一个非常简单的模块,看起来适合你的应用程序。 例:

parent.js

 "use strict"; const shm = require('shm-typed-array'); const fork = require('child_process').fork; // Create shared memory const SIZE = 20000000; const data = shm.create(SIZE, 'Float64Array'); // Fill with dummy data Array.prototype.fill.call(data, 1); // Spawn child, set up communication, and give shared memory const child = fork("child.js"); child.on('message', sum => { console.log(`Got answer: ${sum}`); // Demo only; ideally you'd re-use the same child child.kill(); }); child.send(data.key); 

child.js

 "use strict"; const shm = require('shm-typed-array'); process.on('message', key => { // Get access to shared memory const data = shm.get(key, 'Float64Array'); // Perform processing const sum = Array.prototype.reduce.call(data, (a, b) => a + b, 0); // Return processed data process.send(sum); }); 

请注意,我们只是通过IPC向父进程发送一个小“钥匙”,而不是整个数据。 因此,我们节省了大量的记忆和时间。

当然,您可以将'Float64Array' (例如double )更改为应用程序需要的任何types的数组 。 请注意,这个库特别只处理一维types的数组; 但这只是一个小小的障碍。

我也能够重现你所经历的延迟,但也许并不像你那么糟糕。 我用了以下

 // main.js const fork = require('child_process').fork const child = fork('./getStats.js') const dataAsNumbers = Array(100000).fill(0).map(() => Array(100).fill(0).map(() => Math.round(Math.random() * 100))) child.send({ dataAsNumbers: dataAsNumbers, }) 

 // getStats.js process.on('message', function (data) { console.log('data is ', data) process.exit(0) }) 

节点main.js 2.72s用户0.45s系统103%cpu 3.045总数

我生成100个由100个数字组成的元素来模拟你的数据,确保你正在使用message事件process 。 但也许你的孩子更复杂,可能是失败的原因,也取决于你设置的查询超时。


如果你想得到更好的结果,你可以做的是将你的数据分块成多个部分,将被发送到subprocess,并重build形成初始数组。


也有一种可能性是使用第三方库或协议,即使这样做有点多。 你可以看一下messenger.js ,甚至像AMQP队列,这样可以让你在两个进程之间进行通信,并保证subprocess确认消息。 它有几个节点的实现,像amqp.node ,但它仍然需要一些设置和configuration工作。

使用内存caching(如https://github.com/ptarjan/node-cache) ,并让父进程使用某个键存储数组内容,subprocess将通过该键检索内容。

您可以考虑使用操作系统pipe道, 您将在这里find要点作为节点子应用程序的input。

我知道这不是你所要求的,但是你可以使用集群模块(包含在节点中)。 通过这种方式,您可以获得与机器核心一样多的实例,以加快处理速度。 此外,如果在开始处理之前不需要获取所有可用数据,请考虑使用stream。 如果要处理的数据太大,我会将其存储在一个文件中,以便在过程中发生错误时重新进行处理。 这是一个聚类的例子。

 var cluster = require('cluster'); var numCPUs = 4; if (cluster.isMaster) { for (var i = 0; i < numCPUs; i++) { var worker = cluster.fork(); console.log('id', worker.id) } } else { doSomeWork() } function doSomeWork(){ for (var i=1; i<10; i++){ console.log(i) } } 

更多信息在工人问题8534462发送消息。

你为什么要做一个子过程? 通过subprocess发送数据可能花费更多的CPU和实时时间,而不是在同一进程内进行处理。

相反,我build议对于超高效的编码,你可以考虑在工作线程中进行统计计算,这个工作线程和nodejs主进程在同一个内存中运行。

您可以使用NAN编写可以发布到工作线程的C ++代码,然后让该工作线程完成后将结果和事件发回到您的nodejs事件循环。

这样做的好处是,您不需要额外的时间将数据发送到不同的进程,但缺点是您将为线程操作编写一些C ++代码,但NAN扩展应该处理大多数对你来说是艰巨的任务。

对于长时间的工作任务,你可以使用像齿轮人员这样的工作,你可以在工作人员上做繁重的工作,这样就可以设置你需要的工作人员数量,例如我用这种方式进行文件处理,如果我需要规模,工人实例,我也有不同的工作人员为不同的任务,处理zip文件,生成缩略图等,这是好的工作人员可以写在任何语言node.js,Java,Python,并可以轻松地集成在您的项目

 // worker-unzip.js const debug = require('debug')('worker:unzip'); const {series, apply} = require('async'); const gearman = require('gearmanode'); const {mkdirpSync} = require('fs-extra'); const extract = require('extract-zip'); module.exports.unzip = unzip; module.exports.worker = worker; function unzip(inputPath, outputDirPath, done) { debug('unzipping', inputPath, 'to', outputDirPath); mkdirpSync(outputDirPath); extract(inputPath, {dir: outputDirPath}, done); } /** * * @param {Job} job */ function workerUnzip(job) { const {inputPath, outputDirPath} = JSON.parse(job.payload); series([ apply(unzip, inputPath, outputDirPath), (done) => job.workComplete(outputDirPath) ], (err) => { if (err) { console.error(err); job.reportError(); } }); } function worker(config) { const worker = gearman.worker(config); if (config.id) { worker.setWorkerId(config.id); } worker.addFunction('unzip', workerUnzip, {timeout: 10, toStringEncoding: 'ascii'}); worker.on('error', (err) => console.error(err)); return worker; } 

一个简单的index.js

 const unzip = require('./worker-unzip').worker; unzip(config); // pass host and port of the Gearman server 

我通常运行PM2的工人

与代码的集成非常简单。 就像是

 //initialize const gearman = require('gearmanode'); gearman.Client.logger.transports.console.level = 'error'; const client = gearman.client(configGearman); // same host and port 

只是将工作添加到传递函数名称的队列中

 const taskpayload = {inputPath: '/tmp/sample-file.zip', outputDirPath: '/tmp/unzip/sample-file/'} const job client.submitJob('unzip', JSON.stringify(taskpayload)); job.on('complete', jobCompleteCallback); job.on('error', jobErrorCallback);