Zookeeper接口kazoo实例解析
本文主要研究的是Zookeeper接口kazoo的相关内容,具体介绍如下。
zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,现在主流的纯python的zookeeper接口是kazoo。因此如何使用kazoo开发基于python的分布式程序是必须掌握的。
1.安装kazoo
yum install python-pip
pip install kazoo
安装过程中会出现一些python依赖包未安装的情况,安装即可。
2.运行kazoo基础例子kazoo_basic.py
# Dependencies of kazoo_basic.py: the standard library first, then kazoo.
import time

from kazoo.client import KazooClient
from kazoo.client import KazooState
def main():
    """Exercise the basic kazoo client API against a local ZooKeeper server.

    Connects to ``127.0.0.1:2182``, builds a small znode tree under ``/my``,
    attaches a children watch and a data watch to ``/my/favorite/node``,
    reads, updates and deletes nodes, commits one transaction, removes the
    tree again and disconnects.

    Raises:
        Whatever the individual kazoo calls raise (for example when the
        server is unreachable). The client is stopped and closed in that
        case as well.
    """
    zk = KazooClient(hosts='127.0.0.1:2182')
    zk.start()
    try:
        # Registered after start(), so only later state changes are reported.
        @zk.add_listener
        def my_listener(state):
            if state == KazooState.LOST:
                print("LOST")
            elif state == KazooState.SUSPENDED:
                print("SUSPENDED")
            else:
                print("Connected")

        # Creating nodes
        # Ensure a path, create if necessary
        zk.ensure_path("/my/favorite")
        # Create a node with data
        zk.create("/my/favorite/node", b"")
        zk.create("/my/favorite/node/a", b"A")

        # Reading data
        # Determine if a node exists
        if zk.exists("/my/favorite"):
            print("/my/favorite is existed")

        # Called immediately, and again whenever the child list changes.
        @zk.ChildrenWatch("/my/favorite/node")
        def watch_children(children):
            print("Children are now: %s" % children)

        # Called immediately, and again whenever the node's data changes.
        @zk.DataWatch("/my/favorite/node")
        def watch_node(data, stat):
            if data is None or stat is None:
                # kazoo passes None for all values once the node is gone
                # (it is deleted at the end of this function).  Without
                # this guard the callback raises inside kazoo's watcher
                # and the error is reported through the
                # "kazoo.recipe.watchers" logger.  Returning False stops
                # the watch.
                return False
            print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

        # Print the version of a node and its data
        data, stat = zk.get("/my/favorite/node")
        print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

        # List the children
        children = zk.get_children("/my/favorite/node")
        print("There are %s children with names %s" % (len(children), children))

        # Updating data
        zk.set("/my/favorite", b"somedata")

        # Deleting nodes
        zk.delete("/my/favorite/node/a")

        # Transactions
        transaction = zk.transaction()
        transaction.check('/my/favorite/node', version=-1)
        transaction.create('/my/favorite/node/b', b"B")
        results = transaction.commit()
        print("Transaction results is %s" % results)

        # Clean up everything this example created.
        zk.delete("/my/favorite/node/b")
        zk.delete("/my", recursive=True)

        # Give the watch callbacks time to run before disconnecting.
        time.sleep(2)
    finally:
        # Always disconnect, even when one of the calls above failed, and
        # release the resources held by the stopped client.
        zk.stop()
        zk.close()
if __name__ == "__main__":
    # Script entry point: run the demo and report any failure on stdout.
    try:
        main()
    except Exception as ex:  # "as" form is valid on Python 2.6+ and Python 3
        print("Ocurred Exception: %s" % str(ex))
        # Exit with a failure status; quit() is a helper injected by the
        # site module and would report success (status 0).
        raise SystemExit(1)
运行结果:
Children are now: [u'a']
Version: 0, data: 
Version: 0, data: 
There are 1 children with names [u'a']
Children are now: []
Transaction results is [True, u'/my/favorite/node/b']
Children are now: [u'b']
Children are now: []
No handlers could be found for logger "kazoo.recipe.watchers"
LOST
以上程序运行了基本kazoo接口命令,包括创建删除加watcher等操作,通过调试并对比zookeeper服务节点znode目录结构的变化,就可以理解具体的操作结果。
3.运行通过kazoo实现的分布式锁程序kazoo_lock.py
# Dependencies of kazoo_lock.py: the standard library first, then kazoo.
import logging
import os
import time

