首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用current_app.send_task时,芹菜任务路由不起作用

使用current_app.send_task时,芹菜任务路由不起作用
EN

Stack Overflow用户
提问于 2015-01-15 22:29:56
回答 1查看 3.9K关注 0票数 1

我在使用current_app.send_task时遇到了芹菜队列路由问题

我有两个工作进程(每个队列一个)

代码语言:javascript
复制
python manage.py celery worker -E -Q priority --concurrency=8 --loglevel=DEBUG
python manage.py celery worker -Q low --concurrency=8 -E -B --loglevel=DEBUG

我在celeryconfig.py文件中定义了两个队列:

代码语言:javascript
复制
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.core.exceptions import ImproperlyConfigured

from celery import Celery
from django.conf import settings

try:
    app = Celery('proj', broker=getattr(settings, 'BROKER_URL', 'redis://'))
except ImproperlyConfigured:
    app = Celery('proj', broker='redis://')

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
    CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
    CELERY_DEFAULT_EXCHANGE='tasks',
    CELERY_DEFAULT_EXCHANGE_TYPE='topic',
    CELERY_DEFAULT_ROUTING_KEY='task.priority',
    CELERY_QUEUES=(
        Queue('priority',routing_key='priority.#'),
        Queue('low', routing_key='low.#'),
    ),
    CELERY_DEFAULT_EXCHANGE='priority',
    CELERY_IMPORTS=('mymodule.tasks',)

CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'UTC'

if __name__ == '__main__':
    app.start()

在任务的定义中,我们使用装饰器来显式队列:

代码语言:javascript
复制
@task(name='mymodule.mytask', routing_key='low.mytask', queue='low')
def mytask():
    # does something
    pass

当使用以下命令运行此任务时,此任务确实在低队列中运行:

代码语言:javascript
复制
from mymodule.tasks import mytask
mytask.delay()

但当它使用以下命令运行时就不是这样了:(它在默认队列中运行:"priority")

代码语言:javascript
复制
from celery import current_app
current_app.send_task('mymodule.mytask')

我想知道为什么后面的方法不将任务路由到“低”队列!

附言:我使用redis。

EN

回答 1

Stack Overflow用户

发布于 2015-01-16 19:02:10

send_task是一种低级方法。它直接将任务签名发送给代理,而无需通过您的任务装饰器。使用这种方法,您甚至可以在不加载任务代码/模块的情况下发送任务。

为了解决您的问题,您可以直接从configuration中获取routing_key/queue:

代码语言:javascript
复制
  route = celery.amqp.routes[0].route_for_task("mymodule.mytask")
Out[10]: {'queue': 'low', 'routing_key': 'low.mytask'}
  celery.send_task("myodule.mytask", queue=route['queue'], routing_key=route['routing_key']`
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27965672

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档