zookeeper python接口实例详解
本文主要讲python支持zookeeper的接口库安装和使用。zk的python接口库有zkpython,还有kazoo,下面是zkpython,是基于zk的C库的python接口。
zkpython安装
前提是zookeeper安装包已经在/usr/local/zookeeper下
cd/usr/local/zookeeper/src/c ./configure make makeinstall wget--no-check-certificatehttp://pypi.python.org/packages/source/z/zkpython/zkpython-0.4.tar.gz tar-zxvfzkpython-0.4.tar.gz cdzkpython-0.4 sudopythonsetup.pyinstall
zkpython应用
下面是网上一个zkpython的类,用的时候只要import进去就行
vimzkclient.py
#!/usr/bin/envpython2.7 #-*-coding:UTF-8-*- importzookeeper,time,threading fromcollectionsimportnamedtuple DEFAULT_TIMEOUT=30000 VERBOSE=True ZOO_OPEN_ACL_UNSAFE={"perms":0x1f,"scheme":"world","id":"anyone"} #Mappingofconnectionstatevaluestohumanstrings. STATE_NAME_MAPPING={ zookeeper.ASSOCIATING_STATE:"associating", zookeeper.AUTH_FAILED_STATE:"auth-failed", zookeeper.CONNECTED_STATE:"connected", zookeeper.CONNECTING_STATE:"connecting", zookeeper.EXPIRED_SESSION_STATE:"expired", } #Mappingofeventtypetohumanstring. TYPE_NAME_MAPPING={ zookeeper.NOTWATCHING_EVENT:"not-watching", zookeeper.SESSION_EVENT:"session", zookeeper.CREATED_EVENT:"created", zookeeper.DELETED_EVENT:"deleted", zookeeper.CHANGED_EVENT:"changed", zookeeper.CHILD_EVENT:"child", } classZKClientError(Exception): def__init__(self,value): self.value=value def__str__(self): returnrepr(self.value) classClientEvent(namedtuple("ClientEvent",'type,connection_state,path')): """ Aclienteventisreturnedwhenawatchdeferredfires.Itdenotes someeventonthezookeeperclientthatthewatchwasrequestedon. """ @property deftype_name(self): returnTYPE_NAME_MAPPING[self.type] @property defstate_name(self): returnSTATE_NAME_MAPPING[self.connection_state] def__repr__(self): return""%( self.type_name,self.path,self.state_name) defwatchmethod(func): defdecorated(handle,atype,state,path): event=ClientEvent(atype,state,path) returnfunc(event) returndecorated classZKClient(object): def__init__(self,servers,timeout=DEFAULT_TIMEOUT): self.timeout=timeout self.connected=False self.conn_cv=threading.Condition() self.handle=-1 self.conn_cv.acquire() ifVERBOSE:print("Connectingto%s"%(servers)) start=time.time() self.handle=zookeeper.init(servers,self.connection_watcher,timeout) self.conn_cv.wait(timeout/1000) self.conn_cv.release() ifnotself.connected: raiseZKClientError("Unabletoconnectto%s"%(servers)) ifVERBOSE: print("Connectedin%dms,handleis%d" %(int((time.time()-start)*1000),self.handle)) defconnection_watcher(self,h,type,state,path): self.handle=h self.conn_cv.acquire() self.connected=True self.conn_cv.notifyAll() self.conn_cv.release() defclose(self): returnzookeeper.close(self.handle) defcreate(self,path,data="",flags=0,acl=[ZOO_OPEN_ACL_UNSAFE]): start=time.time() result=zookeeper.create(self.handle,path,data,acl,flags) ifVERBOSE: print("Node%screatedin%dms" %(path,int((time.time()-start)*1000))) returnresult defdelete(self,path,version=-1): start=time.time() result=zookeeper.delete(self.handle,path,version) ifVERBOSE: print("Node%sdeletedin%dms" %(path,int((time.time()-start)*1000))) returnresult defget(self,path,watcher=None): returnzookeeper.get(self.handle,path,watcher) defexists(self,path,watcher=None): returnzookeeper.exists(self.handle,path,watcher) defset(self,path,data="",version=-1): returnzookeeper.set(self.handle,path,data,version) defset2(self,path,data="",version=-1): returnzookeeper.set2(self.handle,path,data,version) defget_children(self,path,watcher=None): returnzookeeper.get_children(self.handle,path,watcher) defasync(self,path="/"): returnzookeeper.async(self.handle,path) defacreate(self,path,callback,data="",flags=0,acl=[ZOO_OPEN_ACL_UNSAFE]): result=zookeeper.acreate(self.handle,path,data,acl,flags,callback) returnresult defadelete(self,path,callback,version=-1): returnzookeeper.adelete(self.handle,path,version,callback) defaget(self,path,callback,watcher=None): returnzookeeper.aget(self.handle,path,watcher,callback) defaexists(self,path,callback,watcher=None): returnzookeeper.aexists(self.handle,path,watcher,callback) defaset(self,path,callback,data="",version=-1): returnzookeeper.aset(self.handle,path,data,version,callback) watch_count=0 """Callablewatcherthatcountsthenumberofnotifications""" classCountingWatcher(object): def__init__(self): self.count=0 globalwatch_count self.id=watch_count watch_count+=1 defwaitForExpected(self,count,maxwait): """Waituptomaxwaitforthespecifiedcount, returnthecountwhetherornotmaxwaitreached. Arguments: -`count`:expectedcount -`maxwait`:maxmillisecondstowait """ waited=0 while(waited =count: returnself.count time.sleep(1.0); waited+=1000 returnself.count def__call__(self,handle,typ,state,path): self.count+=1 ifVERBOSE: print("handle%dgotwatchfor%sinwatcher%d,count%d"% (handle,path,self.id,self.count)) """Callablewatcherthatcountsthenumberofnotifications andverifiesthatthepathsaresequential""" classSequentialCountingWatcher(CountingWatcher): def__init__(self,child_path): CountingWatcher.__init__(self) self.child_path=child_path def__call__(self,handle,typ,state,path): ifnotself.child_path(self.count)==path: raiseZKClientError("handle%dinvalidpathorder%s"%(handle,path)) CountingWatcher.__call__(self,handle,typ,state,path) classCallback(object): def__init__(self): self.cv=threading.Condition() self.callback_flag=False self.rc=-1 defcallback(self,handle,rc,handler): self.cv.acquire() self.callback_flag=True self.handle=handle self.rc=rc handler() self.cv.notify() self.cv.release() defwaitForSuccess(self): whilenotself.callback_flag: self.cv.wait() self.cv.release() ifnotself.callback_flag==True: raiseZKClientError("asynchronousoperationtimedoutonhandle%d"% (self.handle)) ifnotself.rc==zookeeper.OK: raiseZKClientError( "asynchronousoperationfailedonhandle%dwithrc%d"% (self.handle,self.rc)) classGetCallback(Callback): def__init__(self): Callback.__init__(self) def__call__(self,handle,rc,value,stat): defhandler(): self.value=value self.stat=stat self.callback(handle,rc,handler) classSetCallback(Callback): def__init__(self): Callback.__init__(self) def__call__(self,handle,rc,stat): defhandler(): self.stat=stat self.callback(handle,rc,handler) classExistsCallback(SetCallback): pass classCreateCallback(Callback): def__init__(self): Callback.__init__(self) def__call__(self,handle,rc,path): defhandler(): self.path=path self.callback(handle,rc,handler) classDeleteCallback(Callback): def__init__(self): Callback.__init__(self) def__call__(self,handle,rc): defhandler(): pass self.callback(handle,rc,handler)
总结
以上就是本文关于zookeeperpython接口实例详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!