为什么 decisionTask 没有从 AWS SWF 服务(Simple Workflow Service)接收到任何任务?

我正在使用 Node.js 作为后端。我试用了这个 npm 包(aws-swf)来创建一个简单的工作流(Amazon SWF)。该软件包带有一个示例文件夹,我把其中的文件放进了自己的 Node 项目中,以便了解它是如何工作的。

问题是 Decider 没有收到来自 SWF 服务的任何任务,因此我的工作流从未运行。是不是存在某些配置问题?请指出我哪里做错了。

以下是供快速参考的代码。我对代码所做的唯一改动是版本号和域名;除此之外,它与你可以在这里找到的示例代码完全相同。

以下是决策者(Decider)的代码:

var swf = require('./index');

/**
 * Decider example.
 *
 * Polls the 'my-workflow-tasklist' task list for decision tasks. For each task it
 * schedules the 'step1' activity if that step has not been scheduled yet,
 * otherwise it stops the workflow with a fixed result, and then sends the
 * accumulated decisions back.
 */
var myDecider = new swf.Decider({
  // NOTE(review): the register, start and activity snippets in this post all use
  // "test-domain-newspecies". The value below is kept exactly as posted because
  // this mismatch is the subject of the question; per the answer, the decider
  // has to poll the same domain the workflow was started in.
  "domain": "test-domain",
  "taskList": { "name": "my-workflow-tasklist" },
  "identity": "Decider-01",
  "maximumPageSize": 100,
  // IMPORTANT: must replay events in the right order, ie. from the start
  "reverseOrder": false
});

myDecider.on('decisionTask', function (decisionTask) {
  console.log("Got a new decision task !");

  if (!decisionTask.eventList.scheduled('step1')) {
    // First pass: 'step1' was never scheduled, so schedule it now.
    decisionTask.response.schedule({
      name: 'step1',
      activity: 'simple-activity'
    });
  } else {
    // 'step1' is already in the history: finish the workflow.
    decisionTask.response.stop({
      result: "some workflow output data"
    });
  }

  decisionTask.response.respondCompleted(decisionTask.response.decisions, function (err) {
    if (err) {
      console.log(err);
      return;
    }
    console.log("responded with some data !");
  });
});

myDecider.on('poll', function (d) {
  console.log("polling for tasks...", d);
});

// Start polling
myDecider.start();

/**
 * It is not recommended to stop the poller in the middle of a long-polling request,
 * because SWF might schedule a DecisionTask to this poller anyway, which will obviously timeout.
 *
 * The .stop() method will wait for the end of the current polling request,
 * eventually wait for a last decision execution, then stop properly :
 */
process.on('SIGINT', function () {
  console.log('Got SIGINT ! Stopping decider poller after this request...please wait...');
  myDecider.stop();
});

以下是活动代码:

 /**
  * This simple worker example will respond to any incoming task
  * on the 'my-workflow-tasklist' task list, by setting the input parameters
  * as the results of the task.
  */
 var swf = require('./index');

 var activityPoller = new swf.ActivityPoller({
   domain: 'test-domain-newspecies',
   taskList: { name: 'my-workflow-tasklist' },
   identity: 'simple-activity'
 });

 // Two 'error' listeners are registered on purpose to keep the posted output:
 // the first prints a fixed marker, the second prints the error itself.
 activityPoller.on('error', function () {
   console.log('error');
 });

 activityPoller.on('error', function (error) {
   console.log(error);
 });

 activityPoller.on('activityTask', function (task) {
   console.log("Received new activity task !");

   // Echo the task input back as the task result.
   var output = task.input;

   task.respondCompleted(output, function (err) {
     if (err) {
       console.log(err);
       return;
     }
     console.log("responded with some data !");
   });
 });

 activityPoller.on('poll', function (d) {
   console.log("polling for activity tasks...", d);
 });

 // Start polling
 activityPoller.start();

 /**
  * It is not recommended to stop the poller in the middle of a long-polling request,
  * because SWF might schedule an ActivityTask to this poller anyway, which will obviously timeout.
  *
  * The .stop() method will wait for the end of the current polling request,
  * eventually wait for a last activity execution, then stop properly :
  */
 process.on('SIGINT', function () {
   console.log('Got SIGINT ! Stopping activity poller after this request...please wait...');
   activityPoller.stop();
 });

