kue以“群集”模式多次处理预定作业

我已经设置了kue来运行Cluster模块,该模块为每个CPU核心提供了一个kue的subprocess。

当调度程序插入every键到redis时,一切似乎都是按顺序的 – every运行只有一个键集。

但是,当触发作业处理的时间到来时,所有subprocess(工作者)开始处理处理逻辑,这导致作业具有由单个“调度程序”条目触发的多个实例。

当以编程方式触发kue的新作业时,似乎不会发生此症状,使用kue API时也不会发生这种情况。

请指教。

主引导代码

 var cluster = require('cluster'); var numCPUs = require('os').cpus().length; if (cluster.isMaster) { require('./init.js'); for (var i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('online', function (worker) { console.log('Worker ' + worker.process.pid + ' is online'); }); cluster.on('exit', function (worker, code, signal) { console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal); console.log('Starting a new worker'); cluster.fork(); }); require('./scheduler.js'); } else { require("./job_types"); } 

init.js

 /// Module Dependency var cors = require('cors'); var kue = require('kue-scheduler'); var express = require('express'); var bodyParser = require('body-parser'); var config = require('./configs/config'); var redis = require("redis"); var client = redis.createClient(); var jobs = kue.createQueue(); require('./routes'); // Clearing redis for clean startup console.log('Clearing old Redis data...'); client.flushall(); /// Webserver var corsOptions = {origin: '*'}; var app = express(); app.use(cors(corsOptions)); app.options('*', cors(corsOptions)); app.use(bodyParser.json()); app.use(kue.app); app.listen(config.env.port, function () { var host = config.env.host; var port = config.env.port; console.log('[' + process.pid + '] Monitoring kue listening at http://%s:%s', host, port); }); // Handling safe shutdown process.once('SIGTERM', function (sig) { kue.shutdown(5000, function (err) { console.log('[' + process.pid + '] Kue shutdown: ', err || ''); process.exit(0); }); }); process.on('uncaughtException', function (err) { console.log('[' + process.pid + '] ' + err); console.log('[' + process.pid + '] ' + err.stack); }); 

scheduler.js

 var scheduler = require('kue-scheduler'); var q = scheduler.createQueue(); // Set specific job scheduling here q.every('1 minutes', q.createJob('getSocialEntities').attempts(3).priority('normal')); // General scheduler event handling // Uncomment for debug q.on('already scheduled', function (job) { console.log('[' + process.pid + '] job is already scheduled: ' + job.type + ' (' + job.id + ')'); }); q.on('schedule success', function (job) { console.log('[' + process.pid + '] job scheduled: ' + job.type + ' (' + job.id + ')'); }); q.on('schedule error', function (error) { console.error('[' + process.pid + '] failed scheduling job'); console.error(error); }); 
  • job_types – 包含处理所有工作逻辑本身的kue jobs.process('job type',...)方法。