序列化处理asynchronous处理的消息队列

如何处理通过函数传递的一系列消息,按照消息的处理顺序来处理消息的asynchronous操作?

一个例子:

var processMessage = function(msg) { switch (msg.action) { case "doFoo": this.processFoo(()=> { // `foo` is done processing }); break; case "doBar": this.processBar(()=> { // `bar` is done processing }); break; case "doBaz": this.processBaz(()=> { // `baz` is done processing }); break; } } 

笔记

  • 我当然可以推送一个数组中的项目,然后使用async eachSeries来处理消息数组
  • 然而,消息不断地来,因此将数组填充更多的项目进行处理,导致处理动摇

这种问题有没有事实上的标准解决scheme?

这是一个总体scheme:

  1. 当新消息到达时,检查一个标志,看看你是否已经在处理消息。 如果标志已设置,只需将该消息添加到队列中。
  2. 如果标志未设置,请检查队列,如果队列中有消息,则从队列中删除该消息。
  3. 当您开始处理该消息时,请设置一个标志,指示您现在正在处理消息。
  4. 启动处理消息的asynchronous操作
  5. 当发生callback信号表示完成asynchronous消息时,请清除该标志以指示您正在处理中,并recursion调用一个从第2步开始的函数。

您在两点触发处理新消息。 首先,当一个新的消息到达,而你还没有处理一个消息,其次是其他消息的处理完成时,你检查是否有任何东西在你正在处理时被添加到队列中。

您维护一个inProcessingtypes标志,以便在另一个消息已处于正在处理中(这会强制执行您请求的串行执行)时不会无意中开始处理传入消息。

你必须严格处理错误条件,所以标志永远不会卡住,所以你的队列处理不会停滞。

在伪代码中(假设这些是包含数组作为队列的队列对象上的方法):

 addQueue: function(msg) { if (!this.inProcess) { // not currently processing anything so just process the message this.processMessage(msg); } else { this.queue.push(msg); } }, processMessage: function(msg, completeFn) { var self = this; // must set this flag before going async self.inProcess = true; // asynchronously process this message someAsyncProcessing(msg, function(err) { self.inProcess = false; if (completeFn) { completeFn(err); } // see if anything else is in the queue to process if (self.queue.length) { // pull out oldest message and process it var msg = self.queue.shift(); self.processMessage(msg); } }); }