工作完成后的Kuecallback

我的主节点实例派生一个工作进程,它通过IPC接受消息(使用内置的Node process.send()process.on('message'... ),这些对象包含有关要添加到Kue中的新作业的信息然后处理这些工作。

我的主要Node实例调用如下所示:

 worker.send({jobType:'filesystem', operation: 'delete', path: fileDir}); 

和工人实例做这样的事情:

 jobs.create(message.jobType, message).save(); jobs.process('filesystem', function(job, done) { fs.delete(job.data.path, function(err) { done(err); }); }); 

并且工作成功完成。

在作业完成后,如何在主节点实例中获得类似callback的function? 我怎样才能从工人实例返回一些结果到主节点实例?

我相信我已经解决了这个问题,但是如果有人能改进我的解决scheme或提供更好的解决scheme,我就不会解决这个问题。

当您使用Kue在单独的进程中处理作业时,不能简单地在作业完成时执行callback。 这是两个过程之间沟通的一个例子。 我希望能够使用Kue自动提供每个作业的ID(我相信它是在Redis中收到的id),但是app.js在发送给工作人员之前需要知道作业的ID,以便它可以在收到消息时匹配id。

app.js

 var child = require('child_process'); var async = require('async'); var worker = child.fork("./worker.js"); //When a message is received, search activeJobs for it, call finished callback, and delete the job worker.on('message', function(m) { for(var i = 0; i < activeJobs.length; i++) { if(m.jobId == activeJobs[i].jobId) { activeJobs[i].finished(m.err, m.results); activeJobs.splice(i,1); break; } } }); // local job system var newJobId = 0; var activeJobs = []; function Job(input, callback) { this.jobId = newJobId; input.jobId = newJobId; newJobId++; activeJobs.push(this); worker.send(input); this.finished = function(err, results) { callback(err, results); } } var deleteIt = function(req, res) { async.series([ function(callback) { // An *EXAMPLE* asynchronous task that is passed off to the worker to be processed // and requires a callback (because of async.series) new Job({ jobType:'filesystem', title:'delete project directory', operation: 'delete', path: '/deleteMe' }, function(err) { callback(err); }); }, //Delete it from the database function(callback) { someObject.remove(function(err) { callback(err); }); }, ], function(err) { if(err) console.log(err); }); }; 

worker.js

 var kue = require('kue'); var fs = require('fs-extra'); var jobs = kue.createQueue(); //Jobs that are sent arrive here process.on('message', function(message) { if(message.jobType) { var job = jobs.create(message.jobType, message).save(); } else { console.error("Worker:".cyan + " [ERROR] No jobType specified, message ignored".red); } }); jobs.process('filesystem', function(job, done) { if(job.data.operation == 'delete') { fs.delete(job.data.path, function(err) { notifyFinished(job.data.jobId, err); done(err); }); } }); function notifyFinished(id, error, results) { process.send({jobId: id, status: 'finished', error: error, results: results}); } 

https://gist.github.com/winduptoy/4991718