我试图在更新完成后使用chord启动报告更新。
@shared_task(autoretry_for=(Exception,), retry_backoff=True, retry_kwargs {'max_retries': 5})
def upload(df: pd.DataFrame, **kwargs):
ed = EntityDataPoint(df, **kwargs)
uploadtasks, source, subtype = ed.upload_database()
chord(uploadtasks)(final_report.si(logger=ed.logger,
source=source,
subtype=subtype,
index=ed.index))上传任务是:
g = group(unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size])
for i in range(0, len(self.data), chunk_size))当chord的头包含两个以上的元素时,前两个子任务就会成功,组中的其余任务和回调都不会启动,没有任何错误被发送到任何地方,芹菜工人日志中也没有任何信息。经过检查工人,芹菜检查活动,排定,似乎没有任何等待任务在队列中。
如果标题(组)有2个或更少的元素,就没有问题,组任务完成,回调就会被调用。
这似乎不取决于元素的大小(如果组中的每个子任务发送100行,则1000行仍有相同的行为)。
如果我只是在没有chord和回调的情况下启动组任务,任务就会在没有任何错误的情况下成功。
我试着用不同的语法来解释和弦,但似乎没有什么改变。
我试着使用group.link特性来查看会发生什么,而这个组在完成此操作时似乎已经完成了,但是在完成所有组任务之后,回调不会发生,因为它不是我从文档中了解到的,所以它并不完全是我想要的行为。
我使用的是芹菜5.2.3和Redis 7.0.0 broker,Django 3.2.13后端是psql,python 3.9。所有东西都在独立的码头集装箱上运行。
发布于 2022-06-21 14:38:48
似乎直接使用组作为chord的标题是造成问题的原因。它可能使用组中的第一个任务作为标题,使用第二个任务作为回调(尽管我不明白为什么这些任务的参数没有导致一些错误)。而不是返回:
group(unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size])
for i in range(0, len(self.data), chunk_size))我现在回来:
[unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size])
for i in range(0, len(self.data), chunk_size)]它的工作就像预期的那样。
https://stackoverflow.com/questions/72690160
复制相似问题