So I ended up with code like this:

import asyncio
import pickle
import time
import multiprocessing as MP  # assumed alias, inferred from the MP.* calls below

mgr = MP.Manager()
mp_dataset = mgr.dict(dataset)
mp_seen = mgr.dict({k: None for k in seen})
mp_barrier = MP.Barrier(WORKER_COUNT + 1)  # +1 to include main process

# TileQueue is a global var
workers = [
    MP.Process(target=process_item_worker, args=(TileQueue, mp_dataset, mp_seen, mp_barrier))
    for _ in range(0, WORKER_COUNT)
]
for worker in workers:
    worker.start()

print("Waiting for workers...")
mp_barrier.wait()

start_t = time.monotonic()
try:
    asyncio.run(fetch_more_data())
    elapsed_t = time.monotonic() - start_t
    print(f"\nFetching finished in {elapsed_t:,.2f} seconds", flush=True)
except Exception as e:
    print(f"\nAn Exception happened: {e}")
finally:
    # Save the results first, convert from managed to normal dicts
    dataset.update(mp_dataset)
    progress["seen"] = dict(mp_seen)
    with PROGRESS_FILE.open("wb") as fout:
        pickle.dump(progress, fout)
    # Then we tell workers to disband
    for _ in workers:
        TileQueue.put(None)
    print("Waiting for workers...", flush=True)
    for w in workers:
        w.join()
    TileQueue.close()

print("Start processing updated dataset")

Why the combination of async and multiprocessing? Because the fetch_more_data logic is I/O-bound, so async works well there, while process_item is heavily CPU-bound, so I want to hand that heavy work off to dedicated processes.
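
For readers who want to try the hand-off in isolation, here is a minimal sketch of the same split: a coroutine does the I/O-bound part and pushes items onto a queue, and worker processes do the CPU-bound part until they receive a None sentinel. The names cpu_heavy, worker_loop, fetch_items, and main are hypothetical stand-ins, not the real process_item_worker or fetch_more_data, and results come back through a second queue here instead of a managed dict.

```python
import asyncio
import multiprocessing as MP


def cpu_heavy(item):
    """Hypothetical stand-in for the CPU-bound process_item step."""
    return sum(i * i for i in range(item))


def worker_loop(tile_queue, done_queue):
    """Consume items until a None sentinel arrives, emitting (item, result) pairs."""
    while True:
        item = tile_queue.get()
        if item is None:  # sentinel: no more work for this worker
            break
        done_queue.put((item, cpu_heavy(item)))


async def fetch_items(tile_queue, items):
    """Hypothetical stand-in for the I/O-bound fetch_more_data coroutine."""
    for item in items:
        await asyncio.sleep(0)  # placeholder for a real network call
        tile_queue.put(item)


def main(items, worker_count=2):
    """Run the producer in the main process and the consumers in child processes."""
    items = list(items)
    tile_queue, done_queue = MP.Queue(), MP.Queue()
    workers = [
        MP.Process(target=worker_loop, args=(tile_queue, done_queue))
        for _ in range(worker_count)
    ]
    for w in workers:
        w.start()
    asyncio.run(fetch_items(tile_queue, items))
    for _ in workers:
        tile_queue.put(None)  # one sentinel per worker
    # Drain the results before joining, so no worker blocks on a full pipe.
    results = dict(done_queue.get() for _ in items)
    for w in workers:
        w.join()
    return results
```

On Windows, where child processes are spawned rather than forked, main() should be called from under an `if __name__ == "__main__":` guard.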
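
The problem:
I always get the message GetOverlappedResult got err 109 several times (always exactly WORKER_COUNT times) just before the last print() line.
Everything works as expected, though. But the message annoys me.
What is wrong?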

Posted on 2022-10-31 05:22:09
OK, after a lot of experimenting, I found the (probable) cause: I also have to "end" the Manager() instance.
So I changed the finally block to the following:

finally:
    # Save the results first, convert from managed to normal dicts
    dataset.update(mp_dataset)
    progress["seen"] = dict(mp_seen)
    with PROGRESS_FILE.open("wb") as fout:
        pickle.dump(progress, fout)
    mgr.shutdown()
    mgr.join()
    # Then we tell workers to disband
    for _ in workers:
        TileQueue.put(None)
    time.sleep(1.0)
    TileQueue.close()
    print("Waiting for workers...", flush=True)
    for w in workers:
        w.join()

Now I no longer get GetOverlappedResult got err 109, and I'm happy :-)
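
As a possible variation (not the code from this answer), the manager can also be used as a context manager, since leaving the with block calls shutdown() automatically. The sketch below joins the workers and copies the managed dict out before the manager goes away, which is a different ordering from the finally block above. Whether this ordering also silences the message on Windows has not been checked here. The names worker and run are hypothetical.

```python
import multiprocessing as MP


def worker(tile_queue, shared):
    """Record each item in the shared dict until a None sentinel arrives."""
    while (item := tile_queue.get()) is not None:
        shared[item] = item * 2  # stand-in for the real processing


def run(items, worker_count=2):
    tile_queue = MP.Queue()
    # Leaving the with block calls mgr.shutdown(), even if an exception is raised.
    with MP.Manager() as mgr:
        shared = mgr.dict()
        workers = [
            MP.Process(target=worker, args=(tile_queue, shared))
            for _ in range(worker_count)
        ]
        for w in workers:
            w.start()
        try:
            for item in items:
                tile_queue.put(item)
        finally:
            for _ in workers:
                tile_queue.put(None)  # one sentinel per worker
            for w in workers:
                w.join()  # workers finish while the manager is still alive
            result = dict(shared)  # copy out before the manager shuts down
    tile_queue.close()
    return result
```

The point of the design is that nothing touches a proxy object after the manager process has exited. As with any multiprocessing script on Windows, run() should be called from under an `if __name__ == "__main__":` guard.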
https://stackoverflow.com/questions/74258919