我希望使用队列来保存结果,因为我希望在工人生成结果时,由一个使用者(串行而不是并行)处理工人的结果。
现在,我想知道为什么会挂起下面的程序。
import multiprocessing as mp
import time
import numpy as np
def worker(arg):
time.sleep(0.2)
q, arr = arg
q.put(arr[0])
p = mp.Pool(4)
x = np.array([4,4])
q = mp.Queue()
for i in range(4):
x[0] = i
#worker((q,x))
p.apply_async(worker, args=((q, x),))
print("done_apply")
time.sleep(0.2)
for i in range(4):
print(q.get())发布于 2017-07-25 19:56:09
不能共享Queue对象。通过找到这个answer,我得出了与OP相同的结论。
不幸的是,这段代码中还有其他问题(这并不能使它与链接的答案完全重复)
worker(arg)应该是args解压缩才能工作的worker(*arg)。如果没有这些,我的过程也会被锁上(我承认我不知道为什么。它应该抛出一个异常,但我想多处理和异常不能很好地一起工作)x传递给工作人员,结果得到相同的数字(对于apply,它可以工作,但不适用于apply_async )。另一件事:为了代码是可移植的,请用if __name__ == "__main__":包装主代码,这是由于进程生成不同而需要的。
为我输出0,3,2,1的完全固定代码:
import multiprocessing as mp
import time
import numpy as np
def worker(*arg): # there are 2 arguments to "worker"
#def worker(q, arr): # is probably even better
time.sleep(0.2)
q, arr = arg
q.put(arr[0])
if __name__ == "__main__":
p = mp.Pool(4)
m = mp.Manager() # use a manager, Queue objects cannot be shared
q = m.Queue()
for i in range(4):
x = np.array([4,4]) # create array each time (or make a copy)
x[0] = i
p.apply_async(worker, args=(q, x))
print("done_apply")
time.sleep(0.2)
for i in range(4):
print(q.get())发布于 2017-07-25 19:46:52
更改apply_async以应用提供错误消息:
"Queue objects should only be shared between processes through inheritance"解决办法:
import multiprocessing as mp
import time
import numpy as np
def worker(arg):
time.sleep(0.2)
q, arr = arg
q.put(arr[0])
p = mp.Pool(4)
x = np.array([4,4])
m = mp.Manager()
q = m.Queue()
for i in range(4):
x[0] = i
#worker((q,x))
p.apply_async(worker, args=((q, x),))
print("done_apply")
time.sleep(0.2)
for i in range(4):
print(q.get())结果:
done_apply
3
3
3
3显然,我需要手动复制numpy数组,因为所需的结果应该是0、1、2、3,而不是3、3、3、3。
发布于 2017-07-25 20:52:32
我认为您选择在使用multiprocessing.Pool的同时使用您自己的queue是您遇到的主要问题的根源。使用池可以预先创建子进程,然后将作业分配给这些进程。但是,由于您不能(很容易)将一个queue传递给一个已经存在的进程,所以这并不能很好地匹配您的问题。
相反,您应该要么摆脱自己的队列,使用内置在池中的队列获得由worker编辑的值,要么彻底废弃池,并使用multiprocessing.Process为您必须完成的每个任务启动一个新的进程。
我还注意到,您的代码在主进程中有一个争用条件,这个主进程在修改x数组的主线程和在将旧值发送给工作进程之前序列化旧值的线程之间。很多时候,您可能最终会发送多个相同数组的副本(带有最终值),而不是您想要的几个不同的值。
下面是一个快速的、未经测试的版本,它删除了队列:
def worker(arr):
time.sleep(0.2)
return arr[0]
if __name__ == "__main__":
p = mp.Pool(4)
results = p.map(worker, [np.array([i, 4]) for i in range(4)])
p.join()
for result in results:
print(result)下面是一个删除Pool并保持队列的版本:
def worker(q, arr):
time.sleep(0.2)
q.put(arr[0])
if __name__ == "__main__":
q = m.Queue()
processes = []
for i in range(4):
p = mp.Process(target=worker, args=(q, np.array([i, 4])))
p.start()
processes.append(p)
for i in range(4):
print(q.get())
for p in processes:
p.join()请注意,在上一个版本中,在尝试处理进程之前,get来自队列的结果可能很重要(但如果我们只处理四个值,则可能不是这样)。如果要填充队列,则如果按另一个顺序执行,则可能会出现死锁。工作人员可能会被阻止,试图写入队列,而主进程被阻塞,等待工作进程退出。
https://stackoverflow.com/questions/45312161
复制相似问题