zookeeper python接口实例详解
本文主要讲python支持zookeeper的接口库安装和使用。zk的python接口库有zkpython,还有kazoo,下面是zkpython,是基于zk的C库的python接口。
zkpython安装
前提是zookeeper安装包已经在/usr/local/zookeeper下
cd/usr/local/zookeeper/src/c ./configure make makeinstall wget--no-check-certificatehttp://pypi.python.org/packages/source/z/zkpython/zkpython-0.4.tar.gz tar-zxvfzkpython-0.4.tar.gz cdzkpython-0.4 sudopythonsetup.pyinstall
zkpython应用
下面是网上一个zkpython的类,用的时候只要import进去就行
vimzkclient.py
#!/usr/bin/envpython2.7
#-*-coding:UTF-8-*-
importzookeeper,time,threading
fromcollectionsimportnamedtuple
DEFAULT_TIMEOUT=30000
VERBOSE=True
ZOO_OPEN_ACL_UNSAFE={"perms":0x1f,"scheme":"world","id":"anyone"}
#Mappingofconnectionstatevaluestohumanstrings.
STATE_NAME_MAPPING={
zookeeper.ASSOCIATING_STATE:"associating",
zookeeper.AUTH_FAILED_STATE:"auth-failed",
zookeeper.CONNECTED_STATE:"connected",
zookeeper.CONNECTING_STATE:"connecting",
zookeeper.EXPIRED_SESSION_STATE:"expired",
}
#Mappingofeventtypetohumanstring.
TYPE_NAME_MAPPING={
zookeeper.NOTWATCHING_EVENT:"not-watching",
zookeeper.SESSION_EVENT:"session",
zookeeper.CREATED_EVENT:"created",
zookeeper.DELETED_EVENT:"deleted",
zookeeper.CHANGED_EVENT:"changed",
zookeeper.CHILD_EVENT:"child",
}
classZKClientError(Exception):
def__init__(self,value):
self.value=value
def__str__(self):
returnrepr(self.value)
classClientEvent(namedtuple("ClientEvent",'type,connection_state,path')):
"""
Aclienteventisreturnedwhenawatchdeferredfires.Itdenotes
someeventonthezookeeperclientthatthewatchwasrequestedon.
"""
@property
deftype_name(self):
returnTYPE_NAME_MAPPING[self.type]
@property
defstate_name(self):
returnSTATE_NAME_MAPPING[self.connection_state]
def__repr__(self):
return""%(
self.type_name,self.path,self.state_name)
defwatchmethod(func):
defdecorated(handle,atype,state,path):
event=ClientEvent(atype,state,path)
returnfunc(event)
returndecorated
classZKClient(object):
def__init__(self,servers,timeout=DEFAULT_TIMEOUT):
self.timeout=timeout
self.connected=False
self.conn_cv=threading.Condition()
self.handle=-1
self.conn_cv.acquire()
ifVERBOSE:print("Connectingto%s"%(servers))
start=time.time()
self.handle=zookeeper.init(servers,self.connection_watcher,timeout)
self.conn_cv.wait(timeout/1000)
self.conn_cv.release()
ifnotself.connected:
raiseZKClientError("Unabletoconnectto%s"%(servers))
ifVERBOSE:
print("Connectedin%dms,handleis%d"
%(int((time.time()-start)*1000),self.handle))
defconnection_watcher(self,h,type,state,path):
self.handle=h
self.conn_cv.acquire()
self.connected=True
self.conn_cv.notifyAll()
self.conn_cv.release()
defclose(self):
returnzookeeper.close(self.handle)
defcreate(self,path,data="",flags=0,acl=[ZOO_OPEN_ACL_UNSAFE]):
start=time.time()
result=zookeeper.create(self.handle,path,data,acl,flags)
ifVERBOSE:
print("Node%screatedin%dms"
%(path,int((time.time()-start)*1000)))
returnresult
defdelete(self,path,version=-1):
start=time.time()
result=zookeeper.delete(self.handle,path,version)
ifVERBOSE:
print("Node%sdeletedin%dms"
%(path,int((time.time()-start)*1000)))
returnresult
defget(self,path,watcher=None):
returnzookeeper.get(self.handle,path,watcher)
defexists(self,path,watcher=None):
returnzookeeper.exists(self.handle,path,watcher)
defset(self,path,data="",version=-1):
returnzookeeper.set(self.handle,path,data,version)
defset2(self,path,data="",version=-1):
returnzookeeper.set2(self.handle,path,data,version)
defget_children(self,path,watcher=None):
returnzookeeper.get_children(self.handle,path,watcher)
defasync(self,path="/"):
returnzookeeper.async(self.handle,path)
defacreate(self,path,callback,data="",flags=0,acl=[ZOO_OPEN_ACL_UNSAFE]):
result=zookeeper.acreate(self.handle,path,data,acl,flags,callback)
returnresult
defadelete(self,path,callback,version=-1):
returnzookeeper.adelete(self.handle,path,version,callback)
defaget(self,path,callback,watcher=None):
returnzookeeper.aget(self.handle,path,watcher,callback)
defaexists(self,path,callback,watcher=None):
returnzookeeper.aexists(self.handle,path,watcher,callback)
defaset(self,path,callback,data="",version=-1):
returnzookeeper.aset(self.handle,path,data,version,callback)
watch_count=0
"""Callablewatcherthatcountsthenumberofnotifications"""
classCountingWatcher(object):
def__init__(self):
self.count=0
globalwatch_count
self.id=watch_count
watch_count+=1
defwaitForExpected(self,count,maxwait):
"""Waituptomaxwaitforthespecifiedcount,
returnthecountwhetherornotmaxwaitreached.
Arguments:
-`count`:expectedcount
-`maxwait`:maxmillisecondstowait
"""
waited=0
while(waited=count:
returnself.count
time.sleep(1.0);
waited+=1000
returnself.count
def__call__(self,handle,typ,state,path):
self.count+=1
ifVERBOSE:
print("handle%dgotwatchfor%sinwatcher%d,count%d"%
(handle,path,self.id,self.count))
"""Callablewatcherthatcountsthenumberofnotifications
andverifiesthatthepathsaresequential"""
classSequentialCountingWatcher(CountingWatcher):
def__init__(self,child_path):
CountingWatcher.__init__(self)
self.child_path=child_path
def__call__(self,handle,typ,state,path):
ifnotself.child_path(self.count)==path:
raiseZKClientError("handle%dinvalidpathorder%s"%(handle,path))
CountingWatcher.__call__(self,handle,typ,state,path)
classCallback(object):
def__init__(self):
self.cv=threading.Condition()
self.callback_flag=False
self.rc=-1
defcallback(self,handle,rc,handler):
self.cv.acquire()
self.callback_flag=True
self.handle=handle
self.rc=rc
handler()
self.cv.notify()
self.cv.release()
defwaitForSuccess(self):
whilenotself.callback_flag:
self.cv.wait()
self.cv.release()
ifnotself.callback_flag==True:
raiseZKClientError("asynchronousoperationtimedoutonhandle%d"%
(self.handle))
ifnotself.rc==zookeeper.OK:
raiseZKClientError(
"asynchronousoperationfailedonhandle%dwithrc%d"%
(self.handle,self.rc))
classGetCallback(Callback):
def__init__(self):
Callback.__init__(self)
def__call__(self,handle,rc,value,stat):
defhandler():
self.value=value
self.stat=stat
self.callback(handle,rc,handler)
classSetCallback(Callback):
def__init__(self):
Callback.__init__(self)
def__call__(self,handle,rc,stat):
defhandler():
self.stat=stat
self.callback(handle,rc,handler)
classExistsCallback(SetCallback):
pass
classCreateCallback(Callback):
def__init__(self):
Callback.__init__(self)
def__call__(self,handle,rc,path):
defhandler():
self.path=path
self.callback(handle,rc,handler)
classDeleteCallback(Callback):
def__init__(self):
Callback.__init__(self)
def__call__(self,handle,rc):
defhandler():
pass
self.callback(handle,rc,handler)
总结
以上就是本文关于zookeeperpython接口实例详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!