详解pandas apply 并行处理的几种方法
1.pandarallel(pipinstall)
对于一个带有PandasDataFramedf的简单用例和一个应用func的函数,只需用parallel_apply替换经典的apply。
frompandarallelimportpandarallel #Initialization pandarallel.initialize() #Standardpandasapply df.apply(func) #Parallelapply df.parallel_apply(func)
注意,如果不想并行化计算,仍然可以使用经典的apply方法。
另外可以通过在initialize函数中传递progress_bar=True来显示每个工作CPU的一个进度条。
2.joblib(pipinstall)
https://pypi.python.org/pypi/joblib
#Embarrassinglyparallelhelper:tomakeiteasytowritereadableparallelcodeanddebugitquickly frommathimportsqrt fromjoblibimportParallel,delayed deftest(): start=time.time() result1=Parallel(n_jobs=1)(delayed(sqrt)(i**2)foriinrange(10000)) end=time.time() print(end-start) result2=Parallel(n_jobs=8)(delayed(sqrt)(i**2)foriinrange(10000)) end2=time.time() print(end2-end)
-------输出结果----------
0.4434356689453125
0.6346755027770996
3.multiprocessing
importmultiprocessingasmp withmp.Pool(mp.cpu_count())aspool: df['newcol']=pool.map(f,df['col']) multiprocessing.cpu_count()
返回系统的CPU数量。
该数量不同于当前进程可以使用的CPU数量。可用的CPU数量可以由len(os.sched_getaffinity(0))方法获得。
可能引发NotImplementedError。
参见os.cpu_count()
4.几种方法性能比较
(1)代码
importsys importtime importpandasaspd importmultiprocessingasmp fromjoblibimportParallel,delayed frompandarallelimportpandarallel fromtqdmimporttqdm,tqdm_notebook defget_url_len(url): url_list=url.split(".") time.sleep(0.01)#休眠0.01秒 returnlen(url_list) deftest1(data): """ 不进行任何优化 """ start=time.time() data['len']=data['url'].apply(get_url_len) end=time.time() cost_time=end-start res=sum(data['len']) print("res:{},costtime:{}".format(res,cost_time)) deftest_mp(data): """ 采用mp优化 """ start=time.time() withmp.Pool(mp.cpu_count())aspool: data['len']=pool.map(get_url_len,data['url']) end=time.time() cost_time=end-start res=sum(data['len']) print("test_mp\tres:{},costtime:{}".format(res,cost_time)) deftest_pandarallel(data): """ 采用pandarallel优化 """ start=time.time() pandarallel.initialize() data['len']=data['url'].parallel_apply(get_url_len) end=time.time() cost_time=end-start res=sum(data['len']) print("test_pandarallel\tres:{},costtime:{}".format(res,cost_time)) deftest_delayed(data): """ 采用delayed优化 """ defkey_func(subset): subset["len"]=subset["url"].apply(get_url_len) returnsubset start=time.time() data_grouped=data.groupby(data.index) #data_grouped是一个可迭代的对象,那么就可以使用tqdm来可视化进度条 results=Parallel(n_jobs=8)(delayed(key_func)(group)forname,groupintqdm(data_grouped)) data=pd.concat(results) end=time.time() cost_time=end-start res=sum(data['len']) print("test_delayed\tres:{},costtime:{}".format(res,cost_time)) if__name__=='__main__': columns=['title','url','pub_old','pub_new'] temp=pd.read_csv("./input.csv",names=columns,nrows=10000) data=temp """ foriinrange(99): data=data.append(temp) """ print(len(data)) """ test1(data) test_mp(data) test_pandarallel(data) """ test_delayed(data)
(2)结果输出
1k
res:4338,costtime:0.0018074512481689453
test_mp res:4338,costtime:0.2626469135284424
test_pandarallel res:4338,costtime:0.3467681407928467
1w
res:42936,costtime:0.008773326873779297
test_mp res:42936,costtime:0.26111721992492676
test_pandarallel res:42936,costtime:0.33237743377685547
10w
res:426742,costtime:0.07944369316101074
test_mp res:426742,costtime:0.294996976852417
test_pandarallel res:426742,costtime:0.39208269119262695
100w
res:4267420,costtime:0.8074917793273926
test_mp res:4267420,costtime:0.9741342067718506
test_pandarallel res:4267420,costtime:0.6779992580413818
1000w
res:42674200,costtime:8.027287006378174
test_mp res:42674200,costtime:7.751036882400513
test_pandarallel res:42674200,costtime:4.404983282089233
在get_url_len函数里加个sleep语句(模拟复杂逻辑),数据量为1k,运行结果如下:
1k
res:4338,costtime:10.054503679275513
test_mp res:4338,costtime:0.35697126388549805
test_pandarallel res:4338,costtime:0.43415403366088867
test_delayed res:4338,costtime:2.294757843017578
5.小结
(1)如果数据量比较少,并行处理比单次执行效率更慢;
(2)如果apply的函数逻辑简单,并行处理比单次执行效率更慢。
6.问题及解决方法
(1)ImportError:Thisplatformlacksafunctioningsem_openimplementation,therefore,therequiredsynchronizationprimitivesneededwillnotfunction,seeissue3770.
https://www.jianshu.com/p/0be1b4b27bde
(2)Linux查看物理CPU个数、核数、逻辑CPU个数
https://lover.blog.csdn.net/article/details/113951192
(3)进度条的使用
https://www.nhooo.com/article/206219.htm
到此这篇关于详解pandasapply并行处理的几种方法的文章就介绍到这了,更多相关pandasapply并行处理内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。