我正在运行以下代码。我以为指纹在两次处理之间是随机的。然而,我看到了一个确定性的结果:在每次运行时,第一个进程首先完成它的循环,然后第二个进程才开始运行循环。我期待的是随机行为,这意味着两个进程之间的上下文切换。但我看到的是一个进程完成后,第二个进程开始,没有任何上下文切换。
谁能描述一下我错过了什么?
import multiprocessing
import time
import os
lock = multiprocessing.Lock()
def func(_lock):
for _ in range(0, 3):
with _lock:
print("sleeping in pid " + str(os.getpid()))
time.sleep(1)
print("finished sleeping in pid " + str(os.getpid()))
process1 = multiprocessing.Process(target=func, args=(lock,))
process2 = multiprocessing.Process(target=func, args=(lock,))
process1.start()
process2.start()===
输出为:
在pid 2322中睡眠
在pid 2322中完成睡眠
在pid 2322中睡眠
在pid 2322中完成睡眠
在pid 2322中睡眠
在pid 2322中完成睡眠
在pid 2323中睡眠
在pid 2323中完成睡眠
在pid 2323中睡眠
在pid 2323中完成睡眠
在pid 2323中睡眠
在pid 2323中完成睡眠
进程已完成,退出代码为0
发布于 2021-05-10 16:35:33
下面是一个使用ThreadPoolExecutor的示例。如果您需要进程,那么只需更改为ProcessPoolExecutor即可。要了解要使用什么(线程/进程),您需要了解CPU Bound and I/O Bound
首先,创建字典class_holder并保存YourClass对象。另外,将对象的名称放入队列queue.put(i)中。对于每个执行器,只要您获得队列queue.get()的名称,就可以使用随机秒数调用方法my_print来运行线程executor.submit(...)。
希望这种实现方式能对你有所帮助。对我来说,我找到了扩展项目的方法。
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Queue
import random
class YourClass:
def __init__(self):
self.string = None
self.sleep_time = None
def my_print(self, string, sleep_t):
self.string = string
self.sleep_time = sleep_t
time.sleep(self.sleep_time)
print(self.string + str(threading.current_thread().ident) + " process id: " + str(os.getpid()))
lock = threading.Lock()
queue = Queue()
class_holder = dict()
for i in ['a', 'b', 'c', 'd', 'e', 'f']:
class_holder[i] = YourClass()
queue.put(i)
thread_limit = 3
with ThreadPoolExecutor(max_workers=thread_limit) as executor:
while True:
_i = queue.get()
if _i in class_holder:
executor.submit(class_holder[_i].my_print,
string=f"sleeping {_i} in thread id: ",
sleep_t=random.randint(1,4))如果您需要lock = threading.Lock(),那么您可以在YourClass方法中使用它来隔离诸如编辑文件之类的事情。
class YourClass:
...
def my_extra_method(self):
with lock:
os.system(fr"sed -i 's|ARG_IN_FIE|NEW_ARG|g' some_file")我建议您使用Queue,它有助于组织作业,并且已经有一个锁。查看以下队列示例:
class Queue(object):
def __init__(self, size=5):
self._size = size
self._queue = []
self._mutex = threading.RLock()
self._empty = threading.Condition(self._mutex)
self._full = threading.Condition(self._mutex)
def put(self, val):
with self._full:
while len(self._queue) >= self._size:
self._full.wait()
self._queue.append(val)
self._empty.notify()
def get(self):
with self._empty:
while len(self._queue) == 0:
self._empty.wait()
ret = self._queue.pop(0)
self._full.notify()
return retfrom queue import Queue
from threading import Thread
def worker(q, n):
while True:
item = q.get()
if item is None:
break
print("process data:", n, item)
q = Queue(5)
th1 = Thread(target=worker, args=(q, 1))
th2 = Thread(target=worker, args=(q, 2))
th1.start(); th2.start()
for i in range(50):
q.put(i)
q.put(None); q.put(None)
th1.join(); th2.join()发布于 2021-05-10 21:54:57
你的进程获得一个锁,在另一个进程被阻塞的时候“做它自己的事情”,然后释放锁,并立即循环回来,尝试重新获得它刚刚释放的同一个锁。由于该进程已经在运行并且仍然是可调度的,因此它是成功的,即,仅仅因为它释放了锁并不意味着它自动停止运行,因此它在竞争中击败了其他进程来获得锁。将代码更改为以下代码,您将获得我认为您期望看到的结果:
def func(_lock):
for _ in range(0, 3):
with _lock:
print("sleeping in pid " + str(os.getpid()))
time.sleep(1) # this gives the other process a chance to acquire the lock
with _lock:
print("finished sleeping in pid " + str(os.getpid()))锁应该只持有尽可能短的时间。试着想出允许这一点的逻辑。
https://stackoverflow.com/questions/67466192
复制相似问题