python并发和异步编程实例
关于并发、并行、同步阻塞、异步非阻塞、线程、进程、协程等这些概念,单纯通过文字恐怕很难有比较深刻的理解,本文就通过代码一步步实现这些并发和异步编程,并进行比较。解释器方面本文选择python3,毕竟python3才是python的未来,并且python3用原生的库实现协程已经非常方便了。
1、准备阶段
下面为所有测试代码所需要的包
#!python3 #coding:utf-8 importsocket fromconcurrentimportfutures fromselectorsimportDefaultSelector,EVENT_WRITE,EVENT_READ importasyncio importaiohttp importtime fromtimeimportctime
在进行不同实现方式的比较时,实现场景就是在进行爬虫开发的时候通过向对方网站发起一系列的http请求访问,统计耗时来判断实现方式的优劣,具体地,通过建立通信套接字,访问新浪主页,返回源码,作为一次请求。先实现一个装饰器用来统计函数的执行时间:
deftsfunc(func):
defwrappedFunc(*args,**kargs):
start=time.clock()
action=func(*args,**kargs)
time_delta=time.clock()-start
print('[{0}]{1}()called,timedelta:{2}'.format(ctime(),func.__name__,time_delta))
returnaction
returnwrappedFunc
输出的格式为:当前时间,调用的函数,函数的执行时间。
2、阻塞/非阻塞和同步/异步
这两对概念不是很好区分,从定义上理解:
阻塞:在进行socket通信过程中,一个线程发起请求,如果当前请求没有返回结果,则进入sleep状态,期间线程挂起不能做其他操作,直到有返回结果,或者超时(如果设置超时的话)。
非阻塞:与阻塞相似,只不过在等待请求结果时,线程并不挂起而是进行其他操作,即在不能立刻得到结果之前,该函数不会阻挂起当前线程,而会立刻返回。
同步:同步和阻塞比较相似,但是二者并不是同一个概念,同步是指完成事件的逻辑,是指一件事完成之后,再完成第二件事,以此类推…
异步:异步和非阻塞比较类似,异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者,实现异步的方式通俗讲就是“等会再告诉你”。
1)阻塞方式
回到代码上,首先实现阻塞方式的请求函数:
defblocking_way():
sock=socket.socket()
sock.connect(('www.sina.com',80))
request='GET/HTTP/1.0\r\nHOST:www.sina.com\r\n\r\n'
sock.send(request.encode('ascii'))
response=b''
chunk=sock.recv(4096)
whilechunk:
response+=chunk
chunk=sock.recv(4096)
returnresponse
测试线程、多进程和多线程
#阻塞无并发
@tsfunc
defsync_way():
res=[]
foriinrange(10):
res.append(blocking_way())
returnlen(res)
@tsfunc
#阻塞、多进程
defprocess_way():
worker=10
withfutures.ProcessPoolExecutor(worker)asexecutor:
futs={executor.submit(blocking_way)foriinrange(10)}
returnlen([fut.result()forfutinfuts])
#阻塞、多线程
@tsfunc
defthread_way():
worker=10
withfutures.ThreadPoolExecutor(worker)asexecutor:
futs={executor.submit(blocking_way)foriinrange(10)}
returnlen([fut.result()forfutinfuts])
运行结果:
[WedDec1316:52:252017]sync_way()called,timedelta:0.06371647809425328 [WedDec1316:52:282017]process_way()called,timedelta:2.31437644946734 [WedDec1316:52:282017]thread_way()called,timedelta:0.010172946070299727
可见与非并发的方式相比,启动10个进程完成10次请求访问耗费的时间最长,进程确实需要很大的系统开销,相比多线程则效果好得多,启动10个线程并发请求,比顺序请求速度快了6倍左右。
2)非阻塞方式
实现非阻塞的请求代码,与阻塞方式的区别在于等待请求时并不挂起而是直接返回,为了确保能正确读取消息,最原始的方式就是循环读取,知道读取完成为跳出循环,代码如下:
defnonblocking_way():
sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect(('www.sina.com',80))
exceptBlockingIOError:
pass
request='GET/HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'
data=request.encode('ascii')
whileTrue:
try:
sock.send(data)
break
exceptOSError:
pass
response=b''
whileTrue:
try:
chunk=sock.recv(4096)
whilechunk:
response+=chunk
chunk=sock.recv(4096)
break
exceptOSError:
pass
returnresponse
测试单线程异步非阻塞方式:
@tsfunc defasync_way(): res=[] foriinrange(10): res.append(nonblocking_way()) returnlen(res)
测试结果与单线程同步阻塞方式相比:
[WedDec1317:18:302017]sync_way()called,timedelta:0.07342884475822574 [WedDec1317:18:302017]async_way()called,timedelta:0.06509009095694886
非阻塞方式起到了一定的效果,但是并不明显,原因肯定是读取消息的时候虽然不是在线程挂起的时候而是在循环读取消息的时候浪费了时间,如果大部分时间读浪费了并没有发挥异步编程的威力,解决的办法就是后面要说的【事件驱动】
3、回调、生成器和协程
a、回调
classCrawler():
def__init__(self,url):
self.url=url
self.sock=None
self.response=b''
deffetch(self):
self.sock=socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('www.sina.com',80))
exceptBlockingIOError:
pass
selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)
defconnected(self,key,mask):
selector.unregister(key.fd)
get='GET{0}HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
self.sock.send(get.encode('ascii'))
selector.register(key.fd,EVENT_READ,self.read_response)
defread_response(self,key,mask):
globalstopped
whileTrue:
try:
chunk=self.sock.recv(4096)
ifchunk:
self.response+=chunk
chunk=self.sock.recv(4096)
else:
selector.unregister(key.fd)
urls_todo.remove(self.url)
ifnoturls_todo:
stopped=True
break
except:
pass
defloop():
whilenotstopped:
events=selector.select()
forevent_key,event_maskinevents:
callback=event_key.data
callback(event_key,event_mask)
@tsfunc
defcallback_way():
forurlinurls_todo:
crawler=Crawler(url)
crawler.fetch()
loop1()
这是通过传统回调方式实现的异步编程,结果如下:
[TueMar2717:52:492018]callback_way()called,timedelta:0.054735804048789374
b、生成器
classCrawler2:
def__init__(self,url):
self.url=url
self.response=b''
deffetch(self):
globalstopped
sock=socket.socket()
yieldfromconnect(sock,('www.sina.com',80))
get='GET{0}HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
sock.send(get.encode('ascii'))
self.response=yieldfromread_all(sock)
urls_todo.remove(self.url)
ifnoturls_todo:
stopped=True
classTask:
def__init__(self,coro):
self.coro=coro
f=Future1()
f.set_result(None)
self.step(f)
defstep(self,future):
try:
#send会进入到coro执行,即fetch,直到下次yield
#next_future为yield返回的对象
next_future=self.coro.send(future.result)
exceptStopIteration:
return
next_future.add_done_callback(self.step)
defloop1():
whilenotstopped:
events=selector.select()
forevent_key,event_maskinevents:
callback=event_key.data
callback()
运行结果如下:
[TueMar2717:54:272018]generate_way()called,timedelta:0.2914336347673473
c、协程
defnonblocking_way():
sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect(('www.sina.com',80))
exceptBlockingIOError:
pass
request='GET/HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'
data=request.encode('ascii')
whileTrue:
try:
sock.send(data)
break
exceptOSError:
pass
response=b''
whileTrue:
try:
chunk=sock.recv(4096)
whilechunk:
response+=chunk
chunk=sock.recv(4096)
break
exceptOSError:
pass
returnresponse
@tsfunc
defasyncio_way():
tasks=[fetch(host+url)forurlinurls_todo]
loop.run_until_complete(asyncio.gather(*tasks))
return(len(tasks))
运行结果:
[TueMar2717:56:172018]asyncio_way()called,timedelta:0.43688060698484166
到此终于把并发和异步编程实例代码测试完,下边贴出全部代码,共读者自行测试,在任务量加大时,相信结果会大不一样。
#!python3
#coding:utf-8
importsocket
fromconcurrentimportfutures
fromselectorsimportDefaultSelector,EVENT_WRITE,EVENT_READ
importasyncio
importaiohttp
importtime
fromtimeimportctime
deftsfunc(func):
defwrappedFunc(*args,**kargs):
start=time.clock()
action=func(*args,**kargs)
time_delta=time.clock()-start
print('[{0}]{1}()called,timedelta:{2}'.format(ctime(),func.__name__,time_delta))
returnaction
returnwrappedFunc
defblocking_way():
sock=socket.socket()
sock.connect(('www.sina.com',80))
request='GET/HTTP/1.0\r\nHOST:www.sina.com\r\n\r\n'
sock.send(request.encode('ascii'))
response=b''
chunk=sock.recv(4096)
whilechunk:
response+=chunk
chunk=sock.recv(4096)
returnresponse
defnonblocking_way():
sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect(('www.sina.com',80))
exceptBlockingIOError:
pass
request='GET/HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'
data=request.encode('ascii')
whileTrue:
try:
sock.send(data)
break
exceptOSError:
pass
response=b''
whileTrue:
try:
chunk=sock.recv(4096)
whilechunk:
response+=chunk
chunk=sock.recv(4096)
break
exceptOSError:
pass
returnresponse
selector=DefaultSelector()
stopped=False
urls_todo=['/','/1','/2','/3','/4','/5','/6','/7','/8','/9']
classCrawler():
def__init__(self,url):
self.url=url
self.sock=None
self.response=b''
deffetch(self):
self.sock=socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('www.sina.com',80))
exceptBlockingIOError:
pass
selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)
defconnected(self,key,mask):
selector.unregister(key.fd)
get='GET{0}HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
self.sock.send(get.encode('ascii'))
selector.register(key.fd,EVENT_READ,self.read_response)
defread_response(self,key,mask):
globalstopped
whileTrue:
try:
chunk=self.sock.recv(4096)
ifchunk:
self.response+=chunk
chunk=self.sock.recv(4096)
else:
selector.unregister(key.fd)
urls_todo.remove(self.url)
ifnoturls_todo:
stopped=True
break
except:
pass
defloop():
whilenotstopped:
events=selector.select()
forevent_key,event_maskinevents:
callback=event_key.data
callback(event_key,event_mask)
#基于生成器的协程
classFuture:
def__init__(self):
self.result=None
self._callbacks=[]
defadd_done_callback(self,fn):
self._callbacks.append(fn)
defset_result(self,result):
self.result=result
forfninself._callbacks:
fn(self)
classCrawler1():
def__init__(self,url):
self.url=url
self.response=b''
deffetch(self):
sock=socket.socket()
sock.setblocking(False)
try:
sock.connect(('www.sina.com',80))
exceptBlockingIOError:
pass
f=Future()
defon_connected():
f.set_result(None)
selector.register(sock.fileno(),EVENT_WRITE,on_connected)
yieldf
selector.unregister(sock.fileno())
get='GET{0}HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
sock.send(get.encode('ascii'))
globalstopped
whileTrue:
f=Future()
defon_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(),EVENT_READ,on_readable)
chunk=yieldf
selector.unregister(sock.fileno())
ifchunk:
self.response+=chunk
else:
urls_todo.remove(self.url)
ifnoturls_todo:
stopped=True
break
#yieldfrom改进的生成器协程
classFuture1:
def__init__(self):
self.result=None
self._callbacks=[]
defadd_done_callback(self,fn):
self._callbacks.append(fn)
defset_result(self,result):
self.result=result
forfninself._callbacks:
fn(self)
def__iter__(self):
yieldself
returnself.result
defconnect(sock,address):
f=Future1()
sock.setblocking(False)
try:
sock.connect(address)
exceptBlockingIOError:
pass
defon_connected():
f.set_result(None)
selector.register(sock.fileno(),EVENT_WRITE,on_connected)
yieldfromf
selector.unregister(sock.fileno())
defread(sock):
f=Future1()
defon_readable():
f.set_result(sock.recv(4096))
selector.register(sock.fileno(),EVENT_READ,on_readable)
chunk=yieldfromf
selector.unregister(sock.fileno())
returnchunk
defread_all(sock):
response=[]
chunk=yieldfromread(sock)
whilechunk:
response.append(chunk)
chunk=yieldfromread(sock)
returnb''.join(response)
classCrawler2:
def__init__(self,url):
self.url=url
self.response=b''
deffetch(self):
globalstopped
sock=socket.socket()
yieldfromconnect(sock,('www.sina.com',80))
get='GET{0}HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
sock.send(get.encode('ascii'))
self.response=yieldfromread_all(sock)
urls_todo.remove(self.url)
ifnoturls_todo:
stopped=True
classTask:
def__init__(self,coro):
self.coro=coro
f=Future1()
f.set_result(None)
self.step(f)
defstep(self,future):
try:
#send会进入到coro执行,即fetch,直到下次yield
#next_future为yield返回的对象
next_future=self.coro.send(future.result)
exceptStopIteration:
return
next_future.add_done_callback(self.step)
defloop1():
whilenotstopped:
events=selector.select()
forevent_key,event_maskinevents:
callback=event_key.data
callback()
#asyncio协程
host='http://www.sina.com'
loop=asyncio.get_event_loop()
asyncdeffetch(url):
asyncwithaiohttp.ClientSession(loop=loop)assession:
asyncwithsession.get(url)asresponse:
response=awaitresponse.read()
returnresponse
@tsfunc
defasyncio_way():
tasks=[fetch(host+url)forurlinurls_todo]
loop.run_until_complete(asyncio.gather(*tasks))
return(len(tasks))
@tsfunc
defsync_way():
res=[]
foriinrange(10):
res.append(blocking_way())
returnlen(res)
@tsfunc
defprocess_way():
worker=10
withfutures.ProcessPoolExecutor(worker)asexecutor:
futs={executor.submit(blocking_way)foriinrange(10)}
returnlen([fut.result()forfutinfuts])
@tsfunc
defthread_way():
worker=10
withfutures.ThreadPoolExecutor(worker)asexecutor:
futs={executor.submit(blocking_way)foriinrange(10)}
returnlen([fut.result()forfutinfuts])
@tsfunc
defasync_way():
res=[]
foriinrange(10):
res.append(nonblocking_way())
returnlen(res)
@tsfunc
defcallback_way():
forurlinurls_todo:
crawler=Crawler(url)
crawler.fetch()
loop1()
@tsfunc
defgenerate_way():
forurlinurls_todo:
crawler=Crawler2(url)
Task(crawler.fetch())
loop1()
if__name__=='__main__':
#sync_way()
#process_way()
#thread_way()
#async_way()
#callback_way()
#generate_way()
asyncio_way()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。