浅谈Zookeeper开源客户端框架Curator
zookeepercurator
Curator是Netflix开源的一套ZooKeeper客户端框架.Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层,应用方在使用的时候需要自己处理很多事情,于是在它的基础上包装了一下,提供了一套更好用的客户端框架.Netflix在用ZooKeeper的过程中遇到的问题,我们也遇到了,所以开始研究一下,首先从他在github上的源码,wiki文档以及Netflix的技术blog入手.
看完官方的文档之后,发现Curator主要解决了三类问题:
2.提供了一套Fluent风格的操作API;
3.提供ZooKeeper各种应用场景(recipe,比如共享锁服务,集群领导选举机制)的抽象封装.
Curator列举的ZooKeeper使用过程中的几个问题
2.自动恢复(failover)的问题:当client与一台server的连接丢失,并试图去连接另外一台server时,client将回到初始连接模式
session过期的问题:在极端情况下,出现ZooKeepersession过期,客户端需要自己去监听该状态并重新创建ZooKeeper实例.
3.对可恢复异常的处理:当在server端创建一个有序ZNode,而在将节点名返回给客户端时崩溃,此时client端抛出可恢复的异常,用户需要自己捕获这些异常并进行重试
4.使用场景的问题:Zookeeper提供了一些标准的使用场景支持,但是ZooKeeper对这些功能的使用说明文档很少,而且很容易用错.在一些极端场景下如何处理,zk并没有给出详细的文档说明.比如共享锁服务,当服务器端创建临时顺序节点成功,但是在客户端接收到节点名之前挂掉了,如果不能很好的处理这种情况,将导致死锁.
Curator主要从以下几个方面降低了zk使用的复杂性:
2.连接状态监控:Curator初始化之后会一直的对zk连接进行监听,一旦发现连接状态发生变化,将作出相应的处理.
3.zk客户端实例管理:Curator对zk客户端到server集群连接进行管理.并在需要的情况,重建zk实例,保证与zk集群的可靠连接
各种使用场景支持:Curator实现zk支持的大部分使用场景支持(甚至包括zk自身不支持的场景),这些实现都遵循了zk的较佳实践,并考虑了各种极端情况.
Curator通过以上的处理,让用户专注于自身的业务本身,而无需花费更多的精力在zk本身.
Curator声称的一些亮点:
日志工具
采用驱动器(driver)机制,允许扩展和定制日志和跟踪处理
提供了一个TracerDriver接口,通过实现addTrace()和addCount()接口来集成用户自己的跟踪框架
和Curator相比,另一个ZooKeeper客户端——zkClient
文档几乎没有
异常处理弱爆了(简单的抛出RuntimeException)
重试处理太难用了
没有提供各种使用场景的实现
对ZooKeeper自带客户端(ZooKeeper类)的"抱怨":
只是一个底层实现
要用需要自己写大量的代码
很容易误用
需要自己处理连接丢失,重试等
Curator几个组成部分
2.Framework:用来简化ZooKeeper高级功能的使用,并增加了一些新的功能,比如管理到ZooKeeper集群的连接,重试处理
3.Recipes:实现了通用ZooKeeper的recipe,该组件建立在Framework的基础之上
4.Utilities:各种ZooKeeper的工具类
5.Errors:异常处理,连接,恢复等.
6.Extensions:recipe扩展
Client
这是一个底层的API,应用方基本对这个可以无视,较好直接从CuratorFramework入手,主要包括三部分:
连接重试处理
RetryLoop(循环重试)
一种典型的用法:
RetryLoopretryLoop=client.newRetryLoop(); while(retryLoop.shouldContinue()) { try { //performyourwork ... //it'simportanttore-gettheZKinstanceastheremayhavebeenanerrorandtheinstancewasre-created ZooKeeperzk=client.getZookeeper(); retryLoop.markComplete(); } catch(Exceptione) { retryLoop.takeException(e); } }
如果在操作过程中失败,且这种失败是可重试的,而且在允许的次数内,Curator将保证操作的最终完成.
另一种使用Callable接口的重试做法:
RetryLoop.callWithRetry(client,newCallable() { @Override publicVoidcall()throwsException { //doyourworkhere-itwillgetretriedifneeded returnnull; } });
重试策略
RetryPolicy接口只有一个方法(以前版本有两个方法):
publicbooleanallowRetry(intretryCount,longelapsedTimeMs);
在开始重试之前,allowRetry方法被调用,其参数将指定当前重试次数,和操作已消耗时间.如果允许,将继续重试,否则抛出异常.
Curator内部实现的几种重试策略:
2.RetryNTimes:指定较大重试次数的重试策略
3.RetryOneTime:仅重试一次
4.RetryUntilElapsed:一直重试直到达到规定的时间
Framework
是ZooKeeperClient更高的抽象API
自动连接管理:当ZooKeeper客户端内部出现异常,将自动进行重连或重试,该过程对外几乎完全透明
更清晰的API:简化了ZooKeeper原生的方法,事件等,提供流程的接口
CuratorFrameworkFactory类提供了两个方法,一个工厂方法newClient,一个构建方法build.使用工厂方法newClient可以创建一个默认的实例,而build构建方法可以对实例进行定制.当CuratorFramework实例构建完成,紧接着调用start()方法,在应用结束的时候,需要调用close()方法. CuratorFramework是线程安全的.在一个应用中可以共享同一个zk集群的CuratorFramework.
CuratorFrameworkAPI采用了连贯风格的接口(FluentInterface).所有的操作一律返回构建器,当所有元素加在一起之后,整个方法看起来就像一个完整的句子.比如下面的操作:
client.create().forPath("/head",newbyte[0]); client.delete().inBackground().forPath("/head"); client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child",newbyte[0]); client.getData().watched().inBackground().forPath("/test");
方法说明:
2.delete():发起一个删除操作.可以组合其他方法(version或background)最后以forPath()方法结尾
3.checkExists():发起一个检查ZNode是否存在的操作.可以组合其他方法(watch或background)最后以forPath()方法结尾
4.getData():发起一个获取ZNode数据的操作.可以组合其他方法(watch,background或getstat)最后以forPath()方法结尾
5.setData():发起一个设置ZNode数据的操作.可以组合其他方法(version或background)最后以forPath()方法结尾
6.getChildren():发起一个获取ZNode子节点的操作.可以组合其他方法(watch,background或getstat)最后以forPath()方法结尾
7.inTransaction():发起一个ZooKeeper事务.可以组合create,setData,check,和/或delete为一个操作,然后commit()提交
.
通知(Notification)
Curator的相关代码已经更新了,里面的接口已经由ClientListener改成CuratorListener了,而且接口中去掉了clientCloseDueToError方法.只有一个方法:
eventReceived() 当一个后台操作完成或者指定的watch被触发时该方法被调用
UnhandledErrorListener接口用来对异常进行处理.
CuratorEvent(在以前版本为ClientEvent)是对各种操作触发相关事件对象(POJO)的一个完整封装,而事件对象的内容跟事件类型相关,下面是对应关系:
名称空间(Namespace)
因为一个zk集群会被多个应用共享,为了避免各个应用的zkpatch冲突,CuratorFramework内部会给每一个CuratorFramework实例分配一个namespace(可选).这样你在createZNode的时候都会自动加上这个namespace作为这个nodepath的root.使用代码如下:
CuratorFrameworkclient=CuratorFrameworkFactory.builder().namespace("MyApp")...build(); … client.create().forPath("/test",data); //nodewasactuallywrittento:"/MyApp/test"
Recipe
Curator实现ZooKeeper的所有recipe(除了两段提交)
选举
集群领导选举(leaderelection)
锁服务
共享锁:全局同步分布式锁,同一时间两台机器只有一台能获得同一把锁.
共享读写锁:用于分布式的读写互斥处理,同时生成两个锁:一个读锁,一个写锁,读锁能被多个应用持有,而写锁只能一个独占,当写锁未被持有时,多个读锁持有者可以同时进行读操作
共享信号量:在分布式系统中的各个JVM使用同一个zklockpath,该path将跟一个给定数量的租约(lease)相关联,然后各个应用根据请求顺序获得对应的lease,相对来说,这是最公平的锁服务使用方式.
多共享锁:内部构件多个共享锁(会跟一个znodepath关联),在acquire()过程中,执行所有共享锁的acquire()方法,如果中间出现一个失败,则将释放所有已require的共享锁;执行release()方法时,则执行内部多个共享锁的release方法(如果出现失败将忽略)
队列(Queue)
分布式队列:采用持久顺序zknode来实现FIFO队列,如果有多个消费者,可以使用LeaderSelector来保证队列的消费者顺序
分布式优先队列:优先队列的分布式版本
BlockingQueueConsumer:JDK阻塞队列的分布式版本
关卡(Barrier)
分布式关卡:一堆客户端去处理一堆任务,只有所有的客户端都执行完,所有客户端才能继续往下处理
双分布式关卡:同时开始,同时结束
计数器(Counter)
共享计数器:所有客户端监听同一个znodepath,并共享一个的integer计数值
分布式AtomicLong(AtomicInteger):AtomicXxx的分布式版本,先采用乐观锁更新,若失败再采用互斥锁更新,可以配置重试策略来处理重试
工具类
PathCache
PathCache用于监听ZNode的子节点的变化,当add,update,remove子节点时将改变PathCachestate,同时返回所有子节点的data和state.
Curator中采用了PathChildrenCache类来处理PathCache,状态的变化则采用PathChildrenCacheListener来监听.
相关用法参见TestPathChildrenCache测试类
注意:当zkserver的数据发生变化,zkclient会出现不一致,这个需要通过版本号来识别这种状态的变化
TestServer
用来在测试中模拟一个本地进程内ZooKeeperServer.
TestCluster
用来在测试中模拟一个ZooKeeperServer集群
ZKPaths工具类
提供了和ZNode相关的path处理工具方法:
1.getNodeFromPath:根据给定path获取nodename.i.e."/one/two/three"->"three"
2.mkdirs:根据给定路径递归创建所有node
3.getSortedChildren:根据给定路径,返回一个按序列号排序的子节点列表
4.makePath:根据给定的path和子节点名,创建一个完整path
EnsurePath工具类
直接看例子,具体的说就是调用多次,只会执行一次创建节点操作.
EnsurePathensurePath=newEnsurePath(aFullPathToEnsure); ... StringnodePath=aFullPathToEnsure+"/foo"; ensurePath.ensure(zk);//firsttimesyncsandcreatesifneeded zk.create(nodePath,...); ... ensurePath.ensure(zk);//subsequenttimesareNOPs zk.create(nodePath,...);
Notification事件处理
Curator对ZooKeeper的事件Watcher进行了封装处理,然后实现了一套监听机制.提供了几个监听接口用来处理ZooKeeper连接状态的变化
当连接出现异常,将通过ConnectionStateListener接口进行监听,并进行相应的处理,这些状态变化包括:
2.重连(RECONNECTED):连接丢失,执行重连时,将触发该通知
3.丢失(LOST):连接超时时,将触发该通知
从com.netflix.curator.framework.imps.CuratorFrameworkImpl.validateConnection(CuratorEvent)方法中我们可以知道,Curator分别将ZooKeeper的Disconnected,Expired,SyncConnected三种状态转换成上面三种状态.
总结
以上就是本文关于浅谈Zookeeper开源客户端框架Curator的全部内容,感兴趣的朋友可以参阅:zookeeperwatch机制的理解、为zookeeper配置相应的acl权限、apachezookeeper使用方法实例详解等,希望对大家有所帮助。如有不足之处,欢迎留言指正。感谢朋友们对本站的支持!