spring-cloud-stream结合kafka使用详解
1.pom文件导入依赖
org.springframework.cloud spring-cloud-stream-binder-kafka
2.application.yml文件配置
spring: cloud: stream: kafka: binder: brokers:xxx.xxx.xxx.xx:xxxx//Kafka的消息中间件服务器地址 bindings: xxx_output://通道名称 destination:xxx//消息发往的目的地,对应topic在发送消息的配置里面,group是不用配置的 //如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json) xxx_input: destination:xxx//消息发往的目的地,对应topic group:xxx//对应kafka的group
3.创建消息发送者
@EnableBinding(Source.class)//@EnableBinding是绑定通道的,Soure.class是spring提供的,表示这是一个可绑定的发布通道 @Service publicclassMqService{ @Resource(name=KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) privateMessageChanneloesWorkbenchChannel; /** *发送一条kafka消息 */ publicbooleansendLifeData(Objectobject){ returnMqUtils.send(oesWorkbenchChannel,object,KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT); } } //发布通道 publicinterfaceSource{ @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) MessageChanneloesWorkbenchLifeDataOutput();//发布通道用MessageChannel }
4.创建消息监听者
@Slf4j @EnableBinding(Sink.class) publicclassWorkbenchStreamListener{ @Resource privateFileServicefileService; @StreamListener(KafkaConstants.xxx_input)//监听接受通道 publicvoidreceiveData(MoveMessagemoveMessage){ } } //接受通道 publicinterfaceSink{ @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT) SubscribableChanneloesWorkbenchMoveInput();//接受通道用SubscribableChannel }
接下来就可以愉快的发送监听消息了
到此这篇关于spring-cloud-stream结合kafka使用详解的文章就介绍到这了,更多相关spring-cloud-stream整合kafka内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。