本文共 4168 字,大约阅读时间需要 13 分钟。
编写完毕的代码,尚未运行时被称为程序。而正在运行的代码,则被称为进程。进程不仅包含代码,还需要运行所需的环境,因此与程序有所区别。
进程的创建主要有以下几种方式:
在Unix/Linux等操作系统中,fork()函数具有特殊性。每次调用fork(),操作系统会创建一个子进程(复制当前进程),然后分别在父进程和子进程中返回。子进程的PID为0,父进程则返回子进程的PID。子进程和父进程的执行顺序由操作系统调度算法决定。
multiprocessing模块提供了跨平台支持的Process类,用于创建子进程。创建过程简单,通过传入目标函数和参数即可实例化Process对象,并调用start()方法启动。Process类还提供了join()方法等用于管理子进程。
from multiprocessing import Processimport osdef run_proc(name): print(f'子进程运行中,name={name}, pid={os.getpid()}')if __name__ == '__main__': print(f'父进程 {os.getpid()}') p = Process(target=run_proc, args=('test',)) print('子进程将要执行') p.start() p.join() print('子进程已结束') from multiprocessing import Processimport timeimport osclass Process_Class(Process): def __init__(self, interval): super().__init__() self.interval = interval def run(self): print(f'子进程({self.name}) 开始执行,父进程为 {os.getppid()}') t_start = time.time() time.sleep(self.interval) t_stop = time.time() print(f'({self.name}) 执行结束,耗时 {t_stop - t_start:.2f} 秒')if __name__ == '__main__': t_start = time.time() print(f'当前程序进程 {os.getpid()}') p1 = Process_Class(2) p1.start() p1.join() t_stop = time.time() print(f'({p1.name}) 执行结束,耗时 {t_stop - t_start:.2f} 秒') 当需要同时执行大量子进程时,使用进程池(multiprocessing.Pool)更加高效。进程池通过指定最大进程数,自动管理子进程的创建和调度,适合处理大量任务。
apply_async(func, args, kwds):非阻塞调用函数,适用于并行执行。apply(func, args, kwds):阻塞调用函数,需等待上一个进程退出。close():关闭进程池,不再接收新任务。terminate():立即终止所有子进程。join():阻塞等待所有子进程完成。from multiprocessing import Poolimport osimport timeimport randomdef worker(msg): t_start = time.time() print(f'{msg}开始执行,进程号 {os.getpid()}') time.sleep(random.random() * 2) t_stop = time.time() print(f'{msg}执行完毕,耗时 {t_stop - t_start:.2f} 秒')if __name__ == '__main__': print('----start----') po = Pool(3) for i in range(10): po.apply_async(worker, (i,)) po.close() po.join() print('-----end-----') from multiprocessing import Poolimport osimport timeimport randomdef worker(msg): t_start = time.time() print(f'{msg}开始执行,进程号 {os.getpid()}') time.sleep(random.random() * 2) t_stop = time.time() print(f'{msg}执行完毕,耗时 {t_stop - t_start:.2f} 秒')if __name__ == '__main__': print('----start----') po = Pool(3) for i in range(6): po.apply(worker, (i,)) po.close() print('-----end-----') 进程间通信需要考虑数据共享问题,multiprocessing模块提供了Queue(消息队列)实现异步通信。Queue支持先进先出(FIFO)原则,适合多进程间的数据传递。
qsize():返回当前队列消息数量。empty():判断队列是否为空。full():判断队列是否已满。get([block, timeout]):获取队列消息,支持阻塞和超时。put(item, [block, timeout]):将消息写入队列,支持阻塞和超时。put_nowait():不阻塞地写入队列,若队列满则抛出异常。from multiprocessing import Queueq = Queue(3)q.put('A')q.put('B')print(q.full()) # 输出: Falseq.put('C')print(q.full()) # 输出: Truetry: q.put('D', timeout=2)except: print(f'消息队列已满,现有消息数量 {q.qsize()}')print(q.get()) # 返回: Aif not q.full(): q.put('D') print('还可以写入消息D')if not q.empty(): for i in range(q.qsize()): print(q.get()) from multiprocessing import Processimport osimport timedef worker_write(q): if not q.full(): for i in range(10): q.put(i) print(f'写入数据 {i}')def worker_read(q): if not q.empty(): for i in range(q.qsize()): print(f'读取数据 {q.get()}')if __name__ == '__main__': q = Queue() p1 = Process(target=worker_write, args=(q,)) p1.start() p1.join() p2 = Process(target=worker_read, args=(q,)) p2.start() p2.join() print('所有数据都已写入并读取完毕') from multiprocessing import Poolimport osimport timefrom multiprocessing import Managerdef worker_write(q): if not q.full(): for i in range(10): q.put(i) print(f'写入数据 {i}')def worker_read(q): if not q.empty(): for i in range(q.qsize()): print(f'读取数据 {q.get()}')if __name__ == '__main__': q = Manager().Queue() po = Pool() po.apply(worker_write, (q,)) po.apply(worker_read, (q,)) po.close() print('写入数据并读取数据完毕') 通过以上方法,开发者可以在多进程环境中实现数据传递和同步。
转载地址:http://niduz.baihongyu.com/