Kafka Java Producer代码实例详解
根据业务需要可以使用Kafka提供的JavaProducerAPI进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer
Kafka的ProducerAPI主要提供下列三个方法:
- publicvoidsend(KeyedMessage
message)发送单条数据到Kafka集群 - publicvoidsend(List
>messages)发送多条数据(数据集)到Kafka集群 - publicvoidclose()关闭Kafka连接资源
一、JavaKafkaProducerPartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到Topic的那个分区中,返回分区id,范围:[0,分区数量);这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:
importkafka.producer.Partitioner;
importkafka.utils.VerifiableProperties;
/**
*Createdbygerryon12/21.
*/
publicclassJavaKafkaProducerPartitionerimplementsPartitioner{
/**
*无参构造函数
*/
publicJavaKafkaProducerPartitioner(){
this(newVerifiableProperties());
}
/**
*构造函数,必须给定
*
*@paramproperties上下文
*/
publicJavaKafkaProducerPartitioner(VerifiablePropertiesproperties){
//nothings
}
@Override
publicintpartition(Objectkey,intnumPartitions){
intnum=Integer.valueOf(((String)key).replaceAll("key_","").trim());
returnnum%numPartitions;
}
}
二、JavaKafkaProducer:通过Kafka提供的API进行数据产生操作的测试类;具体代码如下:
importkafka.javaapi.producer.Producer;
importkafka.producer.KeyedMessage;
importkafka.producer.ProducerConfig;
importorg.apache.log4j.Logger;
importjava.util.Properties;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.atomic.AtomicBoolean;
importjava.util.concurrent.ThreadLocalRandom;
/**
*Createdbygerryon12/21.
*/
publicclassJavaKafkaProducer{
privateLoggerlogger=Logger.getLogger(JavaKafkaProducer.class);
publicstaticfinalStringTOPIC_NAME="test";
publicstaticfinalchar[]charts="qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
publicstaticfinalintchartsLength=charts.length;
publicstaticvoidmain(String[]args){
StringbrokerList="192.168.187.149:9092";
brokerList="192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
brokerList="192.168.187.146:9092";
Propertiesprops=newProperties();
props.put("metadata.broker.list",brokerList);
/**
*0表示不等待结果返回
*1表示等待至少有一个服务器返回数据接收标识
*-1表示必须接收到所有的服务器返回标识,及同步写入
**/
props.put("request.required.acks","0");
/**
*内部发送数据是异步还是同步
*sync:同步,默认
*async:异步
*/
props.put("producer.type","async");
/**
*设置序列化的类
*可选:kafka.serializer.StringEncoder
*默认:kafka.serializer.DefaultEncoder
*/
props.put("serializer.class","kafka.serializer.StringEncoder");
/**
*设置分区类
*根据key进行数据分区
*默认是:kafka.producer.DefaultPartitioner==>安装key的hash进行分区
*可选:kafka.serializer.ByteArrayPartitioner==>转换为字节数组后进行hash分区
*/
props.put("partitioner.class","JavaKafkaProducerPartitioner");
//重试次数
props.put("message.send.max.retries","3");
//异步提交的时候(async),并发提交的记录数
props.put("batch.num.messages","200");
//设置缓冲区大小,默认10KB
props.put("send.buffer.bytes","102400");
//2.构建KafkaProducerConfiguration上下文
ProducerConfigconfig=newProducerConfig(props);
//3.构建Producer对象
finalProducerproducer=newProducer(config);
//4.发送数据到服务器,并发线程发送
finalAtomicBooleanflag=newAtomicBoolean(true);
intnumThreads=50;
ExecutorServicepool=Executors.newFixedThreadPool(numThreads);
for(inti=0;i<5;i++){
pool.submit(newThread(newRunnable(){
@Override
publicvoidrun(){
while(flag.get()){
//发送数据
KeyedMessagemessage=generateKeyedMessage();
producer.send(message);
System.out.println("发送数据:"+message);
//休眠一下
try{
intleast=10;
intbound=100;
Thread.sleep(ThreadLocalRandom.current().nextInt(least,bound));
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"shutdown....");
}
},"Thread-"+i));
}
//5.等待执行完成
longsleepMillis=600000;
try{
Thread.sleep(sleepMillis);
}catch(InterruptedExceptione){
e.printStackTrace();
}
flag.set(false);
//6.关闭资源
pool.shutdown();
try{
pool.awaitTermination(6,TimeUnit.SECONDS);
}catch(InterruptedExceptione){
}finally{
producer.close();//最后之后调用
}
}
/**
*产生一个消息
*
*@return
*/
privatestaticKeyedMessagegenerateKeyedMessage(){
Stringkey="key_"+ThreadLocalRandom.current().nextInt(10,99);
StringBuildersb=newStringBuilder();
intnum=ThreadLocalRandom.current().nextInt(1,5);
for(inti=0;i
三、Pom.xml依赖配置如下
0.8.2.1
org.apache.kafka
kafka_2.10
${kafka.version}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。