Python通过zookeeper实现分布式服务代码解析
借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.
这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.
zk_server.py
importthreading importjson importsocket importsys fromkazoo.clientimportKazooClient #TCP服务端绑定端口开启监听,同时将自己注册到zk classZKServer(object): def__init__(self,host,port): self.sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) self.host=host self.port=port self.sock.bind((host,port)) self.zk=None defserve(self): """ 开始服务,每次获取得到一个信息,都新建一个线程处理 """ self.sock.listen(128) self.register_zk() print("开始监听") whileTrue: conn,addr=self.sock.accept() print("建立链接%s"%str(addr)) t=threading.Thread(target=self.handle,args=(conn,addr)) t.start() #具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束 defhandle(self,conn,addr): whileTrue: data=conn.recv(1024) ifnotdataordata.decode('utf-8')=='exit': break print(data.decode('utf-8')) conn.close() print('Myworkisdone!!!') #将自己注册到zk,临时节点,所以连接不能中断 defregister_zk(self): """ 注册到zookeeper """ self.zk=KazooClient(hosts='127.0.0.1:2181') self.zk.start() self.zk.ensure_path('/rpc')#创建根节点 value=json.dumps({'host':self.host,'port':self.port}) #创建服务子节点 self.zk.create('/rpc/server',value.encode(),ephemeral=True,sequence=True) if__name__=='__main__': iflen(sys.argv)<3: print("usage:pythonserver.py[host][port]") exit(1) host=sys.argv[1] port=sys.argv[2] server=ZKServer(host,int(port)) server.serve()
zk_client.py
importrandom importsys importtime importjson importsocket fromkazoo.clientimportKazooClient #客户端连接zk,并从zk获取可用的服务器列表 classZKClient(object): def__init__(self): self._zk=KazooClient(hosts='127.0.0.1:2181') self._zk.start() self._get_servers() def_get_servers(self,event=None): """ 从zookeeper获取服务器地址信息列表 """ servers=self._zk.get_children('/rpc',watch=self._get_servers) #print(servers) self._servers=[] forserverinservers: data=self._zk.get('/rpc/'+server)[0] ifdata: addr=json.loads(data.decode()) self._servers.append(addr) def_get_server(self): """ 随机选出一个可用的服务器 """ returnrandom.choice(self._servers) defget_connection(self): """ 提供一个可用的tcp连接 """ sock=None whileTrue: server=self._get_server() print('server:%s'%server) try: sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) sock.connect((server['host'],server['port'])) exceptConnectionRefusedError: time.sleep(1) continue else: break returnsock if__name__=='__main__': #模拟多个客户端批量生成任务,推送给服务器执行 client=ZKClient() foriinrange(40): sock=client.get_connection() sock.send(bytes(str(i),encoding='utf8')) sock.close() time.sleep(1)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。