Kafka Source Code Series: Deleting a Topic
Preface
Apache Kafka originated at LinkedIn, became an Apache incubator project in 2011, and graduated to a top-level Apache project in 2012. Kafka is written in Scala and Java. It is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. With high throughput, built-in partitioning, data replication, and fault tolerance, it is well suited to large-scale message processing.
As elsewhere in this series, the discussion is based on Kafka 0.8.2.2.
I. How to delete a topic
There are two key points when deleting a topic:
1. Configure the delete parameter
Set the broker configuration delete.topic.enable to true.
2. Run the delete command
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
If delete.topic.enable is not set to true, the topic is not actually removed; it is only marked for deletion. At that point the common instinct is to delete the topic's ZooKeeper nodes and log files by hand, but doing so does not clear the topic data held in the brokers' memory. The best approach is therefore to set delete.topic.enable to true and then restart Kafka.
II. Key classes
1. PartitionStateMachine
This class implements the partition state machine: it tracks each partition's current state and drives transitions between states. There are four states:
- NonExistentPartition
- NewPartition
- OnlinePartition
- OfflinePartition
2. ReplicaManager
Manages all replicas on the current broker and carries out the concrete read, write, and delete operations.
Read/write path: obtain the Partition object, from it the Replica object, then the Log object, and finally use the Segment objects managed by the Log to write data in or read it out.
3. ReplicaStateMachine
The replica state machine: it tracks each replica's current state and the transitions between states. A replica is always in exactly one of the following states:
NewReplica: the controller can create a replica in this state during partition reassignment. It can only accept a become-follower request. Valid previous state: NonExistentReplica.
OnlineReplica: a replica of a started partition; it can accept become-leader and become-follower requests. Valid previous states: NewReplica, OnlineReplica, or OfflineReplica.
OfflineReplica: a dead replica is in this state. Valid previous states: NewReplica, OnlineReplica.
ReplicaDeletionStarted: a replica enters this state when its deletion begins. Valid previous state: OfflineReplica.
ReplicaDeletionSuccessful: the replica was deleted successfully. Valid previous state: ReplicaDeletionStarted.
ReplicaDeletionIneligible: a replica whose deletion failed is in this state. Valid previous state: ReplicaDeletionStarted.
NonExistentReplica: a replica enters this state once it has been removed completely. Valid previous state: ReplicaDeletionSuccessful.
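The "valid previous state" rules listed above can be encoded in a few lines. The sketch below is not Kafka's actual code: the class name, enum layout, and isLegalPath helper are invented for illustration; only the state names and the transition table itself come from the descriptions above.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

// Sketch: encode the replica state machine's "valid previous states"
// exactly as listed in the article, and check transition sequences.
class ReplicaStateSketch {
    enum State {
        NonExistentReplica, NewReplica, OnlineReplica, OfflineReplica,
        ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible
    }

    // For each target state, the set of states a replica may come from.
    static final Map<State, EnumSet<State>> VALID_PREVIOUS = Map.of(
        State.NewReplica, EnumSet.of(State.NonExistentReplica),
        State.OnlineReplica, EnumSet.of(State.NewReplica, State.OnlineReplica, State.OfflineReplica),
        State.OfflineReplica, EnumSet.of(State.NewReplica, State.OnlineReplica),
        State.ReplicaDeletionStarted, EnumSet.of(State.OfflineReplica),
        State.ReplicaDeletionSuccessful, EnumSet.of(State.ReplicaDeletionStarted),
        State.ReplicaDeletionIneligible, EnumSet.of(State.ReplicaDeletionStarted),
        State.NonExistentReplica, EnumSet.of(State.ReplicaDeletionSuccessful)
    );

    // A path is legal if every step starts from a valid previous state.
    static boolean isLegalPath(List<State> path) {
        for (int i = 1; i < path.size(); i++) {
            if (!VALID_PREVIOUS.get(path.get(i)).contains(path.get(i - 1)))
                return false;
        }
        return true;
    }
}
```

Under these rules, the successful deletion lifecycle OfflineReplica → ReplicaDeletionStarted → ReplicaDeletionSuccessful → NonExistentReplica is legal, while skipping ReplicaDeletionStarted is not.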
4. TopicDeletionManager
This class drives the topic-deletion state machine:
1) TopicCommand triggers deletion by creating a child node under /admin/delete_topics/
2) The controller watches the children of /admin/delete_topics and starts deleting the listed topics
3) A background thread in the controller performs the actual topic deletion
III. The topic deletion process, step by step through the source
This part is split into four pieces:
A) What the client-side delete command does
B) The full code path when delete.topic.enable is not set
C) The full code path when delete.topic.enable is set
D) Manually deleting the topic's ZooKeeper nodes and on-disk data
1. What the client-side delete command does
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
Opening kafka-topics.sh, we see:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@
In TopicCommand's main method:
else if (opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts)
The body of deleteTopic is:
val topics = getTopics(zkClient, opts)
if (topics.length == 0) {
  println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
}
topics.foreach { topic =>
  try {
    ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
This creates a node named after the topic under "/admin/delete_topics".
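For reference, the path helper used above behaves like this hypothetical re-creation (ZkPathsSketch and its members are illustrative names; only the /admin/delete_topics layout comes from the article):

```java
// Sketch: how ZkUtils.getDeleteTopicPath composes the node path
// that TopicCommand creates to request a topic's deletion.
class ZkPathsSketch {
    static final String DELETE_TOPICS_PATH = "/admin/delete_topics";

    static String getDeleteTopicPath(String topic) {
        return DELETE_TOPICS_PATH + "/" + topic;
    }
}
```

For example, getDeleteTopicPath("my_topic_name") yields "/admin/delete_topics/my_topic_name", which is exactly the child node the controller's listener watches for.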
2. The flow when delete.topic.enable is not set
Two listeners can respond:
A) TopicChangeListener
B) DeleteTopicsListener
Deleting a topic with the delete command triggers only the DeleteTopicsListener.
var topicsToBeDeleted = {
  import JavaConversions._
  (children: Buffer[String]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
topicsToBeDeleted --= nonExistentTopics
if (topicsToBeDeleted.size > 0) {
  info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
  // mark topic ineligible for deletion if other state changes are in progress
  topicsToBeDeleted.foreach { topic =>
    val preferredReplicaElectionInProgress =
      controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
    val partitionReassignmentInProgress =
      controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
    if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
      controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
  }
  // add topic to deletion list
  controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
Both of the handler calls below first check whether delete.topic.enable is true; when it is false they do nothing, and when it is true they proceed:
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
3. With delete.topic.enable set to true
The only difference from step 2 is what those two handler calls actually do:
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
markTopicIneligibleForDeletion is handled as:
if (isDeleteTopicEnabled) {
  val newTopicsToHaltDeletion = topicsToBeDeleted & topics
  topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
  if (newTopicsToHaltDeletion.size > 0)
    info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
}
Its main purpose is to halt deletion of a topic in any of the following three situations:
* Halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic
enqueueTopicsForDeletion mainly updates the sets of topics and partitions to delete and wakes the deletion thread, DeleteTopicsThread:
def enqueueTopicsForDeletion(topics: Set[String]) {
  if (isDeleteTopicEnabled) {
    topicsToBeDeleted ++= topics
    partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
    resumeTopicDeletionThread()
  }
}
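This enqueue-and-resume pattern can be modeled in miniature. The sketch below is not Kafka's implementation: the class and member names are invented, and the deletion "thread" is driven by calling doWork by hand so the control flow is easy to follow.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of TopicDeletionManager's enqueue path: enqueue adds topics to
// the pending set and requests a resume; doWork drains the pending set only
// after a resume was requested, mirroring resumeTopicDeletionThread().
class DeletionQueueSketch {
    private final boolean deleteTopicEnabled;
    private final Set<String> topicsToBeDeleted = new LinkedHashSet<>();
    private boolean resumeRequested = false;
    final List<String> deleted = new ArrayList<>();

    DeletionQueueSketch(boolean deleteTopicEnabled) {
        this.deleteTopicEnabled = deleteTopicEnabled;
    }

    void enqueueTopicsForDeletion(Set<String> topics) {
        if (deleteTopicEnabled) {      // mirrors the isDeleteTopicEnabled guard above
            topicsToBeDeleted.addAll(topics);
            resumeRequested = true;    // stands in for resumeTopicDeletionThread()
        }
    }

    // stands in for DeleteTopicsThread.doWork: only makes progress after a resume
    void doWork() {
        if (resumeRequested) {
            deleted.addAll(topicsToBeDeleted);
            topicsToBeDeleted.clear();
            resumeRequested = false;
        }
    }
}
```

Note that when deleteTopicEnabled is false, enqueueTopicsForDeletion is a no-op, which matches the behavior described in section 2: the delete command marks the topic in ZooKeeper but nothing downstream runs.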
In the doWork method of the deletion thread DeleteTopicsThread:
topicsQueuedForDeletion.foreach { topic =>
  // if all replicas are marked as deleted successfully, then topic deletion is done
  if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
    // clear up all state for this topic from controller cache and zookeeper
    completeDeleteTopic(topic)
    info("Deletion of topic %s successfully completed".format(topic))
  }
Stepping into completeDeleteTopic:
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
Its main job is to deregister the listener that watches for partition changes on the topic, delete the topic's ZooKeeper nodes, and update the in-memory data structures, for example removing the topic's partitions and replicas from the state machines.
What matters most, though, is how the replica data on disk gets deleted; that is the part we focus on next.
On the first deletion pass, in the doWork method of DeleteTopicsThread:
{
  // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
  // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
  // or there is at least one failed replica (which means topic deletion should be retried).
  if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
    // mark topic for deletion retry
    markTopicForDeletionRetry(topic)
  }
Stepping into markTopicForDeletionRetry:
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
  .format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
ReplicaStateMachine's handleStateChanges method calls handleStateChange, which handles the OfflineReplica transition:
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
Then, back in handleStateChanges:
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
This sends a StopReplica request (RequestKeys.StopReplicaKey) to the brokers that store the replica data, and the deletion begins:
stopReplicaRequestMap foreach { case (broker, replicaInfoList) =>
  val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
  val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
  debug("The stop replica request (delete = true) sent to broker %d is %s"
    .format(broker, stopReplicaWithDelete.mkString(",")))
  debug("The stop replica request (delete = false) sent to broker %d is %s"
    .format(broker, stopReplicaWithoutDelete.mkString(",")))
  replicaInfoList.foreach { r =>
    val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
      Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
    controller.sendRequest(broker, stopReplicaRequest, r.callback)
  }
}
stopReplicaRequestMap.clear()
When the broker-side KafkaApis.handle method receives the request:
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
Next, in the stopReplicas method:
{
  controllerEpoch = stopReplicaRequest.controllerEpoch
  // First stop fetchers for all partitions, then stop the corresponding replicas
  replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
  for (topicAndPartition <- stopReplicaRequest.partitions) {
    val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
    responseMap.put(topicAndPartition, errorCode)
  }
  (responseMap, ErrorMapping.NoError)
}
Going one level deeper into stopReplica, we reach the actual log deletion:
getPartition(topic, partitionId) match {
  case Some(partition) =>
    if (deletePartition) {
      val removedPartition = allPartitions.remove((topic, partitionId))
      if (removedPartition != null)
        removedPartition.delete() // this will delete the local log
    }
That is Kafka's complete log-deletion flow.
4. Manually deleting the topic's ZooKeeper nodes and on-disk data
TopicChangeListener reacts to the change, but its handling is very simple; it only updates:
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
IV. Summary
Kafka's topic deletion is essentially a publish-subscribe mechanism built on ZooKeeper: a ZooKeeper client creates a node under /admin/delete_topics/, and the controller reacts to that change.
If the delete command is run while delete.topic.enable is false, nothing is actually done. To delete a topic completely, set the parameter to true and restart Kafka; the topic information is then fully removed. This has been verified by testing.
A popular alternative is to delete the topic's ZooKeeper information and on-disk data by hand, but that leaves part of the in-memory state uncleaned. Whether this causes real problems has not been tested.
That is all for this article. I hope it is of some value for your study or work. If you have questions, feel free to leave a comment. Thanks for supporting 毛票票.