首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >池,队列,挂

池,队列,挂
EN

Stack Overflow用户
提问于 2017-07-25 19:39:34
回答 3查看 666关注 0票数 1

我希望使用队列来保存结果,因为我希望在工人生成结果时,由一个使用者(串行而不是并行)处理工人的结果。

现在,我想知道为什么会挂起下面的程序。

代码语言:javascript
复制
import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg 
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
q = mp.Queue()

for i in range(4):
    x[0] = i 
    #worker((q,x))
    p.apply_async(worker, args=((q, x),)) 

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-07-25 19:56:09

不能共享Queue对象。通过找到这个answer,我得出了与OP相同的结论。

不幸的是,这段代码中还有其他问题(这并不能使它与链接的答案完全重复)

  • worker(arg)应该是args解压缩才能工作的worker(*arg)。如果没有这些,我的过程也会被锁上(我承认我不知道为什么。它应该抛出一个异常,但我想多处理和异常不能很好地一起工作)
  • 将相同的x传递给工作人员,结果得到相同的数字(对于apply,它可以工作,但不适用于apply_async )。

另一件事:为了代码是可移植的,请用if __name__ == "__main__":包装主代码,这是由于进程生成不同而需要的。

为我输出0,3,2,1的完全固定代码:

代码语言:javascript
复制
import multiprocessing as mp
import time
import numpy as np
def worker(*arg):  # there are 2 arguments to "worker"
#def worker(q, arr):  # is probably even better
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

if __name__ == "__main__":
    p = mp.Pool(4)

    m = mp.Manager()  # use a manager, Queue objects cannot be shared
    q = m.Queue()

    for i in range(4):
        x = np.array([4,4])  # create array each time (or make a copy)
        x[0] = i
        p.apply_async(worker, args=(q, x))

    print("done_apply")
    time.sleep(0.2)
    for i in range(4):
        print(q.get())
票数 1
EN

Stack Overflow用户

发布于 2017-07-25 19:46:52

更改apply_async以应用提供错误消息:

代码语言:javascript
复制
"Queue objects should only be shared between processes through inheritance"

解决办法:

代码语言:javascript
复制
import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
m = mp.Manager()
q = m.Queue()

for i in range(4):
    x[0] = i
    #worker((q,x))
    p.apply_async(worker, args=((q, x),))

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())

结果:

代码语言:javascript
复制
done_apply
3
3
3
3

显然,我需要手动复制numpy数组,因为所需的结果应该是0、1、2、3,而不是3、3、3、3。

票数 1
EN

Stack Overflow用户

发布于 2017-07-25 20:52:32

我认为您选择在使用multiprocessing.Pool的同时使用您自己的queue是您遇到的主要问题的根源。使用池可以预先创建子进程,然后将作业分配给这些进程。但是,由于您不能(很容易)将一个queue传递给一个已经存在的进程,所以这并不能很好地匹配您的问题。

相反,您应该要么摆脱自己的队列,使用内置在池中的队列获得由worker编辑的值,要么彻底废弃池,并使用multiprocessing.Process为您必须完成的每个任务启动一个新的进程。

我还注意到,您的代码在主进程中有一个争用条件,这个主进程在修改x数组的主线程和在将旧值发送给工作进程之前序列化旧值的线程之间。很多时候,您可能最终会发送多个相同数组的副本(带有最终值),而不是您想要的几个不同的值。

下面是一个快速的、未经测试的版本,它删除了队列:

代码语言:javascript
复制
def worker(arr):
    time.sleep(0.2)
    return arr[0]

if __name__ == "__main__":
    p = mp.Pool(4)
    results = p.map(worker, [np.array([i, 4]) for i in range(4)])
    p.join()
    for result in results:
        print(result)

下面是一个删除Pool并保持队列的版本:

代码语言:javascript
复制
def worker(q, arr): 
    time.sleep(0.2)
    q.put(arr[0])

if __name__ == "__main__":
    q = m.Queue()
    processes = []

    for i in range(4):
        p = mp.Process(target=worker, args=(q, np.array([i, 4])))
        p.start()
        processes.append(p)

    for i in range(4):
        print(q.get())

    for p in processes:
        p.join()

请注意,在上一个版本中,在尝试处理进程之前,get来自队列的结果可能很重要(但如果我们只处理四个值,则可能不是这样)。如果要填充队列,则如果按另一个顺序执行,则可能会出现死锁。工作人员可能会被阻止,试图写入队列,而主进程被阻塞,等待工作进程退出。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45312161

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档