首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >django-dramatiq管道只运行第一阶段。

django-dramatiq管道只运行第一阶段。
EN

Stack Overflow用户
提问于 2022-08-24 18:07:26
回答 1查看 86关注 0票数 0

我试图使用django-dramatiq来运行一个由几个阶段组成的管道,每个阶段定义为dramatiq Actor,使用pipeline(<stages>).run()方法,但它只是运行第一个阶段/Actor,而不是尝试其他阶段。

我定义了一些假演员来说明这个问题:

代码语言:javascript
复制
import dramatiq

@dramatiq.actor
def fake_extract(process_pk, *args, **kwargs):
    print(f"fake_extract: Process PK= {process_pk} Running extract on {kwargs['fits_file']}")

@dramatiq.actor
def fake_astromfit(process_pk, *args, **kwargs):
    print(f"fake_astromfit: Process PK= {process_pk} Astrometric fit on {kwargs['ldac_catalog']}, updating {kwargs['fits_file']}")

@dramatiq.actor
def fake_zeropoint(process_pk, *args, **kwargs):
    print(f"fake_zeropoint: Process PK= {process_pk} ZP determination on {kwargs['ldac_catalog']} with {kwargs['desired_catalog']} ref catalog")

然后,我定义了各个阶段,并构建了一个管道:

代码语言:javascript
复制
import os
from dramatiq import pipeline
from test_dramatiq.dramatiq_tests import fake_extract, fake_astromfit, fake_zeropoint


fits_filepath = '/foo/bar.fits'
fits_file = os.path.basename(fits_filepath)

steps = [{
            'name'   : 'proc-extract',
            'runner' : fake_extract,
            'inputs' : {'fits_file':fits_filepath,
                       'datadir': os.path.join(dataroot, temp_dir)}
        },
        {
            'name'   : 'proc-astromfit',
            'runner' : fake_astromfit,
            'inputs' : {'fits_file' : fits_filepath,
                        'ldac_catalog' : os.path.join(dataroot, temp_dir, fits_file.replace('e91.fits', 'e91_ldac.fits')),
                        'datadir' : os.path.join(dataroot, temp_dir)
                        }
        },
        {
            'name'   : 'proc-zeropoint',
            'runner' : fake_zeropoint,
            'inputs' : {'ldac_catalog' : os.path.join(dataroot, temp_dir, fits_file.replace('e91.fits', 'e92_ldac.fits')),
                        'datadir' : os.path.join(dataroot, temp_dir),
                        'desired_catalog' : 'PS1'
                        }
        }]

pipes = []
for step_num, step in enumerate(steps):
    inputs = step['inputs']
    print(f"  Performing pipeline step {step['name']}")
    pk = 1234+step_num
    pipes.append(step['runner'].message_with_options(args=[pk,], kwargs=inputs, pipe_ignore=True))
pipeline(pipes).run()

ipython内部使用常规dramatiq运行,这些操作看起来很好,并且所有阶段都运行:

代码语言:javascript
复制
fake_extract: Process PK= 1234 Running extract on /foo/bar.fits
fake_astromfit: Process PK= 1235 Astrometric fit on /foo/Temp_cvc2/bar.fits, updating /foo/bar.fits
fake_zeropoint: Process PK= 1236 ZP determination on /foo/Temp_cvc2/bar.fits with PS1 ref catalog

但是,在django-dramatiq通过Django项目的settings.py文件导入的模块中定义它们,并像上面在python manage.py shell中和python manage.py rundramatiq运行程序中那样定义管道,只运行第一阶段/Actor

代码语言:javascript
复制
fake_extract: Process PK= 1234 Running extract on /foo/bar.fits

它从来不执行其他阶段..。对于这里发生了什么以及为什么多阶段管道在django-dramatiq下不能工作,有什么想法吗?

EN

回答 1

Stack Overflow用户

发布于 2022-08-24 23:37:47

因此,这个问题是由于Django项目的settings.py中缺少中间件而造成的,这导致了它的部分工作。我有:

代码语言:javascript
复制
DRAMATIQ_BROKER = {
    'BROKER': 'dramatiq.brokers.redis.RedisBroker',
    'OPTIONS': {
        'url': f'redis://{REDIS_HOSTNAME}:6379',
    },
    'MIDDLEWARE': [
        'dramatiq.middleware.AgeLimit',
        'dramatiq.middleware.TimeLimit',
        'dramatiq.middleware.Callbacks',
        'dramatiq.middleware.Retries',
        'django_dramatiq.middleware.DbConnectionsMiddleware',
    ]
}

但是MIDDLEWARE在那里也缺少了一个'dramatiq.middleware.Pipelines',。在并重新启动runserverrundramatiq管理命令后,上面的精简测试示例和最初的全功能版本都会开始工作。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73477796

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档