Python concurrent.futures模块使用实例
这篇文章主要介绍了Pythonconcurrent.futures模块使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
concurrent.futures的作用:
管理并发任务池。concurrent.futures模块提供了使用工作线程或进程池运行任务的接口。线程和进程池API都是一样,所以应用只做最小的修改就可以在线程和进程之间地切换
1、基于线程池使用map()
futures_thread_pool_map.py
#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromconcurrentimportfutures
importthreading
importtime
deftask(n):
print('{}:睡眠{}'.format(threading.current_thread().name,n))
time.sleep(n/10)
print('{}:执行完成{}'.format(threading.current_thread().name,n))
returnn/10
ex=futures.ThreadPoolExecutor(max_workers=2)
print('main:开始运行')
results=ex.map(task,range(5,0,-1))#返回值是generator生成器
print('main:未处理的结果{}'.format(results))
print('main:等待真实结果')
real_results=list(results)
print('main:最终结果:{}'.format(real_results))
运行效果
[root@mnt]#python3futures_thread_pool_map.py main:开始运行 ThreadPoolExecutor-0_0:睡眠5 ThreadPoolExecutor-0_1:睡眠4 main:未处理的结果.result_iteratorat0x7f1c97484678> main:等待真实结果 ThreadPoolExecutor-0_1:执行完成4 ThreadPoolExecutor-0_1:睡眠3 ThreadPoolExecutor-0_0:执行完成5 ThreadPoolExecutor-0_0:睡眠2 ThreadPoolExecutor-0_0:执行完成2 ThreadPoolExecutor-0_0:睡眠1 ThreadPoolExecutor-0_1:执行完成3 ThreadPoolExecutor-0_0:执行完成1 main:最终结果:[0.5,0.4,0.3,0.2,0.1]
2、futures执行单个任务
futures_thread_pool_submit.py
#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromconcurrentimportfutures
importthreading
importtime
deftask(n):
print('{}:睡眠{}'.format(threading.current_thread().name,n))
time.sleep(n/10)
print('{}:执行完成{}'.format(threading.current_thread().name,n))
returnn/10
ex=futures.ThreadPoolExecutor(max_workers=2)
print('main:开始')
f=ex.submit(task,5)
print('main:future:{}'.format(f))
print('等待运行结果')
results=f.result()
print('main:result:{}'.format(results))
print('main:future之后的结果:{}'.format(f))
运行效果
[root@mnt]#python3futures_thread_pool_submit.py main:开始 ThreadPoolExecutor-0_0:睡眠5 main:future:等待运行结果 ThreadPoolExecutor-0_0:执行完成5 main:result:0.5 main:future之后的结果:
3、futures.as_completed()按任意顺序运行结果
futures_as_completed.py
#!/usr/bin/envpython
#-*-coding:utf-8-*-
importrandom
importtime
fromconcurrentimportfutures
deftask(n):
time.sleep(random.random())
return(n,n/10)
ex=futures.ThreadPoolExecutor(max_workers=2)
print('main:开始')
wait_for=[
ex.submit(task,i)foriinrange(5,0,-1)
]
forfinfutures.as_completed(wait_for):
print('main:result:{}'.format(f.result()))
运行效果
[root@mnt]#python3futures_as_completed.py main:开始 main:result:(5,0.5) main:result:(4,0.4) main:result:(3,0.3) main:result:(1,0.1) main:result:(2,0.2)
4、Future回调之futures.add_done_callback()
futures_future_callback.py
#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromconcurrentimportfutures
importtime
deftask(n):
print('task{}:睡眠'.format(n))
time.sleep(0.5)
print('task{}:完成'.format(n))
returnn/10
defdone(fn):
iffn.cancelled():
print('done{}:取消'.format(fn.arg))
eliffn.done():
error=fn.exception()
iferror:
print('done{}:错误返回:{}'.format(fn.arg,error))
else:
result=fn.result()
print('done{}:正常返回:{}'.format(fn.arg,result))
if__name__=='__main__':
ex=futures.ThreadPoolExecutor(max_workers=2)
print('main:开始')
f=ex.submit(task,5)
f.arg=5
f.add_done_callback(done)
result=f.result()
运行效果
[root@mnt]#python3futures_future_callback.py main:开始 task5:睡眠 task5:完成 done5:正常返回:0.5
5、Future任务取消之futures.cancel()
futures_future_callback_cancel.py
#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromconcurrentimportfutures
importtime
deftask(n):
print('task{}:睡眠'.format(n))
time.sleep(0.5)
print('task{}:完成'.format(n))
returnn/10
defdone(fn):
iffn.cancelled():
print('done{}:取消'.format(fn.arg))
eliffn.done():
error=fn.exception()
iferror:
print('done{}:错误返回:{}'.format(fn.arg,error))
else:
result=fn.result()
print('done{}:正常返回:{}'.format(fn.arg,result))
if__name__=='__main__':
ex=futures.ThreadPoolExecutor(max_workers=2)
print('main:开始')
tasks=[]
foriinrange(10,0,-1):
print('main:submitting{}'.format(i))
f=ex.submit(task,i)
f.arg=i
f.add_done_callback(done)
tasks.append((i,f))
fori,task_objinreversed(tasks):
ifnottask_obj.cancel():
print('main:不能取消{}'.format(i))
ex.shutdown()
运行效果
[root@mnt]#python3futures_future_callback_cancel.py main:开始 main:submitting10 task10:睡眠 main:submitting9 task9:睡眠 main:submitting8 main:submitting7 main:submitting6 main:submitting5 main:submitting4 main:submitting3 main:submitting2 main:submitting1 done1:取消 done2:取消 done3:取消 done4:取消 done5:取消 done6:取消 done7:取消 done8:取消 main:不能取消9 main:不能取消10 task10:完成 done10:正常返回:1.0 task9:完成 done9:正常返回:0.9
6、Future异常的处理
futures_future_exception
#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromconcurrentimportfutures
deftask(n):
print('{}:开始'.format(n))
raiseValueError('这个值不太好{}'.format(n))
ex=futures.ThreadPoolExecutor(max_workers=2)
print('main:开始...')
f=ex.submit(task,5)
error=f.exception()
print('main:error:{}'.format(error))
try:
result=f.result()
exceptValueErrorase:
print('访问结果值的异常{}'.format(e))
运行效果
[root@mnt]#python3futures_future_exception.py main:开始... 5:开始 main:error:这个值不太好5 访问结果值的异常这个值不太好5
7、Future上下文管理即利用with打开futures.ThreadPoolExecutor()
futures_context_manager.py
#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromconcurrentimportfutures
deftask(n):
print(n)
withfutures.ThreadPoolExecutor(max_workers=2)asex:
print('main:开始')
ex.submit(task,1)
ex.submit(task,2)
ex.submit(task,3)
ex.submit(task,4)
print('main:结束')
运行效果
[root@mnt]#python3futures_context_manager.py main:开始 2 4 main:结束
8、基于进程池使用map()
futures_process_pool_map.py
#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromconcurrentimportfutures
importos
deftask(n):
return(n,os.getpid())
if__name__=='__main__':
ex=futures.ProcessPoolExecutor(max_workers=2)
results=ex.map(task,range(50,0,-1))
forn,pidinresults:
print('task{}in进程id{}'.format(n,pid))
运行效果
[root@mnt]#python3futures_process_pool_map.py task5in进程id9192 task4in进程id8668 task3in进程id9192 task2in进程id8668 task1in进程id9192
9、基于进程池异常处理
futures_process_pool_broken.py
#!/usr/bin/envpython
#-*-coding:utf-8-*-
fromconcurrentimportfutures
importos
importsignal
deftask(n):
return(n,os.getpid())
if__name__=='__main__':
withfutures.ProcessPoolExecutor(max_workers=2)asex:
print('获取工作进程的id')
f1=ex.submit(os.getpid)
pid1=f1.result()
print('结束进程{}'.format(pid1))
os.kill(pid1,signal.SIGHUP)
print('提交其它进程')
f2=ex.submit(os.getpid)
try:
pid2=f2.result()
exceptfutures.process.BrokenProcessPoolase:
print('不能开始新的任务:{}'.format(e))
运行效果
[root@mnt]#python3futures_process_pool_broken.py 获取工作进程的id 结束进程104623 提交其它进程 不能开始新的任务:Aprocessintheprocesspoolwasterminatedabruptlywhilethefuturewasrunningorpending.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。