我将如何阻止一个rabbitmq工作者的中间脚本?
如何构build我的应用程序node.js服务器将数据发送给一个python worker,然后启动一个不间断的脚本。 稍后使node.js服务器能够停止脚本并使工作者可以再次从队列中接收的方法是什么?
制止消费者的秘密通常是有毒的信息。
有些是通过多个队列来完成的,有些通过一个专门的消息被下游理解为停止消费(后者只适用于单个工人)。
基本上,当你的python脚本看到毒讯息,它立即closures。 您可能希望在单独的队列中使用此function的原因是,如果您有多个工作人员,则可能需要广播该消息,因此每个工作人员需要一个专用队列。
至于启动脚本,最好的办法就是让工作人员继续运行,让他们等待重新启动信息。
否则,你可以让一个观察者工作人员或控制器来处理消息和控制进程,比如重启python脚本,但是这很复杂。
我相信,如果你正在寻找一些预感的东西,芹菜会做很多。
编辑 – 中毒消息可能已经意味着AMQP中的其他东西。 我的意思是你自己的毒药信息。
既然你想在运行的时候杀掉一些东西,你就必须以广播的方式去做。
对于每个消费者,你将实际上每个Python脚本有两个消费者。
一个接受实际数据,另一个接收命令来启动和停止数据消费者。
如果你想开始你可以有一个广播消息(JSON,但你可以使用任何你喜欢的格式,或者你也可以使用AMQP标题属性)):
{ "command" : "start" }
当你想停止你的制作人可能会发送一条消息,如:
{ "command": "stop" }
你的消费者callback将看到message.command == 'stop'
。
一旦你看到这个消息,你可以调用实际上正在接收数据的消费者的 channel.basicCancel(consumerTag)
,并且实际上正在执行任何正在运行的消息。 目前正在运行的任何杀戮程序都高度依赖于你的设置,所以我不能真正进入,因为它可能是一个单独的进程,或者如果使用某个事件引擎,它可能是一个单独的线程或事件。 杀死长时间运行的进程本质上是非常棘手的。
如果使用话题交换,每个python脚本都应该有一个命令使用者的专用队列,或者你可以使用扇出(aka broadcast)交换。
数据使用者将与所有其他脚本共享一个队列。
您可以考虑使用像芹菜这样的软件包来处理RabbitMQ作为您的经纪商。 Celery有一个可以用来终止任务的revoke()
函数。
>> from celery.task.control import revoke >> revoke(task_id, terminate=True)
由于听起来您想要停止已经处理的任务,因此将terminate选项设置为True的撤销请求将会终止正在执行的任务。 这在默认情况下是错误的。
供参考: http : //docs.celeryproject.org/en/latest/userguide/workers.html#commands