Python通过zookeeper实现分布式服务代码解析
借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.
这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.
zk_server.py
importthreading
importjson
importsocket
importsys
fromkazoo.clientimportKazooClient
#TCP服务端绑定端口开启监听,同时将自己注册到zk
classZKServer(object):
def__init__(self,host,port):
self.sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
self.host=host
self.port=port
self.sock.bind((host,port))
self.zk=None
defserve(self):
"""
开始服务,每次获取得到一个信息,都新建一个线程处理
"""
self.sock.listen(128)
self.register_zk()
print("开始监听")
whileTrue:
conn,addr=self.sock.accept()
print("建立链接%s"%str(addr))
t=threading.Thread(target=self.handle,args=(conn,addr))
t.start()
#具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束
defhandle(self,conn,addr):
whileTrue:
data=conn.recv(1024)
ifnotdataordata.decode('utf-8')=='exit':
break
print(data.decode('utf-8'))
conn.close()
print('Myworkisdone!!!')
#将自己注册到zk,临时节点,所以连接不能中断
defregister_zk(self):
"""
注册到zookeeper
"""
self.zk=KazooClient(hosts='127.0.0.1:2181')
self.zk.start()
self.zk.ensure_path('/rpc')#创建根节点
value=json.dumps({'host':self.host,'port':self.port})
#创建服务子节点
self.zk.create('/rpc/server',value.encode(),ephemeral=True,sequence=True)
if__name__=='__main__':
iflen(sys.argv)<3:
print("usage:pythonserver.py[host][port]")
exit(1)
host=sys.argv[1]
port=sys.argv[2]
server=ZKServer(host,int(port))
server.serve()
zk_client.py
importrandom
importsys
importtime
importjson
importsocket
fromkazoo.clientimportKazooClient
#客户端连接zk,并从zk获取可用的服务器列表
classZKClient(object):
def__init__(self):
self._zk=KazooClient(hosts='127.0.0.1:2181')
self._zk.start()
self._get_servers()
def_get_servers(self,event=None):
"""
从zookeeper获取服务器地址信息列表
"""
servers=self._zk.get_children('/rpc',watch=self._get_servers)
#print(servers)
self._servers=[]
forserverinservers:
data=self._zk.get('/rpc/'+server)[0]
ifdata:
addr=json.loads(data.decode())
self._servers.append(addr)
def_get_server(self):
"""
随机选出一个可用的服务器
"""
returnrandom.choice(self._servers)
defget_connection(self):
"""
提供一个可用的tcp连接
"""
sock=None
whileTrue:
server=self._get_server()
print('server:%s'%server)
try:
sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.connect((server['host'],server['port']))
exceptConnectionRefusedError:
time.sleep(1)
continue
else:
break
returnsock
if__name__=='__main__':
#模拟多个客户端批量生成任务,推送给服务器执行
client=ZKClient()
foriinrange(40):
sock=client.get_connection()
sock.send(bytes(str(i),encoding='utf8'))
sock.close()
time.sleep(1)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。