python多线程并发及测试框架案例
这篇文章主要介绍了python多线程并发及测试框架案例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
1、循环创建多个线程,并通过循环启动执行
importthreading fromdatetimeimport* fromtimeimportsleep #单线程执行 deftest(): print('helloworld') t=threading.Thread(target=test) t.start() #多线程执行 deftest_01(): sleep(1) x=0 whilex==0:#设置一个死循环 print(datetime.now())#获取当前系统时间 deflooptest(): ''' 循环20次执行test_o1()函数 :return: ''' foriinrange(20): test_01() defthd(): ''' 创建并执行多个线程 需求:并发执行50次test_o1()函数 说明:把50的并发拆成25个线程组,每个线程再循环20次执行test_o1()函数,这样在启动下一个线程的时候, 上一个线程已经在循环了,以此类推,当启动第25个线程的时候,可能已经执行了200次的test_o1()函数, 这样就可以大大减少并发的时间差异 :return: ''' Threads=[] foriinrange(25): th=threading.Thread(target=looptest) Threads.append(th) ''' 守护线程:主线程执行完毕之后,会等待子线程全部执行完毕,才会关闭结束程序 必须加在start()之前,默认为false ''' th.setDaemon(True) forthinThreads: th.start() forthinThreads: ''' 阻塞线程:等主线程执行完毕之后再关闭所有子线程 必须加在start()之后 可以通过join()的timeout参数来完美解决相互等待的问题,子线程告诉主线程让其等待0.04秒, 0.04秒之内子线程完成,主线程就继续往下执行,0.04秒之后如果子线程还未完成,主线程也会 继续往下执行,执行完成之后关闭子线程 ''' th.join(0.04) if__name__=="__main__": print('start') thd() print('end')
2、并发测试框架
#并发测试框架 THREAD_NUM=1 ONE_WORKER_NUM=1 deftest(): pass#测试代码 defworking(): globalONE_WORKER_NUM foriinrange(0,ONE_WORKER_NUM): test() deft(): globalTHREAD_NUM Threads=[] foriinrange(THREAD_NUM): t=threading.Thread(target=working,name='T'+str(i)) t.setDaemon(True) Threads.append(t) fortinThreads: t.start() fortinThreads: t.join() if__name__=="__main__": t()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。