Python数据并行的6种方法(进程池+进度条) 2022-06-25 笔记,技巧,实验 暂无评论 1567 次阅读 [TOC] ## 什么是数据并行 **数据并行**(data-parallelism)和**任务并行**(task-parallelism)是实现并行计算的两种方式,可以简单理解为多人分工的两种方式。 - 例如:某场考试有200张试卷,试卷上有4种题型,需要4个人改卷。 - 数据并行:对“试卷数量”分割,每人改50张试卷的4种题型。(每人的任务都一样) - 任务并行:对“试卷内容”分割,每人改200张试卷的1种题型。(每人的任务不一样) 日常写代码,如果需要用一个函数对一组数据依次处理,觉得执行得慢,就可以用数据并行。本文最后有写好的轮子,可以直接放项目里用。 ## 输入输出示例 **输入:** - 函数 **f** ```python def Pow(a,n): # 例如:计算a的n次方 return a**n ``` - 数据列表 **args_mat** ```python args_mat=[ # 例如:从2的0次方到2的100000次方都需要计算 [2,0], [2,1], [2,2], ... [2,100000] ] ``` - 必须是二维列表,一行代表一组参数。 - 一维列表arr通过`arr=[[x] for x in arr]`升成二维 - 进程池大小 **pool_size** (默认为5) - 要小于CPU核数(`os.cpu_count()`可以返回CPU核数) - 进度条文字 **desc** (默认为空) **输出:** - 结果列表 ```python [1,2,4,8,16,32,64,...] # 结果列表要和数据列表一一对应得上 ``` ## 方法1:用Python自带的并行任务接口concurrent.futures ```python import concurrent.futures from tqdm import tqdm def multi_process_exec_v0(f,args_mat,pool_size=5,desc=None): if len(args_mat)==0:return [] results=[None for _ in range(len(args_mat))] with tqdm(total=len(args_mat), desc=desc) as pbar: with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor: futures = {executor.submit(f,*args): i for i,args in enumerate(args_mat)} for future in concurrent.futures.as_completed(futures): i=futures[future] ret = future.result() results[i]=ret pbar.update(1) return results ``` 这个并行任务接口是给每一条输入数据各开一个进程来执行。 创建/销毁进程的开销很大,时间上比串行执行还慢。 这个进程池不适合用来做“数据并行”,而是适合做“任务并行”。 ## 方法2:用Python自带的多进程接口multiprocessing ```python from multiprocessing import Pool, Pipe from multiprocessing.connection import wait from tqdm import tqdm ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)] def batch_exec_v1(f,args_batch,w,offset=0): for i,args in enumerate(args_batch): ans = f(*args) w.send((i+offset,ans)) w.send('exit') w.close() def multi_process_exec_v1(f,args_mat,pool_size=5,desc=None): if len(args_mat)==0:return [] if type(args_mat[0]) not in [list,tuple]: args_mat=[[a]for a in args_mat] batch_size=max(1,int(len(args_mat)/4/pool_size)) results=[None for _ in range(len(args_mat))] args_batches = ToBatch(args_mat,batch_size) readers=[] with tqdm(total=len(args_mat), desc=desc) as pbar: with Pool(processes=pool_size) as pool: for i,args_batch in enumerate(args_batches): r,w=Pipe(duplex=False) readers.append(r) pool.apply_async(batch_exec_v1,(f,args_batch,w,i*batch_size)) while readers: for r in wait(readers): try: msg=r.recv() if msg=='exit': readers.remove(r) continue results[msg[0]]=msg[1] pbar.update(1) except EOFError: readers.remove(r) return results ``` 这段代码是把输入数据分批,每个进程处理一批。 数据分批的批数=进程池大小×4。批数设得越多,越倾向于提高并行计算资源的利用率,但同时,创建/销毁进程的开销也会越多。 由于一个进程是处理一批数据而不是一条数据,那么进度条就是一批一批地更新。要想一条一条地更新,就需要进程间通信。这里是每处理完一条数据,就立即把这一条的结果传给主进程;主进程收集结果,更新进度条。 ## 方法3:在方法2的基础上,共用同一个通信管道 ```python from multiprocessing import Pool, Pipe from tqdm import tqdm ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)] def batch_exec_v2(f,args_batch,w=None,offset=0): for i,args in enumerate(args_batch): ans = f(*args) if w:w.send((i+offset,ans)) def multi_process_exec_v2(f,args_mat,pool_size=5,desc=None): if len(args_mat)==0:return [] batch_size=max(1,int(len(args_mat)/4/pool_size)) results=[None for _ in range(len(args_mat))] args_batches = ToBatch(args_mat,batch_size) with tqdm(total=len(args_mat), desc=desc) as pbar: with Pool(processes=pool_size) as pool: r,w=Pipe(duplex=False) for i,args_batch in enumerate(args_batches): pool.apply_async(batch_exec_v2,(f,args_batch,w,i*batch_size)) cnt=0 while cnt``` File "/usr/lib/python3.8/multiprocessing/connection.py", line 251, in recv return _ForkingPickler.loads(buf.getbuffer()) _pickle.UnpicklingError: invalid load key, '\x00'. ``` 原因是两个进程同时往一个管道里传数据,数据混一起了,导致解析不出来。 ## 方法4:在方法3的基础上,不通过管道传结果 ```python from multiprocessing import Pool, Pipe from tqdm import tqdm ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)] def batch_exec_v3(f,args_batch,w=None): results=[] for i,args in enumerate(args_batch): ans = f(*args) results.append(ans) if w:w.send(1) return results def multi_process_exec_v3(f,args_mat,pool_size=5,desc=None): if len(args_mat)==0:return [] batch_size=max(1,int(len(args_mat)/4/pool_size)) results=[] args_batches = ToBatch(args_mat,batch_size) with tqdm(total=len(args_mat), desc=desc) as pbar: with Pool(processes=pool_size) as pool: r,w=Pipe(duplex=False) pool_rets=[] for i,args_batch in enumerate(args_batches): pool_rets.append(pool.apply_async(batch_exec_v3,(f,args_batch,w))) cnt=0 while cnt>> def Pow(a,n): ← 定义一个函数(可以有多个参数) # # ... return a**n # # >>> # # >>> args_mat=[[2,1], ← 批量计算 Pow(2,1) # # ... [2,2], Pow(2,2) # # ... [2,3], Pow(2,3) # # ... [2,4], Pow(2,4) # # ... [2,5], Pow(2,5) # # ... [2,6]] Pow(2,6) # # >>> # # >>> results=multi_thread_exec(Pow,args_mat,desc='计算中') # # 计算中: 100%|█████████████| 6/6 [00:00<00:00, 20610.83it/s] # # >>> # # >>> print(results) # # [2, 4, 8, 16, 32, 64] # #-------------------------------------------------------------# ToBatch = lambda arr,size:[arr[i*size:(i+1)*size] for i in range((size-1+len(arr))//size)] def batch_exec(f,args_batch,w): results=[] for i,args in enumerate(args_batch): try: ans = f(*args) results.append(ans) except Exception: results.append(None) w.send(1) return results def multi_process_exec(f,args_mat,pool_size=5,desc=None): if len(args_mat)==0:return [] batch_size=max(1,int(len(args_mat)/4/pool_size)) results=[] args_batches = ToBatch(args_mat,batch_size) with tqdm(total=len(args_mat), desc=desc) as pbar: with Pool(processes=pool_size) as pool: r,w=Pipe(duplex=False) pool_rets=[] for i,args_batch in enumerate(args_batches): pool_rets.append(pool.apply_async(batch_exec,(f,args_batch,w))) cnt=0 while cnt 标签: none 本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
评论已关闭