源码阅读之storm操作zookeeper-cluster.clj
storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中)。backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState。
clojure中的protocol可以看成java中的接口,封装了一组方法。ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协议定义如下:
ClusterState协议
(defprotocolClusterState (set-ephemeral-node[thispathdata]) (delete-node[thispath]) (create-sequential[thispathdata]) ;;ifnodedoesnotexist,createpersistentwiththisdata (set-data[thispathdata]) (get-data[thispathwatch?]) (get-version[thispathwatch?]) (get-data-with-version[thispathwatch?]) (get-children[thispathwatch?]) (mkdirs[thispath]) (close[this]) (register[thiscallback]) (unregister[thisid]))
StormClusterState协议封装了一组storm与zookeeper进行交互的函数,可以将StormClusterState协议中的函数看成ClusterState协议中函数的"组合"。StormClusterState协议定义如下:
StormClusterState协议
(defprotocolStormClusterState (assignments[thiscallback]) (assignment-info[thisstorm-idcallback]) (assignment-info-with-version[thisstorm-idcallback]) (assignment-version[thisstorm-idcallback]) (active-storms[this]) (storm-base[thisstorm-idcallback]) (get-worker-heartbeat[thisstorm-idnodeport]) (executor-beats[thisstorm-idexecutor->node+port]) (supervisors[thiscallback]) (supervisor-info[thissupervisor-id]);;returnsnilifdoesn'texist (setup-heartbeats![thisstorm-id]) (teardown-heartbeats![thisstorm-id]) (teardown-topology-errors![thisstorm-id]) (heartbeat-storms[this]) (error-topologies[this]) (worker-heartbeat![thisstorm-idnodeportinfo]) (remove-worker-heartbeat![thisstorm-idnodeport]) (supervisor-heartbeat![thissupervisor-idinfo]) (activate-storm![thisstorm-idstorm-base]) (update-storm![thisstorm-idnew-elems]) (remove-storm-base![thisstorm-id]) (set-assignment![thisstorm-idinfo]) (remove-storm![thisstorm-id]) (report-error[thisstorm-idtask-idnodeporterror]) (errors[thisstorm-idtask-id]) (disconnect[this]))
命名空间backtype.storm.cluster除了定义ClusterState和StormClusterState这两个重要协议外,还定义了两个重要函数:mk-distributed-cluster-state和mk-storm-cluster-state。
mk-distributed-cluster-state函数如下:
该函数返回一个实现了ClusterState协议的对象,通过这个对象就可以与zookeeper进行交互了。
mk-distributed-cluster-state函数
(defnmk-distributed-cluster-state
;;conf绑定了storm.yaml中的配置信息,是一个map对象
[conf]
;;zk绑定一个zkclient,Storm使用CuratorFramework与Zookeeper进行交互
(let[zk(zk/mk-clientconf(confSTORM-ZOOKEEPER-SERVERS)(confSTORM-ZOOKEEPER-PORT):auth-confconf)]
;;创建storm集群在zookeeper上的根目录,默认值为/storm
(zk/mkdirszk(confSTORM-ZOOKEEPER-ROOT))
(.closezk))
;;callbacks绑定回调函数集合,是一个map对象
(let[callbacks(atom{})
;;active标示zookeeper集群状态
active(atomtrue)
;;zk重新绑定新的zkclient,该zkclient设置了watcher,这样当zookeeper集群的状态发生变化时,zkserver会给zkclient发送相应的event,zkclient设置的watcher会调用callbacks中相应回调函数来处理event
;;启动nimbus时,callbacks是一个空集合,所以nimbus端收到event后不会调用任何回调函数;但是启动supervisor时,callbacks中注册了回调函数,所以当supervisor收到zkserver发送的event后,会调用相应的回调函数
;;mk-client函数定义在zookeeper.clj文件中,请参见其定义部分
zk(zk/mk-clientconf
(confSTORM-ZOOKEEPER-SERVERS)
(confSTORM-ZOOKEEPER-PORT)
:auth-confconf
:root(confSTORM-ZOOKEEPER-ROOT)
;;:watcher绑定一个函数,指定zkclient的默认watcher函数,state标示当前zkclient的状态;type标示事件类型;path标示zookeeper上产生该事件的znode
;;该watcher函数主要功能就是执行callbacks集合中的函数,callbacks集合中的函数是在mk-storm-cluster-state函数中通过调用ClusterState的register函数添加的
:watcher(fn[statetypepath]
(when@active
(when-not(=:connectedstate)
(log-warn"Receivedevent"state":"type":"path"withdisconnectedZookeeper."))
(when-not(=:nonetype)
(doseq[callback(vals@callbacks)]
(callbacktypepath))))))]
;;reify相当于java中的implements,这里表示实现一个协议
(reify
ClusterState
;;register函数用于将回调函数加入callbacks中,key是一个32位的标识
(register
[thiscallback]
(let[id(uuid)]
(swap!callbacksassocidcallback)
id))
;;unregister函数用于将指定key的回调函数从callbacks中删除
(unregister
[thisid]
(swap!callbacksdissocid))
;;在zookeeper上添加一个临时节点
(set-ephemeral-node
[thispathdata]
(zk/mkdirszk(parent-pathpath))
(if(zk/existszkpathfalse)
(try-cause
(zk/set-datazkpathdata);shouldverifythatit'sephemeral
(catchKeeperException$NoNodeExceptione
(log-warn-errore"Ephemeralnodedisappearedbetweencheckingforexistingandsettingdata")
(zk/create-nodezkpathdata:ephemeral)
))
(zk/create-nodezkpathdata:ephemeral)))
;;在zookeeper上添加一个顺序节点
(create-sequential
[thispathdata]
(zk/create-nodezkpathdata:sequential))
;;修改某个节点数据
(set-data
[thispathdata]
;;note:thisdoesnotturnoffanyexistingwatches
(if(zk/existszkpathfalse)
(zk/set-datazkpathdata)
(do
(zk/mkdirszk(parent-pathpath))
(zk/create-nodezkpathdata:persistent))))
;;删除指定节点
(delete-node
[thispath]
(zk/delete-recursivezkpath))
;;获取指定节点数据。path标示节点路径;watch?是一个布尔类型值,表示是否需要对该节点进行"观察",如果watch?=true,当调用set-data函数修改该节点数据后,
;;会给zkclient发送一个事件,zkclient接收事件后,会调用创建zkclient时指定的默认watcher函数(即:watcher绑定的函数)
(get-data
[thispathwatch?]
(zk/get-datazkpathwatch?))
;;与get-data函数的区别就是获取指定节点数据的同时,获取节点数据的version,version表示节点数据修改的次数
(get-data-with-version
[thispathwatch?]
(zk/get-data-with-versionzkpathwatch?))
;;获取指定节点的version,watch?的含义与get-data函数中的watch?相同
(get-version
[thispathwatch?]
(zk/get-versionzkpathwatch?))
;;获取指定节点的子节点列表,watch?的含义与get-data函数中的watch?相同
(get-children
[thispathwatch?]
(zk/get-childrenzkpathwatch?))
;;在zookeeper上创建一个节点
(mkdirs
[thispath]
(zk/mkdirszkpath))
;;关闭zkclient
(close
[this]
(reset!activefalse)
(.closezk)))))
mk-storm-cluster-state函数定义如下:
mk-storm-cluster-state函数非常重要,该函数返回一个实现了StormClusterState协议的实例,通过该实例storm就可以更加方便与zookeeper进行交互。
在启动nimbus和supervisor的函数中均调用了mk-storm-cluster-state函数。关于nimbus和supervisor的启动将在之后的文章中介绍。
mk-storm-cluster-state函数
(defnmk-storm-cluster-state
[cluster-state-spec]
;;satisfies?谓词相当于java中的instanceof,判断cluster-state-spec是不是ClusterState实例
(let[[solo?cluster-state](if(satisfies?ClusterStatecluster-state-spec)
[falsecluster-state-spec]
[true(mk-distributed-cluster-statecluster-state-spec)])
;;绑定topologyid->回调函数的map,当/assignments/{topologyid}数据发生变化时,zkclient执行assignment-info-callback中topologyid所对应的回调函数
assignment-info-callback(atom{})
;;assignment-info-with-version-callback与assignment-info-callback类似
assignment-info-with-version-callback(atom{})
;;assignment-version-callback与assignments-callback类似
assignment-version-callback(atom{})
;;当/supervisors标示的znode的子节点发生变化时,zkclient执行supervisors-callback指向的函数
supervisors-callback(atomnil)
;;当/assignments标示的znode的子节点发生变化时,zkclient执行assignments-callback指向的函数
assignments-callback(atomnil)
;;当/storms/{topologyid}标示的znode的数据发生变化时,zkclient执行storm-base-callback中topologyid所对应的回调函数
storm-base-callback(atom{})
;;register函数将"回调函数(fn...)"添加到cluster-state的callbacks集合中,并返回标示该回调函数的uuid
state-id(register
cluster-state
;;定义"回调函数",type标示事件类型,path标示znode
(fn[typepath]
;;subtree绑定路径前缀如"assignments"、"storms"、"supervisors"等,args存放topologyid
(let[[subtree&args](tokenize-pathpath)]
;;condp相当于java中的switch
(condp=subtree
;;当subtree="assignments"时,如果args为空,说明是/assignments的子节点发生变化,执行assignments-callback指向的回调函数,否则
;;说明/assignments/{topologyid}标示的节点数据发生变化,执行assignment-info-callback指向的回调函数
ASSIGNMENTS-ROOT(if(empty?args)
(issue-callback!assignments-callback)
(issue-map-callback!assignment-info-callback(firstargs)))
;;当subtree="supervisors"时,说明是/supervisors的子节点发生变化,执行supervisors-callback指向的回调函数
SUPERVISORS-ROOT(issue-callback!supervisors-callback)
;;当subtree="storms"时,说明是/storms/{topologyid}标示的节点数据发生变化,执行storm-base-callback指向的回调函数
STORMS-ROOT(issue-map-callback!storm-base-callback(firstargs))
;;thisshouldneverhappen
(exit-process!30"Unknowncallbackforsubtree"subtreeargs)))))]
;;在zookeeper上创建storm运行topology所必需的znode
(doseq[p[ASSIGNMENTS-SUBTREESTORMS-SUBTREESUPERVISORS-SUBTREEWORKERBEATS-SUBTREEERRORS-SUBTREE]]
(mkdirscluster-statep))
;;返回一个实现StormClusterState协议的实例
(reify
StormClusterState
;;获取/assignments的子节点列表,如果callback不为空,将其赋值给assignments-callback,并对/assignments添加"节点观察"
(assignments
[thiscallback]
(whencallback
(reset!assignments-callbackcallback))
(get-childrencluster-stateASSIGNMENTS-SUBTREE(not-nil?callback)))
;;获取/assignments/{storm-id}节点数据,即storm-id的分配信息,如果callback不为空,将其添加到assignment-info-callback中,并对/assignments/{storm-id}添加"数据观察"
(assignment-info
[thisstorm-idcallback]
(whencallback
(swap!assignment-info-callbackassocstorm-idcallback))
(maybe-deserialize(get-datacluster-state(assignment-pathstorm-id)(not-nil?callback))))
;;获取/assignments/{storm-id}节点数据包括version信息,如果callback不为空,将其添加到assignment-info-with-version-callback中,并对/assignments/{storm-id}添加"数据观察"
(assignment-info-with-version
[thisstorm-idcallback]
(whencallback
(swap!assignment-info-with-version-callbackassocstorm-idcallback))
(let[{data:dataversion:version}
(get-data-with-versioncluster-state(assignment-pathstorm-id)(not-nil?callback))]
{:data(maybe-deserializedata)
:versionversion}))
;;获取/assignments/{storm-id}节点数据的version信息,如果callback不为空,将其添加到assignment-version-callback中,并对/assignments/{storm-id}添加"数据观察"
(assignment-version
[thisstorm-idcallback]
(whencallback
(swap!assignment-version-callbackassocstorm-idcallback))
(get-versioncluster-state(assignment-pathstorm-id)(not-nil?callback)))
;;获取storm集群中正在运行的topologyid即/storms的子节点列表
(active-storms
[this]
(get-childrencluster-stateSTORMS-SUBTREEfalse))
;;获取storm集群中所有有心跳的topologyid即/workerbeats的子节点列表
(heartbeat-storms
[this]
(get-childrencluster-stateWORKERBEATS-SUBTREEfalse))
;;获取所有有错误的topologyid即/errors的子节点列表
(error-topologies
[this]
(get-childrencluster-stateERRORS-SUBTREEfalse))
;;获取指定storm-id进程的心跳信息,即/workerbeats/{storm-id}/{node-port}节点数据
(get-worker-heartbeat
[thisstorm-idnodeport]
(->cluster-state
(get-data(workerbeat-pathstorm-idnodeport)false)
maybe-deserialize))
;;获取指定进程中所有线程的心跳信息
(executor-beats
[thisstorm-idexecutor->node+port]
;;needtotakeexecutor->node+portinexplicitlysothatwedon'trunintoasituationwherea
;;longdeadworkerwithaskewedclockoverridesallthetimestamps.Byonlycheckingheartbeats
;;withanassignednode+port,andonlyreadingexecutorsfromthatheartbeatthatareactuallyassigned,
;;weavoidsituationslikethat
(let[node+port->executors(reverse-mapexecutor->node+port)
all-heartbeats(for[[[nodeport]executors]node+port->executors]
(->>(get-worker-heartbeatthisstorm-idnodeport)
(convert-executor-beatsexecutors)
))]
(applymergeall-heartbeats)))
;;获取/supervisors的子节点列表,如果callback不为空,将其赋值给supervisors-callback,并对/supervisors添加"节点观察"
(supervisors
[thiscallback]
(whencallback
(reset!supervisors-callbackcallback))
(get-childrencluster-stateSUPERVISORS-SUBTREE(not-nil?callback)))
;;获取/supervisors/{supervisor-id}节点数据,即supervisor的心跳信息
(supervisor-info
[thissupervisor-id]
(maybe-deserialize(get-datacluster-state(supervisor-pathsupervisor-id)false)))
;;设置进程心跳信息
(worker-heartbeat!
[thisstorm-idnodeportinfo]
(set-datacluster-state(workerbeat-pathstorm-idnodeport)(Utils/serializeinfo)))
;;删除进程心跳信息
(remove-worker-heartbeat!
[thisstorm-idnodeport]
(delete-nodecluster-state(workerbeat-pathstorm-idnodeport)))
;;创建指定storm-id的topology的用于存放心跳信息的节点
(setup-heartbeats!
[thisstorm-id]
(mkdirscluster-state(workerbeat-storm-rootstorm-id)))
;;删除指定storm-id的topology的心跳信息节点
(teardown-heartbeats!
[thisstorm-id]
(try-cause
(delete-nodecluster-state(workerbeat-storm-rootstorm-id))
(catchKeeperExceptione
(log-warn-errore"Couldnotteardownheartbeatsfor"storm-id))))
;;删除指定storm-id的topology的错误信息节点
(teardown-topology-errors!
[thisstorm-id]
(try-cause
(delete-nodecluster-state(error-storm-rootstorm-id))
(catchKeeperExceptione
(log-warn-errore"Couldnotteardownerrorsfor"storm-id))))
;;创建临时节点存放supervisor的心跳信息
(supervisor-heartbeat!
[thissupervisor-idinfo]
(set-ephemeral-nodecluster-state(supervisor-pathsupervisor-id)(Utils/serializeinfo)))
;;创建/storms/{storm-id}节点
(activate-storm!
[thisstorm-idstorm-base]
(set-datacluster-state(storm-pathstorm-id)(Utils/serializestorm-base)))
;;更新topology对应的StormBase对象,即更新/storm/{storm-id}节点
(update-storm!
[thisstorm-idnew-elems]
;;base绑定storm-id在zookeeper上的StormBase对象
(let[base(storm-basethisstorm-idnil)
;;executors绑定component名称->组件并行度的map
executors(:component->executorsbase)
;;new-elems绑定合并后的组件并行度map,update函数将组件新并行度map合并到旧map中
new-elems(updatenew-elems:component->executors(partialmergeexecutors))]
;;更新StormBase对象中的组件并行度map,并写入zookeeper的/storms/{storm-id}节点
(set-datacluster-state(storm-pathstorm-id)
(->base
(mergenew-elems)
Utils/serialize))))
;;获取storm-id的StormBase对象,即读取/storms/{storm-id}节点数据,如果callback不为空,将其赋值给storm-base-callback,并为/storms/{storm-id}节点添加"数据观察"
(storm-base
[thisstorm-idcallback]
(whencallback
(swap!storm-base-callbackassocstorm-idcallback))
(maybe-deserialize(get-datacluster-state(storm-pathstorm-id)(not-nil?callback))))
;;删除storm-id的StormBase对象,即删除/storms/{storm-id}节点
(remove-storm-base!
[thisstorm-id]
(delete-nodecluster-state(storm-pathstorm-id)))
;;更新storm-id的分配信息,即更新/assignments/{storm-id}节点数据
(set-assignment!
[thisstorm-idinfo]
(set-datacluster-state(assignment-pathstorm-id)(Utils/serializeinfo)))
;;删除storm-id的分配信息,同时删除其StormBase信息,即删除/assignments/{storm-id}节点和/storms/{storm-id}节点
(remove-storm!
[thisstorm-id]
(delete-nodecluster-state(assignment-pathstorm-id))
(remove-storm-base!thisstorm-id))
;;将组件异常信息写入zookeeper
(report-error
[thisstorm-idcomponent-idnodeporterror]
;;path绑定"/errors/{storm-id}/{component-id}"
(let[path(error-pathstorm-idcomponent-id)
;;data绑定异常信息,包括异常时间、异常堆栈信息、主机和端口
data{:time-secs(current-time-secs):error(stringify-errorerror):hostnode:portport}
;;创建/errors/{storm-id}/{component-id}节点
_(mkdirscluster-statepath)
;;创建/errors/{storm-id}/{component-id}的子顺序节点,并写入异常信息
_(create-sequentialcluster-state(strpath"/e")(Utils/serializedata))
;;to-kill绑定除去顺序节点编号最大的前10个节点的剩余节点的集合
to-kill(->>(get-childrencluster-statepathfalse)
(sort-byparse-error-path)
reverse
(drop10))]
;;删除to-kill中包含的节点
(doseq[kto-kill]
(delete-nodecluster-state(strpath"/"k)))))
;;得到给定的storm-idcomponent-id下的异常信息
(errors
[thisstorm-idcomponent-id]
(let[path(error-pathstorm-idcomponent-id)
_(mkdirscluster-statepath)
children(get-childrencluster-statepathfalse)
errors(dofor[cchildren]
(let[data(->(get-datacluster-state(strpath"/"c)false)
maybe-deserialize)]
(whendata
(structTaskError(:errordata)(:time-secsdata)(:hostdata)(:portdata))
)))
]
(->>(filternot-nil?errors)
(sort-by(comp-:time-secs)))))
;;关闭连接,在关闭连接前,将回调函数从cluster-state的callbacks中删除
(disconnect
[this]
(unregistercluster-statestate-id)
(whensolo?
(closecluster-state))))))
zookeeper.clj中mk-client函数
mk-client函数创建一个CuratorFramework实例,为该实例注册了CuratorListener,当一个后台操作完成或者指定的watch被触发时将会执行CuratorListener中的eventReceived()。eventReceived中调用的wacher函数就是mk-distributed-cluster-state中:watcher绑定的函数。
(defnkmk-client [confserversport :root"" :watcherdefault-watcher :auth-confnil] (let[fk(Utils/newCuratorconfserversportroot(whenauth-conf(ZookeeperAuthInfo.auth-conf)))] (..fk (getCuratorListenable) (addListener (reifyCuratorListener (^voideventReceived[this^CuratorFramework_fk^CuratorEvente] (when(=(.getTypee)CuratorEventType/WATCHED) (let[^WatchedEventevent(.getWatchedEvente)] (watcher(zk-keeper-states(.getStateevent)) (zk-event-types(.getTypeevent)) (.getPathevent)))))))) (.startfk) fk))
以上就是storm与zookeeper进行交互的源码分析,我觉得最重要的部分就是如何给zkclient添加"wacher",storm的很多功能都是通过zookeeper的wacher机制实现的,如"分配信息领取"。添加"wacher"大概分为以下几个步骤:
mk-distributed-cluster-state函数创建了一个zkclient,并通过:watcher给该zkclient指定了"wacher"函数,这个"wacher"函数只是简单调用ClusterState的callbacks集合中的函数,这样这个"wacher"函数执行哪些函数将由ClusterState实例决定
ClusterState实例提供register函数来更新callbacks集合,ClusterState实例被传递给了mk-storm-cluster-state函数,在mk-storm-cluster-state中调用register添加了一个函数(fn[typepath]...),这个函数实现了"watcher"函数的全部逻辑
mk-storm-cluster-state中注册的函数执行的具体内容由StormClusterState实例决定,对zookeeper节点添加"观察"也是通过StormClusterState实例实现的,这样我们就可以通过StormClusterState实例对我们感兴趣的节点添加"观察"和"回调函数",当节点或节点数据发生变化后,zkserver就会给zkclient发送"通知",zkclient中的"wather"函数将被调用,进而我们注册的"回到函数"将被执行。
总结
这部分源码与zookeeper联系十分紧密,涉及了很多zookeeper中的概念和特性,如"数据观察"和"节点观察"等,有关zookeeper的wacher机制请参考:https://www.nhooo.com/article/124295.htm,storm并没有直接使用zookeeper的api,而是使用Curator框架,Curator框架简化了访问zookeeper的操作。关于Curator框架请参考:https://www.nhooo.com/article/125785.htm。
以上就是本文关于源码阅读之storm操作zookeeper-cluster.clj的全部内容了,感兴趣的朋友可以参阅:zookeeperwatch机制的理解、apachezookeeper使用方法实例详解、为zookeeper配置相应的acl权限等,希望对大家有所帮助。感谢各位的阅读!