我使用的是带有python 2.6、rabbitmq后端和django的celery 2.4.1。我希望我的任务能够正确清理,如果工人关闭。据我所知,您不能提供任务析构函数,所以我尝试连接到worker_shutdown信号。
注意:AbortableTask只适用于数据库后端,所以我不能使用它。
from celery.signals import worker_shutdown
@task
def mytask(*args)
obj = DoStuff()
def shutdown_hook(*args):
print "Worker shutting down"
# cleanup nicely
obj.stop()
worker_shutdown.connect(shutdown_hook)
# blocking call that monitors a network connection
obj.stuff()但是,关闭钩子永远不会被调用。Ctrl-C‘’ing worker并不会终止任务,我必须从shell中手动终止它。
因此,如果这不是正确的方法,我如何允许任务正常关闭?
发布于 2011-11-23 00:47:37
worker_shutdown仅由MainProcess发送,而不是子池工作进程。所有worker_*信号except for worker_process_init,请参阅MainProcess。
但是,关闭钩子永远不会被调用。Ctrl-C‘’ing worker并不会终止任务,我必须从shell中手动终止它。
在正常的(热的)关机状态下,工人永远不会终止任务。即使一项任务需要几天的时间才能完成,直到它完成,工人也不会完全关机。您可以将--soft-time-limit或--time-limit设置为,以告知实例何时可以终止任务。
因此,要添加任何类型的流程清理流程,您首先需要确保任务可以实际完成。因为在此之前不会调用清理。
要将清理步骤添加到池工作进程,您可以使用如下命令:
from celery import platforms
from celery.signals import worker_process_init
def cleanup_after_tasks(signum, frame):
# reentrant code here (see http://docs.python.org/library/signal.html)
def install_pool_process_sighandlers(**kwargs):
platforms.signals["TERM"] = cleanup_after_tasks
platforms.signals["INT"] = cleanup_after_tasks
worker_process_init.connect(install_pool_process_sighandlers)https://stackoverflow.com/questions/8138642
复制相似问题