Kafka使用Java客户端进行访问的示例代码
本文环境如下:
操作系统:CentOS632位
JDK版本:1.8.0_7732位
Kafka版本:0.9.0.1(Scala2.11)
1.maven依赖包
org.apache.kafka kafka-clients 0.9.0.1
2.生产者代码
packagecom.lnho.example.kafka; importorg.apache.kafka.clients.producer.KafkaProducer; importorg.apache.kafka.clients.producer.Producer; importorg.apache.kafka.clients.producer.ProducerRecord; importjava.util.Properties; publicclassKafkaProducerExample{ publicstaticvoidmain(String[]args){ Propertiesprops=newProperties(); props.put("bootstrap.servers","master:9092"); props.put("acks","all"); props.put("retries",0); props.put("batch.size",16384); props.put("linger.ms",1); props.put("buffer.memory",33554432); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); Producerproducer=newKafkaProducer<>(props); for(inti=0;i<100;i++) producer.send(newProducerRecord<>("topic1",Integer.toString(i),Integer.toString(i))); producer.close(); } }
3.消费者代码
packagecom.lnho.example.kafka; importorg.apache.kafka.clients.consumer.ConsumerRecord; importorg.apache.kafka.clients.consumer.ConsumerRecords; importorg.apache.kafka.clients.consumer.KafkaConsumer; importjava.util.Arrays; importjava.util.Properties; publicclassKafkaConsumerExample{ publicstaticvoidmain(String[]args){ Propertiesprops=newProperties(); props.put("bootstrap.servers","master:9092"); props.put("group.id","test"); props.put("enable.auto.commit","true"); props.put("auto.commit.interval.ms","1000"); props.put("session.timeout.ms","30000"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer=newKafkaConsumer<>(props); consumer.subscribe(Arrays.asList("topic1")); while(true){ ConsumerRecords records=consumer.poll(100); for(ConsumerRecord record:records) System.out.printf("offset=%d,key=%s,value=%s\n",record.offset(),record.key(),record.value()); } } }
4.执行程序
lib底下需要有:kafka-clients-0.9.0.1.jarlog4j-1.2.17.jarslf4j-api-1.7.6.jarslf4j-log4j12-1.7.6.jar
生产者:
java-classpathkafka-example-1.0-SNAPSHOT.jar:lib/*com.lnho.example.kafka.KafkaProducerExample
消费者:
java-classpathkafka-example-1.0-SNAPSHOT.jar:lib/*com.lnho.example.kafka.KafkaConsumerExample
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。