python multiprocessing 多进程并行计算的操作
python的multiprocessing包是标准库提供的多进程并行计算包,提供了和threading(多线程)相似的API函数,但是相比于threading,将任务分配到不同的CPU,避免了GIL(GlobalInterpreterLock)的限制。
下面我们对multiprocessing中的Pool和Process类做介绍。
Pool
采用Pool进程池对任务并行处理更加方便,我们可以指定并行的CPU个数,然后Pool会自动把任务放到进程池中运行。Pool包含了多个并行函数。
applyapply_async
apply要逐个执行任务,在python3中已经被弃用,而apply_async是apply的异步执行版本。并行计算一定要采用apply_async函数。
importmultiprocessing importtime fromrandomimportrandint,seed deff(num): seed() rand_num=randint(0,10)#每次都随机生成一个停顿时间 time.sleep(rand_num) return(num,rand_num) start_time=time.time() cores=multiprocessing.cpu_count() pool=multiprocessing.Pool(processes=cores) pool_list=[] result_list=[] start_time=time.time() forxxinxrange(10): pool_list.append(pool.apply_async(f,(xx,)))#这里不能get,会阻塞进程 result_list=[xx.get()forxxinpool_list] #在这里不免有人要疑问,为什么不直接在for循环中直接result.get()呢?这是因为pool.apply_async之后的语句都是阻塞执行的,调用result.get()会等待上一个任务执行完之后才会分配下一个任务。事实上,获取返回值的过程最好放在进程池回收之后进行,避免阻塞后面的语句。 #最后我们使用一下语句回收进程池: pool.close() pool.join() printresult_list print'并行花费时间%.2f'%(time.time()-start_time) print'串行花费时间%.2f'%(sum([xx[1]forxxinresult_list])) #[(0,8),(1,2),(2,4),(3,9),(4,0),(5,1),(6,8),(7,3),(8,4),(9,6)] #并行花费时间14.11 #串行花费时间45.00
mapmap_async
map_async是map的异步执行函数。
相比于apply_async,map_async只能接受一个参数。
importtime frommultiprocessingimportPool defrun(fn): #fn:函数参数是数据列表的一个元素 time.sleep(1) returnfn*fn if__name__=="__main__": testFL=[1,2,3,4,5,6] print'串行:'#顺序执行(也就是串行执行,单进程) s=time.time() forfnintestFL: run(fn) e1=time.time() print"顺序执行时间:",int(e1-s) print'并行:'#创建多个进程,并行执行 pool=Pool(4)#创建拥有5个进程数量的进程池 #testFL:要处理的数据列表,run:处理testFL列表中数据的函数 rl=pool.map(run,testFL) pool.close()#关闭进程池,不再接受新的进程 pool.join()#主进程阻塞等待子进程的退出 e2=time.time() print"并行执行时间:",int(e2-e1) printrl #串行: #顺序执行时间:6 #并行: #并行执行时间:2 #[1,4,9,16,25,36]
Process
采用Process必须注意的是,Process对象来创建进程,每一个进程占据一个CPU,所以要建立的进程必须小于等于CPU的个数。
如果启动进程数过多,特别是当遇到CPU密集型任务,会降低并行的效率。
#16.6.1.1.TheProcessclass
frommultiprocessingimportProcess,cpu_count
importos
importtime
start_time=time.time()
definfo(title):
#print(title)
ifhasattr(os,'getppid'):#onlyavailableonUnix
print'parentprocess:',os.getppid()
print'processid:',os.getpid()
time.sleep(3)
deff(name):
info('functionf')
print'hello',name
if__name__=='__main__':
#info('mainline')
p_list=[]#保存Process新建的进程
cpu_num=cpu_count()
forxxinxrange(cpu_num):
p_list.append(Process(target=f,args=('xx_%s'%xx,)))
forxxinp_list:
xx.start()
forxxinp_list:
xx.join()
print('spendtime:%.2f'%(time.time()-start_time))
parentprocess:11741
#parentprocess:11741
#parentprocess:11741
#processid:12249
#processid:12250
#parentprocess:11741
#processid:12251
#processid:12252
#helloxx_1
#helloxx_0
#helloxx_2
#helloxx_3
#spendtime:3.04
进程间通信
Process和Pool均支持Queues和Pipes两种类型的通信。
Queue队列
队列遵循先进先出的原则,可以在各个进程间使用。
#16.6.1.2.Exchangingobjectsbetweenprocesses #Queues frommultiprocessingimportProcess,Queue deff(q): q.put([42,None,'hello']) if__name__=='__main__': q=Queue() p=Process(target=f,args=(q,)) p.start() printq.get()#prints"[42,None,'hello']" p.join()
pipe
frommultiprocessingimportProcess,Pipe deff(conn): conn.send([42,None,'hello']) conn.close() if__name__=='__main__': parent_conn,child_conn=Pipe() p=Process(target=f,args=(child_conn,)) p.start() printparent_conn.recv()#prints"[42,None,'hello']" p.join()
queue与pipe比较
Pipe()canonlyhavetwoendpoints.
Queue()canhavemultipleproducersandconsumers.
Whentousethem
Ifyouneedmorethantwopointstocommunicate,useaQueue().
Ifyouneedabsoluteperformance,aPipe()ismuchfasterbecauseQueue()isbuiltontopofPipe().
参考:
https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue
共享资源
多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。
在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。
此时我们可以通过共享内存和Manager的方法来共享资源。
但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
共享内存
共享内存仅适用于Process类,不能用于进程池Pool
#16.6.1.4.Sharingstatebetweenprocesses
#Sharedmemory
frommultiprocessingimportProcess,Value,Array
deff(n,a):
n.value=3.1415927
foriinrange(len(a)):
a[i]=-a[i]
if__name__=='__main__':
num=Value('d',0.0)
arr=Array('i',range(10))
p=Process(target=f,args=(num,arr))
p.start()
p.join()
printnum.value
printarr[:]
#3.1415927
#[0,-1,-2,-3,-4,-5,-6,-7,-8,-9]
ManagerClass
ManagerClass既可以用于Process也可以用于进程池Pool。
frommultiprocessingimportManager,Process
deff(d,l,ii):
d[ii]=ii
l.append(ii)
if__name__=='__main__':
manager=Manager()
d=manager.dict()
l=manager.list(range(10))
p_list=[]
forxxinrange(4):
p_list.append(Process(target=f,args=(d,l,xx)))
forxxinp_list:
xx.start()
forxxinp_list:
xx.join()
printd
printl
#{0:0,1:1,2:2,3:3}
#[0,1,2,3,4,5,6,7,8,9,0,1,2,3]
补充:python程序多进程运行时间计算/多进程写数据/多进程读数据
importtime
time_start=time.time()
time_end=time.time()
print('timecost',time_end-time_start,'s')
单位为秒,也可以换算成其他单位输出
注意写测试的时候,函数名要以test开头,否则运行不了。
多线程中的问题:
1)多线程存数据:
deftest_save_features_to_db(self):
df1=pd.read_csv('/home/sc/PycharmProjects/risk-model/xg_test/statis_data/shixin_company.csv')
com_list=df1['company_name'].values.tolist()
#com_list=com_list[400015:400019]
#print'test_save_features_to_db'
#print(com_list)
p_list=[]#进程列表
i=1
p_size=len(com_list)
forcompany_nameincom_list:
#创建进程
p=Process(target=self.__save_data_iter_method,args=[company_name])
#p.daemon=True
p_list.append(p)
#间歇执行进程
ifi%20==0ori==p_size:#20页处理一次,最后一页处理剩余
forpinp_list:
p.start()
forpinp_list:
p.join()#等待进程结束
p_list=[]#清空进程列表
i+=1
总结:多进程写入的时候,不需要lock,也不需要返回值。
核心p=Process(target=self.__save_data_iter_method,args=[company_name]),其中target指向多进程的一次完整的迭代,arg则是该迭代的输入。
注意写法args=[company_name]才对,原来写成:args=company_name,args=(company_name)会报如下错:只需要1个参数,而给出了34个参数。
多进程外层循环则是由输入决定的,有多少个输入就为多少次循环,理解p.start和p.join;
def__save_data_iter_method(self,com):
#time_start=time.time()
#print(com)
f_d_t=ShiXinFeaturesDealSvc()
res=f_d_t.get_time_features(company_name=com)
#是否失信
shixin_label=res.shixin_label
key1=res.shixin_time
ifkey1:
public_at=res.shixin_time
company_name=res.time_map_features[key1].company_name
#print(company_name)
established_years=res.time_map_features[key1].established_years
industry_dx_rate=res.time_map_features[key1].industry_dx_rate
regcap_change_cnt=res.time_map_features[key1].regcap_change_cnt
share_change_cnt=res.time_map_features[key1].share_change_cnt
industry_dx_cnt=res.time_map_features[key1].industry_dx_cnt
address_change_cnt=res.time_map_features[key1].address_change_cnt
fr_change_cnt=res.time_map_features[key1].fr_change_cnt
judgedoc_cnt=res.time_map_features[key1].judgedoc_cnt
bidding_cnt=res.time_map_features[key1].bidding_cnt
trade_mark_cnt=res.time_map_features[key1].trade_mark_cnt
network_share_cancel_cnt=res.time_map_features[key1].network_share_cancel_cnt
cancel_cnt=res.time_map_features[key1].cancel_cnt
industry_all_cnt=res.time_map_features[key1].industry_all_cnt
network_share_zhixing_cnt=res.time_map_features[key1].network_share_zhixing_cnt
network_share_judge_doc_cnt=res.time_map_features[key1].network_share_judge_doc_cnt
net_judgedoc_defendant_cnt=res.time_map_features[key1].net_judgedoc_defendant_cnt
judge_doc_cnt=res.time_map_features[key1].judge_doc_cnt
f_d_do=ShixinFeaturesDto(company_name=company_name,established_years=established_years,
industry_dx_rate=industry_dx_rate,regcap_change_cnt=regcap_change_cnt,
share_change_cnt=share_change_cnt,industry_all_cnt=industry_all_cnt,
industry_dx_cnt=industry_dx_cnt,address_change_cnt=address_change_cnt,
fr_change_cnt=fr_change_cnt,judgedoc_cnt=judgedoc_cnt,
bidding_cnt=bidding_cnt,trade_mark_cnt=trade_mark_cnt,
network_share_cancel_cnt=network_share_cancel_cnt,cancel_cnt=cancel_cnt,
network_share_zhixing_cnt=network_share_zhixing_cnt,
network_share_judge_doc_cnt=network_share_judge_doc_cnt,
net_judgedoc_defendant_cnt=net_judgedoc_defendant_cnt,
judge_doc_cnt=judge_doc_cnt,public_at=public_at,shixin_label=shixin_label)
#time_end=time.time()
#print('totallycost',time_end-time_start)
self.cfdbsvc.save_or_update_features(f_d_do)
defsave_or_update_features(self,shixin_features_dto):
"""
添加或更新:
插入一行数据,如果不存在则插入,存在则更新
"""
self._pg_util=PgUtil()
p_id=None
ifisinstance(shixin_features_dto,ShixinFeaturesDto):
p_id=str(uuid.uuid1())
self._pg_util.execute_sql(
self.s_b.insert_or_update_row(
self.model.COMPANY_NAME,
{
self.model.ID:p_id,
#公司名
self.model.COMPANY_NAME:shixin_features_dto.company_name,
#失信时间
self.model.PUBLIC_AT:shixin_features_dto.public_at,
self.model.SHIXIN_LABEL:shixin_features_dto.shixin_label,
self.model.ESTABLISHED_YEARS:shixin_features_dto.established_years,
self.model.INDUSTRY_DX_RATE:shixin_features_dto.industry_dx_rate,
self.model.REGCAP_CHANGE_CNT:shixin_features_dto.regcap_change_cnt,
self.model.SHARE_CHANGE_CNT:shixin_features_dto.share_change_cnt,
self.model.INDUSTRY_ALL_CNT:shixin_features_dto.industry_all_cnt,
self.model.INDUSTRY_DX_CNT:shixin_features_dto.industry_dx_cnt,
self.model.ADDRESS_CHANGE_CNT:shixin_features_dto.address_change_cnt,
self.model.NETWORK_SHARE_CANCEL_CNT:shixin_features_dto.network_share_cancel_cnt,
self.model.CANCEL_CNT:shixin_features_dto.cancel_cnt,
self.model.NETWORK_SHARE_ZHIXING_CNT:shixin_features_dto.network_share_zhixing_cnt,
self.model.FR_CHANGE_CNT:shixin_features_dto.fr_change_cnt,
self.model.JUDGEDOC_CNT:shixin_features_dto.judgedoc_cnt,
self.model.NETWORK_SHARE_JUDGE_DOC_CNT:shixin_features_dto.network_share_judge_doc_cnt,
self.model.BIDDING_CNT:shixin_features_dto.bidding_cnt,
self.model.TRADE_MARK_CNT:shixin_features_dto.trade_mark_cnt,
self.model.JUDGE_DOC_CNT:shixin_features_dto.judge_doc_cnt
},
[self.model.ADDRESS_CHANGE_CNT,self.model.BIDDING_CNT,self.model.CANCEL_CNT,
self.model.ESTABLISHED_YEARS,self.model.FR_CHANGE_CNT,self.model.INDUSTRY_ALL_CNT,
self.model.INDUSTRY_DX_RATE,self.model.INDUSTRY_DX_CNT,self.model.JUDGE_DOC_CNT,
self.model.JUDGEDOC_CNT,self.model.NETWORK_SHARE_CANCEL_CNT,self.model.NETWORK_SHARE_JUDGE_DOC_CNT,
self.model.NETWORK_SHARE_ZHIXING_CNT,self.model.REGCAP_CHANGE_CNT,self.model.TRADE_MARK_CNT,
self.model.SHARE_CHANGE_CNT,self.model.SHIXIN_LABEL,self.model.PUBLIC_AT]
)
)
returnp_id
函数中重新初始化了self._pg_util=PgUtil(),否则会报sslerror和ssldecryption的错误,背后原因有待研究!
**2)多进程取数据——(思考取数据为何要多进程)**
defflush_process(self,lock):#需要传入lock;
"""
运行待处理的方法队列
:typelockLock
:return返回一个dict
"""
#process_pool=Pool(processes=20)
#data_list=process_pool.map(one_process,self.__process_data_list)
#
#for(key,value)indata_list:
#
#覆盖上期变量
self.__dct_share=self.__manager.Value('tmp',{})#进程共享变量
p_list=[]#进程列表
i=1
p_size=len(self.__process_data_list)
forprocess_datainself.__process_data_list:**#循环遍历需要同时查找的公司列表!!!self.__process_data_list包含多个process_data,每个process_data包含三种属性?类对象也可以循环????**
#创建进程
p=Process(target=self.__one_process,args=(process_data,lock))#参数需要lock
#p.daemon=True
p_list.append(p)
#间歇执行进程
ifi%20==0ori==p_size:#20页处理一次,最后一页处理剩余
forpinp_list:
p.start()
forpinp_list:
p.join()#等待进程结束
p_list=[]#清空进程列表
i+=1
#endfor
self.__process_data_list=[]#清空订阅
returnself.__dct_share.value
def__one_process(self,process_data,lock):#迭代函数
"""
处理进程
:paramprocess_data:方法和参数集等
:paramlock:保护锁
"""
fcn=process_data.fcn
params=process_data.params
data_key=process_data.data_key
ifisinstance(params,tuple):
data=fcn(*params)#**注意:*params与params区别**
else:
data=fcn(params)
withlock:
temp_dct=dict(self.__dct_share.value)
ifdata_keynotintemp_dct:
temp_dct[data_key]=[]
temp_dct[data_key].append(data)
self.__dct_share.value=temp_dct
主程序调用:
defexe_process(self,company_name,open_from,time_nodes): """ 多进程执行pre订阅的数据 :paramcompany_name:公司名 :return: """ mul_process_helper=MulProcessHelper() lock=Lock() self.__get_time_bidding_statistic(company_name,mul_process_helper) data=mul_process_helper.flush_process(lock) returndata def__get_time_bidding_statistic(self,company_name,mul_process_helper): #招投标信息 process_data=ProcessData(f_e_t_svc.get_bidding_statistic_time_node_api,company_name, self.__BIDDING_STATISTIC_TIME)**#此处怎么理解?ProcessData是一个类!!!** mul_process_helper.add_process_data_list(process_data)#同时调用多个api???将api方法当做迭代????用于同时查找多个公司???? defadd_process_data_list(self,process_data): """ 添加用于进程处理的方法队列 :typeprocess_dataProcessData :paramprocess_data: :return: """ self.__process_data_list.append(process_data) classProcessData(object): """ 用于进程处理的的数据 """ def__init__(self,fcn,params,data_key): self.fcn=fcn#方法 self.params=params#参数 self.data_key=data_key#存储到进程共享变量中的名字
以上为个人经验,希望能给大家一个参考,也希望大家多多支持毛票票。如有错误或未考虑完全的地方,望不吝赐教。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。