以下是注册的代码:

 var awsswf = require('./index');
 var swf = awsswf.createClient();

 /**
  * Registration script: registers, in order, the domain, the workflow type and
  * the activity type used by the other example scripts. Each step runs in the
  * callback of the previous one. An "already exists" fault is not treated as a
  * failure, so the script can be run more than once.
  */

 /**
  * Register the domain "test-domain-newspecies"
  */
 swf.registerDomain({
   name: "test-domain-newspecies",
   description: "this is a just a test domain",
   workflowExecutionRetentionPeriodInDays: "3"
 }, function (err) {
   if (err && err.code !== 'DomainAlreadyExistsFault') {
     console.log("Unable to register domain: ", err);
     return;
   }
   console.log("'test-domain-newspecies' registered !");

   /**
    * Register the WorkflowType "simple-workflow"
    */
   swf.registerWorkflowType({
     domain: "test-domain-newspecies",
     name: "simple-workflow",
     version: "2.0"
   }, function (err) {
     if (err && err.code !== 'TypeAlreadyExistsFault') {
       console.log("Unable to register workflow: ", err);
       return;
     }
     console.log("'simple-workflow' registered !");

     /**
      * Register the ActivityType "simple-activity"
      */
     swf.registerActivityType({
       domain: "test-domain-newspecies",
       name: "simple-activity",
       version: "2.0"
     }, function (err) {
       if (err && err.code !== 'TypeAlreadyExistsFault') {
         console.log("Unable to register activity type: ", err);
         return;
       }
       console.log("'simple-activity' registered !");
     });
   });
 });

以下是启动工作流执行的代码:

 var swf = require('./index');

 /**
  * Start script: builds a Workflow for the "simple-workflow" type (version "2.0")
  * in the "test-domain-newspecies" domain and starts one execution of it on the
  * "my-workflow-tasklist" task list.
  */
 var workflowOptions = {
   domain: "test-domain-newspecies",
   workflowType: {
     name: "simple-workflow",
     version: "2.0"
   },
   taskList: { name: "my-workflow-tasklist" },
   executionStartToCloseTimeout: "1800",
   taskStartToCloseTimeout: "1800",
   tagList: ["example"],
   childPolicy: "TERMINATE"
 };

 /**
  * Reports the outcome of workflow.start(): logs the error if there is one,
  * otherwise logs the run id of the new execution.
  */
 function onWorkflowStarted(err, runId) {
   if (!err) {
     console.log("Workflow started, runId: " + runId);
     return;
   }
   console.log("Cannot start workflow : ", err);
 }

 var workflow = new swf.Workflow(workflowOptions);
 var workflowExecution = workflow.start({ input: "any data ..." }, onWorkflowStarted);

以下是index.js文件

 /**
  * index.js: loads the AWS configuration from ../config/awsConfig.json and
  * re-exports the aws-swf building blocks straight from the package's lib folder.
  */
 var basePath = "../node_modules/aws-swf/lib/";

 var AWS = require('aws-swf').AWS;
 exports.AWS = AWS;
 AWS.config.loadFromPath(__dirname + '/../config/awsConfig.json');

 // [exported name, lib file] pairs, listed in the order they are required.
 var libExports = [
   ["createClient", "swf"],
   ["Workflow", "workflow"],
   ["WorkflowExecution", "workflow-execution"],
   ["ActivityPoller", "activity-poller"],
   ["ActivityTask", "activity-task"],
   ["Decider", "decider"],
   ["DecisionTask", "decision-task"],
   ["EventList", "event-list"],
   ["DecisionResponse", "decision-response"],
   ["Poller", "poller"]
 ];

 libExports.forEach(function (entry) {
   var exportName = entry[0];
   var libFile = entry[1];
   exports[exportName] = require(basePath + libFile)[exportName];
 });

我运行这些代码的方式是同时打开三个终端,然后在各自的终端中执行下面的命令。

 activity:
   node <activity-file-name>
 decider:
   node <decider-file-name>
 start and register (I run these in the same terminal):
   node <register-file-name>
   node <start-file-name>

你的决策者(Decider)使用的域是 "test-domain",而其他代码(注册、启动工作流、活动轮询)使用的都是 "test-domain-newspecies"。决策者必须轮询与工作流相同的域,才能收到该工作流的决策任务。

如果域"test-domain"未注册,则在轮询决策任务时应得到UnknownResourceFault错误。

Interesting Posts