Zookeeper接口kazoo实例解析
本文主要研究的是Zookeeper接口kazoo的相关内容,具体介绍如下。
zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,现在主流的纯python的zookeeper接口是kazoo。因此如何使用kazoo开发基于python的分布式程序是必须掌握的。
1.安装kazoo
yuminstallpython-pip pipinstallkazoo
安装过程中会出现一些python依赖包未安装的情况,安装即可。
2.运行kazoo基础例子kazoo_basic.py
importtime fromkazoo.clientimportKazooClient fromkazoo.clientimportKazooState defmain(): zk=KazooClient(hosts='127.0.0.1:2182') zk.start() @zk.add_listener defmy_listener(state): ifstate==KazooState.LOST: print("LOST") elifstate==KazooState.SUSPENDED: print("SUSPENDED") else: print("Connected") #CreatingNodes #Ensureapath,createifnecessary zk.ensure_path("/my/favorite") #Createanodewithdata zk.create("/my/favorite/node",b"") zk.create("/my/favorite/node/a",b"A") #ReadingData #Determineifanodeexists ifzk.exists("/my/favorite"): print("/my/favoriteisexisted") @zk.ChildrenWatch("/my/favorite/node") defwatch_children(children): print("Childrenarenow:%s"%children) #Abovefunctioncalledimmediately,andfromthenon @zk.DataWatch("/my/favorite/node") defwatch_node(data,stat): print("Version:%s,data:%s"%(stat.version,data.decode("utf-8"))) #Printtheversionofanodeanditsdata data,stat=zk.get("/my/favorite/node") print("Version:%s,data:%s"%(stat.version,data.decode("utf-8"))) #Listthechildren children=zk.get_children("/my/favorite/node") print("Thereare%schildrenwithnames%s"%(len(children),children)) #UpdatingData zk.set("/my/favorite",b"somedata") #DeletingNodes zk.delete("/my/favorite/node/a") #Transactions transaction=zk.transaction() transaction.check('/my/favorite/node',version=-1) transaction.create('/my/favorite/node/b',b"B") results=transaction.commit() print("Transactionresultsis%s"%results) zk.delete("/my/favorite/node/b") zk.delete("/my",recursive=True) time.sleep(2) zk.stop() if__name__=="__main__": try: main() exceptException,ex: print"OcurredException:%s"%str(ex) quit()
运行结果:
Childrenarenow:[u'a'] Version:0,data: Version:0,data: Thereare1childrenwithnames[u'a'] Childrenarenow:[] Transactionresultsis[True,u'/my/favorite/node/b'] Childrenarenow:[u'b'] Childrenarenow:[] Nohandlerscouldbefoundforlogger"kazoo.recipe.watchers" LOST
以上程序运行了基本kazoo接口命令,包括创建删除加watcher等操作,通过调试并对比zookeeper服务节点znode目录结构的变化,就可以理解具体的操作结果。
3.运行通过kazoo实现的分布式锁程序kazoo_lock.py
importlogging,os,time fromkazoo.clientimportKazooClient fromkazoo.clientimportKazooState fromkazoo.recipe.lockimportLock classZooKeeperLock(): def__init__(self,hosts,id_str,lock_name,logger=None,timeout=1): self.hosts=hosts self.id_str=id_str self.zk_client=None self.timeout=timeout self.logger=logger self.name=lock_name self.lock_handle=None self.create_lock() defcreate_lock(self): try: self.zk_client=KazooClient(hosts=self.hosts,logger=self.logger,timeout=self.timeout) self.zk_client.start(timeout=self.timeout) exceptException,ex: self.init_ret=False self.err_str="CreateKazooClientfailed!Exception:%s"%str(ex) logging.error(self.err_str) return try: lock_path=os.path.join("/","locks",self.name) self.lock_handle=Lock(self.zk_client,lock_path) exceptException,ex: self.init_ret=False self.err_str="Createlockfailed!Exception:%s"%str(ex) logging.error(self.err_str) return defdestroy_lock(self): #self.release() ifself.zk_client!=None: self.zk_client.stop() self.zk_client=None defacquire(self,blocking=True,timeout=None): ifself.lock_handle==None: returnNone try: returnself.lock_handle.acquire(blocking=blocking,timeout=timeout) exceptException,ex: self.err_str="Acquirelockfailed!Exception:%s"%str(ex) logging.error(self.err_str) returnNone defrelease(self): ifself.lock_handle==None: returnNone returnself.lock_handle.release() def__del__(self): self.destroy_lock() defmain(): logger=logging.getLogger() logger.setLevel(logging.INFO) sh=logging.StreamHandler() formatter=logging.Formatter('%(asctime)s-%(module)s:%(filename)s-L%(lineno)d-%(levelname)s:%(message)s') sh.setFormatter(formatter) logger.addHandler(sh) zookeeper_hosts="127.0.0.1:2182" lock_name="test" lock=ZooKeeperLock(zookeeper_hosts,"myidis1",lock_name,logger=logger) ret=lock.acquire() ifnotret: logging.info("Can'tgetlock!Ret:%s",ret) return logging.info("Getlock!Dosomething!Sleep10secs!") foriinrange(1,11): time.sleep(1) printstr(i) lock.release() if__name__=="__main__": try: main() exceptException,ex: print"OcurredException:%s"%str(ex) quit()
将该测试文件copy到多个服务器,同时运行,就可以看到分布式锁的效果了。
总结
以上就是本文关于Zookeeper接口kazoo实例解析的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!