Python的socket模块源码中的一些实现要点分析
BaseServer和BaseRequestHandler
Python为网络编程提高了更高级的封装。SocketServer.py提供了不少网络服务的类。它们的设计很优雅。Python把网络服务抽象成两个主要的类,一个是Server类,用于处理连接相关的网络操作,另外一个则是RequestHandler类,用于处理数据相关的操作。并且提供两个MixIn类,用于扩展Server,实现多进程或多线程。在构建网络服务的时候,Server和RequestHandler并不是分开的,RequestHandler的实例对象在Server内配合Server工作。
改模块的主要几个Server关系如下:
+------------+ |BaseServer| +------------+ | v +-----------++------------------+ |TCPServer|------->|UnixStreamServer| +-----------++------------------+ | v +-----------++--------------------+ |UDPServer|------->|UnixDatagramServer| +-----------++--------------------+
BaseServer分析
BaseServer通过__init__初始化,对外提供serve_forever和handler_request方法。
init初始化:
def__init__(self,server_address,RequestHandlerClass): """Constructor.Maybeextended,donotoverride.""" self.server_address=server_address self.RequestHandlerClass=RequestHandlerClass self.__is_shut_down=threading.Event() self.__shutdown_request=False
__init__源码很简单。主要作用是创建server对象,并初始化server地址和处理请求的class。熟悉socket编程应该很清楚,server_address是一个包含主机和端口的元组。
serve_forever
创建了server对象之后,就需要使用server对象开启一个无限循环,下面来分析serve_forever的源码。
defserve_forever(self,poll_interval=0.5): self.__is_shut_down.clear() try: whilenotself.__shutdown_request: r,w,e=_eintr_retry(select.select,[self],[],[], poll_interval) ifselfinr: self._handle_request_noblock() finally: self.__shutdown_request=False self.__is_shut_down.set()
serve_forever接受一个参数poll_interval,用于表示select轮询的时间。然后进入一个无限循环,调用select方式进行网络IO的监听。
如果select函数返回,表示有IO连接或数据,那么将会调用_handle_request_noblock方法。
_handle_request_noblock def_handle_request_noblock(self): try: request,client_address=self.get_request() exceptsocket.error: return ifself.verify_request(request,client_address): try: self.process_request(request,client_address) except: self.handle_error(request,client_address) self.shutdown_request(request)
_handle_request_noblock方法即开始处理一个请求,并且是非阻塞。该方法通过get_request方法获取连接,具体的实现在其子类。一旦得到了连接,调用verify_request方法验证请求。验证通过,即调用process_request处理请求。如果中途出现错误,则调用handle_error处理错误,以及shutdown_request结束连接。
verify_request defverify_request(self,request,client_address): returnTrue
该方法对request进行验证,通常会被子类重写。简单的返回True即可,然后进入process_request方法处理请求。
process_request defprocess_request(self,request,client_address): self.finish_request(request,client_address) self.shutdown_request(request)
process_request方法是mixin的入口,MixIn子类通过重写该方法,进行多线程或多进程的配置。调用finish_request完成请求的处理,同时调用shutdown_request结束请求。
finish_request deffinish_request(self,request,client_address): self.RequestHandlerClass(request,client_address,self)
finish_request方法将会处理完毕请求。创建requestHandler对象,并通过requestHandler做具体的处理。
BaseRequestHandler分析
所有requestHandler都继承BaseRequestHandler基类。
def__init__(self,request,client_address,server): self.request=request self.client_address=client_address self.server=server self.setup() try: self.handle() finally: self.finish()
该类会处理每一个请求。初始化对象的时候,设置请求request对象。然后调用setup方法,子类会重写该方法,用于处理socket连接。接下来的将是handler和finish方法。所有对请求的处理,都可以重写handler方法。
至此,整个Python提供的Server方式即介绍完毕。总结一下,构建一个网络服务,需要一个BaseServer用于处理网络IO,同时在内部创建requestHandler对象,对所有具体的请求做处理。
BaseServer-BaseRequestHandler
__init__(server_address,RequestHandlerClass): BaseServer.server_address BaseServer.RequestHandlerClass serve_forever(): select() BaseServer._handle_request_noblock() BaseServer.get_request()->request,client_addres BaseServer.verify_request() BaseServer.process_request() BaseServer.process_request() BaseServer.finish_request() BaseServer.RequestHandlerClass() BaseRequestHandler.__init__(request) BaseRequestHandler.request BaseRequestHandler.client_address=client_address BaseRequestHandler.setup() BaseRequestHandler.handle() BaseServer.shutdown_request() BaseServer.close_request() BaseServer.shutdown_request() BaseServer.close_request()
BaseServer和BaseRequestHandler是网络处理的两个基类。实际应用中,网络操作更多是使用TCP或HTTP协议。SocketServer.py也提供了更高级的TCP、UDP封装。下面就来看下关于TCP方面的网络模块(UDP和TCP的在代码组织上差别不是特别大,暂且忽略)。
TCPServer
TCPServer继承了BaseServer,初始化的时候,进行了socket套接字的创建。
def__init__(self,server_address,RequestHandlerClass,bind_and_activate=True): BaseServer.__init__(self,server_address,RequestHandlerClass) self.socket=socket.socket(self.address_family, self.socket_type) ifbind_and_activate: self.server_bind() self.server_activate()
__init__方法通过socket模块创建了socket对象,然后进行调用server_bind和server_activate。
server_bind defserver_bind(self): ifself.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) self.socket.bind(self.server_address) self.server_address=self.socket.getsockname()
server_bind方法进行socket对象的bind操作,以及设置socket相关属性,如网络地址的复用。
server_activate defserver_activate(self): self.socket.listen(self.request_queue_size)
server_activate方法也比较简单,添加socket对象的listen。
get_request
该类最重要的方法就是get_request。该方法进行返回socket对象的请求连接。
defget_request(self): """Gettherequestandclientaddressfromthesocket. """ returnself.socket.accept()
get_request方法是在BaseServer基类中的_handle_request_noblock中调用,从那里里传入套接字对象获取的连接信息。如果是UDPServer,这里获取的就是UDP连接。
此外,TCPServer还提供了一个fileno方法,提供给基类的select调用返回文件描述符。
StreamRequestHandler
TCPServer实现了使用tcp套接字的网络服务,Handler方面则是对应的StreamRequestHandler。它继承了BaseRequestHandler。基类的setup方法和finish方法被它重写,用于通过连接实现缓存文件的读写操作。
setup方法:
defsetup(self): self.connection=self.request ifself.timeoutisnotNone: self.connection.settimeout(self.timeout) ifself.disable_nagle_algorithm: self.connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,True) self.rfile=self.connection.makefile('rb',self.rbufsize) self.wfile=self.connection.makefile('wb',self.wbufsize)
setup判断了是否使用nagle算法。然后设置对应的连接属性。最重要的就是创建了一个可读(rfile)和一个可写(wfile)的“文件”对象,他们实际上并不是创建了文件,而是封装了读取数据和发送数据的操作,抽象成为对文件的操作。可以理解为self.rfile就是读取客户端数据的对象,它有一些方法可以读取数据。self.wfile则是用来发送数据给客户端的对象。后面的操作,客户端数据到来会被写入缓冲区可读,需要向客户端发送数据的时候,只需要向可写的文件中write数据即可。
实现TCP服务需要使用TCPServer和StreamRequestHandler共同协作。大致函数调用流程如下,函数调用用括号表示,赋值不带括号,没有类前缀的表示系统调用:
TCPServer-StreamRequestHandler
__init__(server_address,RequestHandlerClass): BaseServer.server_address BaseServer.RequestHandlerClass TCPServer.socket=socket.socket(self.address_family,self.socket_type) TCPServer.server_bind() TCPServer.server_activate() serve_forever(): select() BaseServer._handle_request_noblock() TCPServer.get_request()->request,client_addres socket.accept() BaseServer.verify_request() BaseServer.process_request() BaseServer.process_request() BaseServer.finish_request(request,client_address) BaseServer.RequestHandlerClass() BaseRequestHandler.__init__(request) BaseRequestHandler.request BaseRequestHandler.client_address=client_address StreamRequestHandler.setup() StreamRequestHandler.connection=StreamRequestHandler.request StreamRequestHandler.rfile StreamRequestHandler.wfile BaseRequestHandler.handle() StreamRequestHandler.finsih() StreamRequestHandler.wfile.close() StreamRequestHandler.rfile.close() BaseServer.shutdown_request(request) TCPServer.shutdown() request.shutdown() TCPServer.close_request(request) request.close() TCPServer.shutdown_request(request) TCPServer.shutdown(request) request.shutdown() TCPServer.close_request(request) request.close()
最早关于介绍BaseServer的时候,我们知道python对BaseServer设计的时候,预留了可用于Mixin扩展多线程或多进程的接口。mixin通过复写父类的parse_request方法实现。
ThreadingMixIn
ThreadingMixIn类实现了多线程的方式,它只有两个方法,分别是process_request和process_request_thread方法。多进程的方式是ForkingMixIn,暂且略过。
process_request defprocess_request(self,request,client_address): t=threading.Thread(target=self.process_request_thread, args=(request,client_address)) t.daemon=self.daemon_threads t.start()
process_request方法复写了父类的此方法。以此为接口入口,对每一个请求,调用Thread开启一个新的线程。每一个线程都绑定process_request_thread方法。
process_request_thread defprocess_request_thread(self,request,client_address): try: self.finish_request(request,client_address) self.shutdown_request(request) except: self.handle_error(request,client_address) self.shutdown_request(request)
process_request_thread方法和BaseServer里的parse_request几乎一样。只不过是多线程的方式调用。
使用的时候,通过多继承调用接口,例如:
classThreadingTCPServer(ThreadingMixIn,TCPServer): pass
具体的调用过程如下:
ThreadingMixIn--TCPServer-StreamRequestHandler __init__(server_address,RequestHandlerClass): BaseServer.server_address BaseServer.RequestHandlerClass TCPServer.socket=socket.socket(self.address_family,self.socket_type) TCPServer.server_bind() TCPServer.server_activate() serve_forever(): select() BaseServer._handle_request_noblock() TCPServer.get_request()->request,client_addres socket.accept() BaseServer.verify_request() BaseServer.process_request() ThreadingMixIn.process_request() t=threading.Thread(target=ThreadingMixIn.process_request_thread) ThreadingMixIn.process_request_thread BaseServer.finish_request(request,client_address) BaseServer.RequestHandlerClass() BaseRequestHandler.__init__(request) BaseRequestHandler.request BaseRequestHandler.client_address=client_address StreamRequestHandler.setup() StreamRequestHandler.connection=StreamRequestHandler.request StreamRequestHandler.rfile StreamRequestHandler.wfile BaseRequestHandler.handle() StreamRequestHandler.finsih() StreamRequestHandler.wfile.close() StreamRequestHandler.rfile.close() BaseServer.shutdown_request(request) TCPServer.shutdown() request.shutdown() TCPServer.close_request(request) request.close() TCPServer.shutdown_request(request) TCPServer.shutdown(request) request.shutdown() TCPServer.close_request(request) request.close()