我试图从Falcon API方法中运行一些戏剧化演员,如下所示:
def on_post(self, req, resp):
begin_id = int(req.params["begin_id"])
count = int(req.params["count"])
for page_id in range(begin_id, begin_id + count):
process_vk_page.send(f"https://vk.com/id{page_id}")
resp.status = falcon.HTTP_200我的代码得到了“发送”方法,通过循环没有任何问题。但是队列中没有新的任务!未调用参与者本身,代理中的“默认”队列为空。如果我设置了自定义队列,它仍然是空的。我的演员长得像这样:
@dramatiq.actor(broker=broker)
def process_vk_page(link: str):
pass经纪人在哪里
broker = RabbitmqBroker(url="amqp://guest:guest@rabbitmq:5672")RabbitMQ日志显示它连接良好。
我在调试器中做了一些额外的研究。它可以很好地获取消息(这意味着发送给代理),而broker.enqueue在Actor.send_with_options()中不返回异常,尽管我不能真正得到它的内部逻辑。我真的不知道为什么会失败,但是引起问题的肯定是RabbitmqBroker.enqueue()。
Broker是Erlang22.2.1上的RabbitMQ 3.8.2,运行在带有默认设置的映像中的Docker中。Dramatiq版本为1.7.0。
在RabbitMQ日志中,只有在应用程序启动时连接到代理,当我关闭它时才断开连接,如下所示:
2020-01-05 08:25:35.622 [info] <0.594.0> accepting AMQP connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672)
2020-01-05 08:25:35.627 [info] <0.594.0> connection <0.594.0> (172.20.0.1:51242 -> 172.20.0.3:5672): user 'guest' authenticated and granted access to vhost '/'
2020-01-05 08:28:35.625 [error] <0.597.0> closing AMQP connection <0.597.0> (172.20.0.1:51246 -> 172.20.0.3:5672):
missed heartbeats from client, timeout: 60s代理在主包的__init__.py中定义,并在子包中导入。我不确定在所有函数的装饰器中指定同一个代理实例是否正确,但是在禁止它的文档中没有任何内容。我想这并不重要,因为如果我为每个演员创建新的代理,它仍然不起作用。
我试着让Redis做经纪人,但我还是遇到了同样的问题。
这可能是什么原因?
发布于 2020-01-06 07:27:25
最有可能的问题是,您没有告诉员工使用哪个代理,因为您没有声明默认代理。
您没有提到您的文件是如何在应用程序中布局的,但是,假设您的代理在tasks.py中被定义为tasks.py,那么您必须让您的工作人员这样了解它:
dramatiq tasks:broker有关更多信息和模式,请参阅dramatiq --help末尾的示例。
https://stackoverflow.com/questions/59570155
复制相似问题