pandas apply多线程实现代码
一、多线程化选择
并行化一个代码有两大选择:multithread和multiprocess。
Multithread,多线程,同一个进程(process)可以开启多个线程执行计算。每个线程代表了一个CPU核心,这么多线程可以访问同样的内存地址(所谓共享内存),实现了线程之间的通讯,算是最简单的并行模型。
Multiprocess,多进程,则相当于同时开启多个Python解释器,每个解释器有自己独有的数据,自然不会有数据冲突。
二、并行化思想
并行化的基本思路是把dataframe用np.array_split方法切割成多个子dataframe。再调用Pool.map函数并行地执行。注意到顺序执行的pandas.DataFrame.apply是如何转化成Pool.map然后并行执行的。
Pool对象是一组并行的进程,开源Pool类
开源Pool类定义
defPool(self,processes=None,initializer=None,initargs=(), maxtasksperchild=None): '''Returnsaprocesspoolobject''' from.poolimportPool returnPool(processes,initializer,initargs,maxtasksperchild, context=self.get_context())
设置进程初始化函数
definit_process(global_vars): globala a=global_vars
设置进程初始化函数
Pool(processes=8,initializer=init_process,initargs=(a,))
其中,指定产生8个进程,每个进程的初始化需运行init_process函数,其参数为一个singletontuplea.利用init_process和initargs,我们可以方便的设定需要在进程间共享的全局变量(这里是a)。
with关键词是contextmanager,避免写很繁琐的处理开关进程的逻辑。
withPool(processes=8,initializer=init_process,initargs=(a,))aspool: result_parts=pool.map(apply_f,df_parts)
三、多线程化应用
多线程时间比较和多线程的几种apply应用
importnumpyasnp importpandasaspd importtime frommultiprocessingimportPool deff(row): #直接对某列进行操作 returnsum(row)+a deff1_1(row): #对某一列进行操作,我这里的columns=range(0,2),此处是对第0列进行操作 returnrow[0]**2 deff1_2(row1): #对某一列进行操作,我这里的columns=range(0,2),此处是对第0列进行操作 returnrow1**2 deff2_1(row): #对某两列进行操作,我这里的columns=range(0,2),此处是对第0,2列进行操作 returnpd.Series([row[0]**2,row[1]**2],index=['1_1','1_2']) deff2_2(row1,row2): #对某两列进行操作,我这里的columns=range(0,2),此处是对第0,2列进行操作 returnpd.Series([row1**2,row2**2],index=['2_1','2_2']) defapply_f(df): returndf.apply(f,axis=1) defapply_f1_1(df): returndf.apply(f1_1,axis=1) defapply_f1_2(df): returndf[0].apply(f1_2) defapply_f2_1(df): returndf.apply(f2_1,axis=1) defapply_f2_2(df): returndf.apply(lambdarow:f2_2(row[0],row[1]),axis=1) definit_process(global_vars): globala a=global_vars deftime_compare(): '''直接调用和多线程调用时间对比''' a=2 np.random.seed(0) df=pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2)) print(df.columns) t1=time.time() result_serial=df.apply(f,axis=1) t2=time.time() print("Serialtime=",t2-t1) print(result_serial.head()) df_parts=np.array_split(df,20) print(len(df_parts),type(df_parts[0])) withPool(processes=8,initializer=init_process,initargs=(a,))aspool: #withPool(processes=8)aspool: result_parts=pool.map(apply_f,df_parts) result_parallel=pd.concat(result_parts) t3=time.time() print("Paralleltime=",t3-t2) print(result_parallel.head()) defapply_fun(): '''多种apply函数的调用''' a=2 np.random.seed(0) df=pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2)) print(df.columns) df_parts=np.array_split(df,20) print(len(df_parts),type(df_parts[0])) withPool(processes=8,initializer=init_process,initargs=(a,))aspool: #withPool(processes=8)aspool: res_part0=pool.map(apply_f,df_parts) res_part1=pool.map(apply_f1_1,df_parts) res_part2=pool.map(apply_f1_2,df_parts) res_part3=pool.map(apply_f2_1,df_parts) res_part4=pool.map(apply_f2_2,df_parts) res_parallel0=pd.concat(res_part0) res_parallel1=pd.concat(res_part1) res_parallel2=pd.concat(res_part2) res_parallel3=pd.concat(res_part3) res_parallel4=pd.concat(res_part4) print("f:\n",res_parallel0.head()) print("f1:\n",res_parallel1.head()) print("f2:\n",res_parallel2.head()) print("f3:\n",res_parallel3.head()) print("f4:\n",res_parallel4.head()) df=pd.concat([df,res_parallel0],axis=1) df=pd.concat([df,res_parallel1],axis=1) df=pd.concat([df,res_parallel2],axis=1) df=pd.concat([df,res_parallel3],axis=1) df=pd.concat([df,res_parallel4],axis=1) print(df.head()) if__name__=='__main__': time_compare() apply_fun()
参考网址
https://blog.fangzhou.me/posts/20170702-python-parallelism/
https://docs.python.org/3.7/library/multiprocessing.html
到此这篇关于pandasapply多线程实现代码的文章就介绍到这了,更多相关pandasapply多线程内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。