kafka监控获取指定topic的消息总量示例
我就废话不多说了,直接上代码吧!
importkafka.api.PartitionOffsetRequestInfo; importkafka.common.TopicAndPartition; importkafka.javaapi.OffsetResponse; importkafka.javaapi.PartitionMetadata; importkafka.javaapi.TopicMetadata; importkafka.javaapi.TopicMetadataRequest; importkafka.javaapi.consumer.SimpleConsumer; importjava.util.*; importjava.util.Map.Entry; publicclassKafkaOffsetTools{ publicfinalstaticStringKAFKA_TOPIC_NAME_ADAPTER="sample"; publicfinalstaticStringKAFKA_TOPIC_NAME_EXCEPTION="exception"; publicfinalstaticStringKAFKA_TOPIC_NAME_AUDIT="audit"; privatestaticfinalStringrawTopicTotal="rawTopicTotalRecordCounter"; privatestaticfinalStringavroTopicTotal="avroTopicTotalRecordCounter"; privatestaticfinalStringexceptionTopicTotal="exceptionTopicTotalRecordCounter"; publicKafkaOffsetTools(){ } publicstaticlonggetLastOffset(SimpleConsumerconsumer,Stringtopic, intpartition,longwhichTime,StringclientName){ TopicAndPartitiontopicAndPartition=newTopicAndPartition(topic, partition); Map,PartitionOffsetRequestInfo>requestInfo=newHashMap,PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition,newPartitionOffsetRequestInfo( whichTime,1)); kafka.javaapi.OffsetRequestrequest=newkafka.javaapi.OffsetRequest( requestInfo,kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponseresponse=consumer.getOffsetsBefore(request); if(response.hasError()){ System.err.println("ErrorfetchingdataOffsetDatatheBroker.Reason:"+response.errorCode(topic,partition)); return0; } long[]offsets=response.offsets(topic,partition); returnoffsets[0]; } privateTreeMap,PartitionMetadata>findLeader(Lista_seedBrokers,Stringa_topic){ TreeMap,PartitionMetadata>map=newTreeMap,PartitionMetadata>(); loop: for(Stringseed:a_seedBrokers){ SimpleConsumerconsumer=null; try{ String[]hostAndPort; hostAndPort=seed.split(":"); consumer=newSimpleConsumer(hostAndPort[0],Integer.valueOf(hostAndPort[1]),100000,64*1024, "leaderLookup"+newDate().getTime()); Listtopics=Collections.singletonList(a_topic); TopicMetadataRequestreq=newTopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponseresp=consumer.send(req); ListmetaData=resp.topicsMetadata(); for(TopicMetadataitem:metaData){ for(PartitionMetadatapart:item.partitionsMetadata()){ map.put(part.partitionId(),part); } } }catch(Exceptione){ System.out.println("ErrorcommunicatingwithBroker["+seed +"]tofindLeaderfor["+a_topic+",]Reason:"+e); }finally{ if(consumer!=null) consumer.close(); } } returnmap; } publicstaticvoidmain(String[]args){ StringkafkaBrokerList=System.getenv("metadata.broker.list"); if(kafkaBrokerList==null||kafkaBrokerList.length()==0){ System.err.println("Noconfigkafkametadata.broker.list,itisnull."); //fortest kafkaBrokerList="localhost:9092,localhost:9093"; System.err.println("Usethisbrokerlistfortest,metadata.broker.list="+kafkaBrokerList); } //inittopic,logSize=0 Map,Integer>topics=newHashMap,Integer>(); topics.put(KAFKA_TOPIC_NAME_ADAPTER,0); topics.put(KAFKA_TOPIC_NAME_EXCEPTION,0); topics.put(KAFKA_TOPIC_NAME_AUDIT,0); //initkafkabrokerlist String[]kafkaHosts; kafkaHosts=kafkaBrokerList.split(","); if(kafkaHosts==null||kafkaHosts.length==0){ System.err.println("Noconfigkafkametadata.broker.list,itisnull."); System.exit(1); } Listseeds=newArrayList(); for(inti=0;imetadatas=kot.findLeader(seeds,topicName); intlogSize=0; for(Entry,PartitionMetadata>entry:metadatas.entrySet()){ intpartition=entry.getKey(); StringleadBroker=entry.getValue().leader().host(); StringclientName="Client_"+topicName+"_"+partition; SimpleConsumerconsumer=newSimpleConsumer(leadBroker,entry.getValue().leader().port(),100000, 64*1024,clientName); longreadOffset=getLastOffset(consumer,topicName,partition, kafka.api.OffsetRequest.LatestTime(),clientName); logSize+=readOffset; if(consumer!=null)consumer.close(); } topics.put(topicName,logSize); } System.out.println(topics.toString()); System.out.println(rawTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_ADAPTER)+""+System.currentTimeMillis()); System.out.println(avroTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_AUDIT)+""+System.currentTimeMillis()); System.out.println(exceptionTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_EXCEPTION)+""+System.currentTimeMillis()); } }
以上这篇kafka监控获取指定topic的消息总量示例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。