python分布式计算dispy的使用详解
dispy,是用asyncoro实现的分布式并行计算框架。
框架也是非常精简,只有4个组件,在其源码文件夹下可以找到:
dispy.py(client)providestwowaysofcreating“clusters”:JobClusterwhenonlyoneinstanceofdispymayrunandSharedJobClusterwhenmultipleinstancesmayrun(inseparateprocesses).IfJobClusterisused,theschedulercontainedwithindispy.pywilldistributejobsontheservernodes;ifSharedJobClusterisused,aseparatescheduler(dispyscheduler)mustberunning.
dispynode.pyexecutesjobsonbehalfofdispy.dispynodemustberunningoneachofthe(server)nodesthatformthecluster.
dispyscheduler.pyisneededonlywhenSharedJobClusterisused;thisprovidesaschedulerthatcanbesharedbymultipledispyusers.
dispynetrelay.pyisneededwhennodesarelocatedacrossdifferentnetworks;thisrelaysinformationaboutnodesonanetworktothescheduler.Ifallthenodesareonsamenetwork,thereisnoneedfordispynetrelay-theschedulerandnodesautomaticallydiscovereachother.
一般情况下,使用dispy和dispynode就已经足够解决问题了。
简单使用:
服务器端:
在服务器端启动dispy,监听并接收所有发来的计算任务,完成计算后将结果返回给客户端。
打开python_home/Scripts文件夹,在安装dispy后会有上面说到的4个dispy组件,以py文件形式存在。当然你也可以在dispy的源码文件夹里面找到对于的dispynode.py文件,然后执行
pythondispynode.py-c2-i192.168.138.128-p51348-ssecret--clean
pythondispynode.py-c2-i192.168.8.143-p51348-ssecret--clean
这里192.168.138.128和192.168.8.143是执行计算节点的ip(对服务器来说相当于localhost),这里我启用了两个节点,每个节点使用2个cpu资源,其中有一个节点是在虚拟机,一个是本地机器。
-ssecret是通信密码,客户端和服务器连接需要密码,密码随意。
--clean表示每次启动服务都删除上次的启动信息,如果不删除,可能会出现pid占用的错误。
客户端:
在客户端需要注意的是,发送到计算节点函数所引用的模块,不能在py文件的顶层导入,而需要在函数内导入。
对于需要导入自定义模块,比较麻烦一点,需要先实例化函数,才能在计算节点的函数中使用。
#这些在顶层导入的模块只能是这个py文件用 importtime importsocket importnumpy importdatetime #这个是自定义函数,要在本模块中先实例化才能在计算节点函数中调用使用, #而本模块的其他地方可以直接调用使用 frommy_package.my_modelimportget_time #实例化自定义的函数,注意后面是没有括号的,否则就是直接调用得到返回值了 now=get_time.now #计算函数,dispy将这个函数和参数一并发送到服务器节点 #如果函数有多个参数,需要包装程tuple格式 defcompute(args): n,array=args#如果函数有多个参数,需要包装程tuple格式 #看到没,计算需要的模块是在函数内导入的 importtime,socket time.sleep(3) host=socket.gethostname() #这个py文件中自定义函数,可以直接引用 total=my_sum(array) #这个now是在其他模块中自定义的函数,需要在顶层先实例化才能引用 now_time=now() return(host,n,total,now_time) defsum(array): #自定义函数,需要的模块同样需要在函数内导入 importnumpyasnp returnnp.sum(array) defloadData(): #自定义函数,生成测试数据 importnumpyasnp data=np.random.rand(20,20) data=[lineforlineindata] returndata if__name__=='__main__': importdispy,random #定义两个计算节点 nodes=['192.168.8.143','192.168.138.128'] #启动计算集群,和服务器通信,通信密钥是'secret' #depends为依赖函数 cluster=dispy.JobCluster(compute,nodes=nodes, secret='secret',depends=[sum,now]) jobs=[] datas=loadData() forninrange(len(datas)): #提交任务 job=cluster.submit((n,datas[n])) job.id=n jobs.append(job) #print(datetime.datetime.now()) #cluster.wait()#等待所有任务完成后才接着往下执行 #print(datetime.datetime.now()) forjobinjobs: host,n,total,t=job() print('%sexecutedjob%sat%swith%stotal=%.2ft=%s' %(host,job.id,job.start_time,n,total,t)) #otherfieldsof'job'thatmaybeuseful: #printjob.stdout,job.stderr,job.exception, #job.ip_addr,job.start_time,job.end_time #显示集群计算状态 cluster.stats()
以上这篇python分布式计算dispy的使用详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。