Spring Boot 1.5.2 集成 Kafka 的简单例子
本文介绍了 Spring Boot 1.5.2 集成 Kafka 的简单例子,分享给大家,具体如下:
随着 Spring Boot 1.5 版本的发布,在 Spring 项目中集成 Kafka 变得更为简便。
添加依赖
// Spring for Apache Kafka (spring-kafka), pinned to version 1.1.2.RELEASE
compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE")
添加application.properties
# Kafka
# Kafka broker addresses; several may be listed, separated by commas
spring.kafka.bootstrap-servers=192.168.59.130:9092,192.168.59.131:9092,192.168.59.132:9092
# Default consumer group id
spring.kafka.consumer.group-id=myGroup
# Default topic used by KafkaTemplate when no topic is given
spring.kafka.template.default-topic=my-replicated-topic
# Number of threads in the listener container, used to raise consumer concurrency
spring.kafka.listener.concurrency=3
# Producer batch size (Kafka "batch.size"): an upper bound in bytes per batch, not a message count
spring.kafka.producer.batch-size=1000
通过配置类(Configuration)启用 Kafka
packagecn.xiaojf.today.data.kafka.configuration; importorg.springframework.context.annotation.Configuration; importorg.springframework.kafka.annotation.EnableKafka; /** *kafka配置 *@authorxiaojf2017/3/2414:09 */ @Configuration @EnableKafka publicclassKafkaConfiguration{ }
消息生产者
packagecn.xiaojf.today.data.kafka.producer; importorg.apache.kafka.clients.producer.Producer; importorg.apache.kafka.clients.producer.RecordMetadata; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.kafka.core.KafkaOperations; importorg.springframework.kafka.core.KafkaTemplate; importorg.springframework.kafka.support.ProducerListener; importorg.springframework.stereotype.Component; /** *消息生产者 *@authorxiaojf2017/3/2414:36 */ @Component publicclassMsgProducer{ @Autowired privateKafkaTemplatekafkaTemplate; publicvoidsend(){ kafkaTemplate.send("my-replicated-topic","xiaojf"); kafkaTemplate.send("my-replicated-topic","xiaojf"); kafkaTemplate.metrics(); kafkaTemplate.execute(newKafkaOperations.ProducerCallback (){ @Override publicObjectdoInKafka(Producer producer){ //这里可以编写kafka原生的api操作 returnnull; } }); //消息发送的监听器,用于回调返回信息 kafkaTemplate.setProducerListener(newProducerListener (){ @Override publicvoidonSuccess(Stringtopic,Integerpartition,Stringkey,Stringvalue,RecordMetadatarecordMetadata){ } @Override publicvoidonError(Stringtopic,Integerpartition,Stringkey,Stringvalue,Exceptionexception){ } @Override publicbooleanisInterestedInSuccess(){ returnfalse; } }); } }
消息消费者
packagecn.xiaojf.today.data.kafka.consumer; importorg.springframework.kafka.annotation.KafkaListener; importorg.springframework.stereotype.Component; /** *消息消费者 *@authorxiaojf2017/3/2414:36 */ @Component publicclassMsgConsumer{ @KafkaListener(topics={"my-replicated-topic","my-replicated-topic2"}) publicvoidprocessMessage(Stringcontent){ System.out.println(content); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。