带有node.jspipe道接收器的ZeroMQ会在一段时间后停止接收消息

我一直在试图build立一个呼吸机/工作者/汇的模式,以抓取页面,但我从来没有超过testing阶段。 我的设置的一个特殊性在于,水槽与呼吸机的生命过程相同。 所有节点使用ipc:// transport。 目前只有testing消息被交换。 呼吸机发送任务,工作人员收到它们,然后等待发送确认信息。

症状 :一段时间后(通常less于5分钟),即使呼吸机继续发送任务,工作人员仍然接收并发送确认信息,接收机将停止接收确认信息。

我知道发送的确认信息,因为如果我重新启动我的接收器,启动时会得到所有丢失的消息。

我认为ZeroMQ处理自动重新连接。

通风机/宿

var push = zmq.socket('push'); var sink = zmq.socket('pull'); var pi = 0; setInterval(function() { push.send(['ping', pi++], zmq.ZMQ_SNDMORE); push.send('end'); }, 2000); push.bind('ipc://crawl.ipc'); sink.bind('ipc://crawl-sink.ipc'); sink.on('message', function() { var args = [].slice.apply(arguments).map(function(e) {return e.toString()}); console.log('got message', args.join(' ')); }); 

worker.js

 var pull = zmq.socket('pull'); var sink = zmq.socket('push'); sink.connect(opt.sink); pull.connect(opt.push); pull.on('message', function() { var args = [].slice.apply(arguments).map(function(e) {return e.toString()}); console.log('got job ', args.join(' ')); setTimeout(function() { console.log('job done ', args.join(' ')); sink.send(['job done', args.join(' ')]); }, Math.random() * 5 * 1000); }); 

编辑我试图将接收器移动到另一个进程,它似乎工作。 然而,我真的很喜欢它在同一个进程中生活,并且在处理每个进程的多个zmq套接字时,我观察到了类似的行为,无论使用哪种模式

编辑我使用这个模块https://github.com/JustinTulloss/zeromq.node

我不一定期望这个答案被接受,但我把它放在这里作为参考。 有一个非常忠实的节点模块叫做Axon ,它受ZeroMQ的启发。

  • Axon没有编译的依赖关系,并重新创build与ZeroMQ相同的套接字types。
  • Axon也build立在pub / sub sockettypes上来创build一个networking事件发射器。
  • 最后,ZMQ的req / rep套接字不能与Node.js一起工作,因为ZMQ期望回复同步发生。 作为本地Node,Axon库正确处理了req / rep模式。

注意: ZMQ和Axon不可互操作。

不知道你是否使用基本的AMQP客户端,或者使用它的包,我有类似的问题与RabbitMQ。 实际的过程仍在运行(setInterval工程)

我从主进程通过cluster.fork运行我的服务/工作人员…在主进程中有监听程序,重新启动退出时的工人/服务…在我的工人内部我有​​一个setInterval每隔X秒,如果在那段时间没有工作,我有我的工作者process.exit(主stream程侦听器将启动一个新的叉)。 这对我来说足够的韧性了。 通过让几个工作人员跑(听队列),工作仍然完成。

另一个build议,我一直在考虑切换到轴突,因为我所有的MQ接口正在通过节点。 我的其他系统通过NodeJS驱动的API服务进行连接。 对于这个问题,通过一个API服务来暴露你可能需要的东西可能不会太难。