Java kafka如何实现自定义分区类和拦截器
生产者发送到对应的分区有以下几种方式:
(1)指定了patition,则直接使用;(可以查阅对应的javaapi,有多种参数)
(2)未指定patition但指定key,通过对key的value进行hash出一个patition;
(3)patition和key都未指定,使用轮询选出一个patition。
但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:
1、实现一个自定义分区类,CustomPartitioner实现Partitioner
importorg.apache.kafka.clients.producer.Partitioner; importorg.apache.kafka.common.Cluster; importjava.util.Map; publicclassCustomPartitionerimplementsPartitioner{ /** * *@paramtopic当前的发送的topic *@paramkey当前的key值 *@paramkeyBytes当前的key的字节数组 *@paramvalue当前的value值 *@paramvalueBytes当前的value的字节数组 *@paramcluster *@return */ @Override publicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){ //这边根据返回值就是分区号,这边就是固定发送到三号分区 return3; } @Override publicvoidclose(){ } @Override publicvoidconfigure(Mapconfigs){ } }
2、producer配置文件指定,具体的分区类
//具体的分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"kafka.CustomPartitioner");
技巧:可以使用ProducerConfig中提供的配置ProducerConfig
kafkaproducer拦截器
拦截器(interceptor)是在Kafka0.10版本被引入的。
interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptorchain)。
所使用的类为:
org.apache.kafka.clients.producer.ProducerInterceptor
我们可以编码测试下:
1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)
importorg.apache.kafka.clients.producer.ProducerInterceptor; importorg.apache.kafka.clients.producer.ProducerRecord; importorg.apache.kafka.clients.producer.RecordMetadata; importjava.util.Map; importjava.util.UUID; publicclassMessageInterceptorimplementsProducerInterceptor{ @Override publicvoidconfigure(Map configs){ System.out.println("这是MessageInterceptor的configure方法"); } /** *这个是消息发送之前进行处理 * *@paramrecord *@return */ @Override publicProducerRecord onSend(ProducerRecord record){ //创建一个新的record,把uuid入消息体的最前部 System.out.println("为消息添加uuid"); returnnewProducerRecord(record.topic(),record.partition(),record.timestamp(),record.key(), UUID.randomUUID().toString().replace("-","")+","+record.value()); } /** *这个是生产者回调函数调用之前处理 *@parammetadata *@paramexception */ @Override publicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception){ System.out.println("MessageInterceptor拦截器的onAcknowledgement方法"); } @Override publicvoidclose(){ System.out.println("MessageInterceptorclose方法"); } }
2、定义计数拦截器
importjava.util.Map; importorg.apache.kafka.clients.producer.ProducerInterceptor; importorg.apache.kafka.clients.producer.ProducerRecord; importorg.apache.kafka.clients.producer.RecordMetadata; publicclassCounterInterceptorimplementsProducerInterceptor{ privateinterrorCounter=0; privateintsuccessCounter=0; @Override publicvoidconfigure(Map configs){ System.out.println("这是CounterInterceptor的configure方法"); } @Override publicProducerRecord onSend(ProducerRecord record){ System.out.println("CounterInterceptor计数过滤器不对消息做任何操作"); returnrecord; } @Override publicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception){ //统计成功和失败的次数 System.out.println("CounterInterceptor过滤器执行统计失败和成功数量"); if(exception==null){ successCounter++; }else{ errorCounter++; } } @Override publicvoidclose(){ //保存结果 System.out.println("Successfulsent:"+successCounter); System.out.println("Failedsent:"+errorCounter); } }
3、producer客户端:
importorg.apache.kafka.clients.producer.*; importjava.util.ArrayList; importjava.util.List; importjava.util.Properties; publicclassProducer1{ publicstaticvoidmain(String[]args)throwsException{ Propertiesprops=newProperties(); //Kafka服务端的主机名和端口号 props.put("bootstrap.servers","localhost:9092"); //等待所有副本节点的应答 props.put("acks","all"); //消息发送最大尝试次数 props.put("retries",0); //一批消息处理大小 props.put("batch.size",16384); //请求延时,可能生产数据太快了 props.put("linger.ms",1); //发送缓存区内存大小,数据是先放到生产者的缓冲区 props.put("buffer.memory",33554432); //key序列化 props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //value序列化 props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //具体的分区类 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"kafka.CustomPartitioner"); //定义拦截器 Listinterceptors=newArrayList<>(); interceptors.add("kafka.MessageInterceptor"); interceptors.add("kafka.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors); Producer producer=newKafkaProducer<>(props); for(inti=0;i<1;i++){ producer.send(newProducerRecord ("test_0515",i+"","xxx-"+i),newCallback(){ publicvoidonCompletion(RecordMetadatarecordMetadata,Exceptione){ System.out.println("这是producer回调函数"); } }); } /*System.out.println("现在执行关闭producer"); producer.close();*/ producer.close(); } }
总结,我们可以知道拦截器链各个方法的执行顺序,假如有A、B拦截器,在一个拦截器链中:
(1)执行A的configure方法,执行B的configure方法
(2)执行A的onSend方法,B的onSend方法
(3)生产者发送完毕后,执行A的onAcknowledgement方法,B的onAcknowledgement方法。
(4)执行producer自身的callback回调函数。
(5)执行A的close方法,B的close方法。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。