每个消息types具有单个消费者的分布式pub / sub

我不知道如果最好在这里或者在Programmers.SE上面询问,那么如果我有这个错误,请迁移。

首先,关于我想要实现的一点。 我有一个node.js应用程序,它从一个来源(一个socket.io客户端)接收消息,然后对消息进行处理,这可能会导致零个或多个消息返回给发件人或其他客户端组。

对于处理,我想本质上只是推送消息到一个队列,然后通过各种消息处理器的方式,它可能会启动自己的项目,最终,运行socket.io的位被通知“嘿,发送这个消息回“

作为一个具体的例子,假设一个用户login到服务中,那么login信息被放置在授权处理器获取它的队列中,然后把消息放回到队列中,说明客户端已被授权。 这可以追溯到连接到客户端的socket.io套接字以及其他可能感兴趣的客户端。 它也可以去其他子系统,可能要做更多的授权处理(查找用户信息,根据他们的数据发送更多的信息给客户端等)。

如果我想要强大的耦合,这将是容易的,但我之前尝试过,它只是一堆意大利面条代码,非常脆弱,我想避免这种情况。 设置中的另一个问题是集群能力,这是真正的问题出现的地方。可能有不止一个授权处理器在运行。 但授权信息只能处理一次。

所以,简而言之,我正在寻找一种模式/技术,让我从本质上来说有一个消息的多个用户组,消息每组只能处理一次。

我想也许有一个处理器的每个实例生成一个独特的名字,将被用作列表中的Reids。 然后这个名字将被注册到某种调度处理程序中,并被放置到该组用户的集合中。 然后,当一条消息到达时,调度会从该集合中抽取一个随机成员,并将其放入该列表中。 虽然看起来似乎有效,但似乎有点过于复杂和脆弱。

核心问题是我从来没有devise过这样的系统,所以我甚至不确定使用或查找的适当条款。 如果任何人都能指出我的正确方向,我会非常感激。

我认为你的描述与https://www.getbridge.com/服务类似&#x3002; 我但它最终根据zeromq写我自己的,它允许您注册服务,req – > < – rec和渠道是pub / sub工作人员。

至于devise方面,我使用了一个客户端 – >代理 – >服务和渠道,这些服务和渠道都是使用自动发现的即插即用方式,您可以将服务注册到打开tcp连接的代理上,以便其他服务器上的代理可以进行通信与那个经纪人团体服务。 然后,内部服务和客户端通过unix套接字或ipc频道连接,这是首选。

我最终围绕redis发布/订阅function做了一些工作。 每种types的消息处理器都会得到一个“组名”,并且该组中可能有多个处理器实例(因此可以运行多个程序实例进行集群)。

发布消息时,我生成一个增量ID,然后将消息存储在具有该ID的string键中,然后发布消息ID。

在接收端,订阅者所做的第一件事就是试图将刚从发布者那里得到的消息ID添加到一组接收到的与sadd消息中。 如果sadd返回0 ,则该消息已被另一个实例sadd ,并且它只是返回。 如果返回1 ,则将完整的消息从string键中拉出并发送给侦听器。

当然,这依赖于redis单线程,我想这将继续如此。

你可能要找的是一个AMQP协议实现,你可以让队列获得自定义交换,并实现一个pub-sub模型。

RabbitMQ – 一个stream行的amqp协议实现,拥有大量的库

它也有node.js库