Kafka使用Java客户端进行访问的示例代码
本文环境如下:
操作系统:CentOS632位
JDK版本:1.8.0_7732位
Kafka版本:0.9.0.1(Scala2.11)
1.maven依赖包
org.apache.kafka kafka-clients 0.9.0.1
2.生产者代码
packagecom.lnho.example.kafka;
importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.Producer;
importorg.apache.kafka.clients.producer.ProducerRecord;
importjava.util.Properties;
publicclassKafkaProducerExample{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put("bootstrap.servers","master:9092");
props.put("acks","all");
props.put("retries",0);
props.put("batch.size",16384);
props.put("linger.ms",1);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producerproducer=newKafkaProducer<>(props);
for(inti=0;i<100;i++)
producer.send(newProducerRecord<>("topic1",Integer.toString(i),Integer.toString(i)));
producer.close();
}
}
3.消费者代码
packagecom.lnho.example.kafka;
importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.apache.kafka.clients.consumer.ConsumerRecords;
importorg.apache.kafka.clients.consumer.KafkaConsumer;
importjava.util.Arrays;
importjava.util.Properties;
publicclassKafkaConsumerExample{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put("bootstrap.servers","master:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put("session.timeout.ms","30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumerconsumer=newKafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
while(true){
ConsumerRecordsrecords=consumer.poll(100);
for(ConsumerRecordrecord:records)
System.out.printf("offset=%d,key=%s,value=%s\n",record.offset(),record.key(),record.value());
}
}
}
4.执行程序
lib底下需要有:kafka-clients-0.9.0.1.jarlog4j-1.2.17.jarslf4j-api-1.7.6.jarslf4j-log4j12-1.7.6.jar
生产者:
java-classpathkafka-example-1.0-SNAPSHOT.jar:lib/*com.lnho.example.kafka.KafkaProducerExample
消费者:
java-classpathkafka-example-1.0-SNAPSHOT.jar:lib/*com.lnho.example.kafka.KafkaConsumerExample
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。