from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.recipe.lock import Lock
class ZooKeeperLock(object):
    """Distributed lock built on kazoo's ``Lock`` recipe.

    The constructor connects to ZooKeeper and prepares a lock on the znode
    ``/locks/<lock_name>``.  Connection or setup failures do not raise;
    they are logged and recorded in ``init_ret`` / ``err_str``.
    """

    def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):
        """Connect to ZooKeeper and prepare the lock.

        Args:
            hosts: ZooKeeper connection string passed to ``KazooClient``.
            id_str: Caller-chosen identifier; stored on the instance.
            lock_name: Name of the lock, used as the last path component.
            logger: Optional logger handed to ``KazooClient``.
            timeout: Timeout passed to ``KazooClient`` and to ``start()``.
        """
        self.hosts = hosts
        self.id_str = id_str
        self.zk_client = None
        self.timeout = timeout
        self.logger = logger
        self.name = lock_name
        self.lock_handle = None
        # Defined up front so they can be read whether or not setup worked.
        self.init_ret = False
        self.err_str = ""
        self.create_lock()

    def create_lock(self):
        """Start the client and build the lock handle.

        On success ``init_ret`` is True.  On failure the error is logged,
        ``init_ret`` is False and ``err_str`` holds the message.
        """
        # Local import keeps this block self-contained.
        import posixpath

        self.init_ret = False
        try:
            self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)
            self.zk_client.start(timeout=self.timeout)
        except Exception as ex:
            self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return

        try:
            # Znode paths always use "/", so join with posixpath rather than
            # os.path, which would use the host OS separator.
            lock_path = posixpath.join("/", "locks", self.name)
            self.lock_handle = Lock(self.zk_client, lock_path)
        except Exception as ex:
            self.err_str = "Create lock failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return

        self.init_ret = True

    def destroy_lock(self):
        """Disconnect from ZooKeeper; safe to call more than once."""
        # getattr: __del__ may run on an instance that was never initialised.
        client = getattr(self, "zk_client", None)
        # The handle is bound to the client, so drop it together with it.
        self.lock_handle = None
        self.zk_client = None
        if client is not None:
            try:
                client.stop()
            finally:
                # A stopped client still holds resources until it is closed.
                client.close()

    def acquire(self, blocking=True, timeout=None):
        """Try to take the lock.

        Args:
            blocking: Forwarded to the underlying lock's ``acquire``.
            timeout: Forwarded to the underlying lock's ``acquire``.

        Returns:
            The result of the underlying ``acquire`` call, or None when no
            lock handle exists or the call raised (the error is logged and
            stored in ``err_str``).
        """
        if self.lock_handle is None:
            return None
        try:
            return self.lock_handle.acquire(blocking=blocking, timeout=timeout)
        except Exception as ex:
            self.err_str = "Acquire lock failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return None

    def release(self):
        """Release the lock.

        Returns:
            The result of the underlying ``release`` call, or None when no
            lock handle exists.
        """
        if self.lock_handle is None:
            return None
        return self.lock_handle.release()

    def __del__(self):
        # Last-resort cleanup; callers should prefer destroy_lock().
        self.destroy_lock()
def main():
    """Acquire the distributed lock "test", hold it for ten seconds, release it.

    Sets up the root logger with a stream handler, creates a
    ``ZooKeeperLock`` against ``127.0.0.1:2182`` and, if the lock is
    obtained, prints 1..10 at one-second intervals while holding it.
    Returns early (after logging) when the lock cannot be obtained.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    sh = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s-%(module)s:%(filename)s-L%(lineno)d-%(levelname)s:%(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)

    zookeeper_hosts = "127.0.0.1:2182"
    lock_name = "test"

    lock = ZooKeeperLock(zookeeper_hosts, "myidis1", lock_name, logger=logger)
    try:
        ret = lock.acquire()
        if not ret:
            logging.info("Can't get lock! Ret: %s", ret)
            return

        try:
            logging.info("Get lock! Do something! Sleep 10 secs!")
            for i in range(1, 11):
                time.sleep(1)
                print(str(i))
        finally:
            # Release even if the work above fails, so other contenders do
            # not have to wait for this session to go away.
            lock.release()
    finally:
        # Disconnect explicitly instead of relying on __del__.
        lock.destroy_lock()
if __name__ == "__main__":
    # Script entry point: run the lock demo and report any failure on stdout.
    try:
        main()
    except Exception as ex:  # "as" form is valid on Python 2.6+ and Python 3
        print("Ocurred Exception: %s" % str(ex))
        # Exit with a failure status; quit() is a helper injected by the
        # site module and would report success (status 0).
        raise SystemExit(1)
将该测试文件copy到多个服务器,同时运行,就可以看到分布式锁的效果了。
总结
以上就是本文关于Zookeeper接口kazoo实例解析的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!