springCloud学习5(Spring-Cloud-Stream事件驱动)
本文内容纲要:
-为什么使用消息传递
-同步请求-响应方式
-使用消息传递方式
-springcloud中使用消息传递
-springcloudstream架构
-实战
-建立redis服务
-建立kafka服务
-在组织服务中编写消息生产者
-在许可证服务中编写消息消费者
-自定义通道
-结束
springcloud总集:https://www.tapme.top/blog/detail/2019-02-28-11-33
代码见文章结尾
想想平常生活中做饭的场景,在用电饭锅做饭的同时,我们可以洗菜、切菜,等待电饭锅发出饭做好的提示我们回去拔下电饭锅电源(或者什么也不知让它处于保温状态),反正这个时候我们知道饭做好了,接下来可以炒菜了。从这里可以看出我们在日常生活中与世界的互动并不是同步的、线性的,不是简单的请求--响应模型。它是事件驱动的,我们不断的发送消息、接受消息、处理消息。
同样在软件世界中也不全是请求--响应模型,也会需要进行异步的消息通信。使用消息实现事件通信的概念被称为消息驱动架构(EventDrivenArchitecture,EDA),也被称为消息驱动架构(MessageDrivenArchitecture,MDA)。使用这类架构可以构建高度解耦的系统,该系统能够对变化做出响应,且不需要与特定的库或者服务紧密耦合。
在SpringCloud项目中可以使用SpirngCloudStream轻而易举的构建基于消息传递的解决方案。
为什么使用消息传递
要解答这个问题,让我们从一个例子开始,之前一直使用的两个服务:许可证服务和组织服务。每次对许可证服务进行请求,许可证服务都要通过http请求到组织服务上查询组织信息。显而易见这次额外的http请求会花费较长的时间。如果能够将缓存组织数据的读操作,将会大幅提高许可证服务的响应时间。但是缓存数据有如下2个要求:
- 缓存的数据需要在许可证服务的所有实例之间保存一致——这意味着不能将数据缓存到服务实例的内存中。
- 在更新或者删除一个组织数据时,许可证服务缓存的数据需要失效——避免读取到过期数据,需要尽早让过时数据失效并删除。
要实现上面的要求,现在有两种办法。
- 使用同步请求--响应模型来实现。组织服务在组织数据变化时调用许可证服务的接口通知组织服务已经变化,或者直接操作许可证服务的缓存。
- 使用事件驱动。组织服务发出一个异步消息。许可证服务收到该消息后清除对应的缓存。
同步请求-响应方式
许可证服务在redis中缓存从组织服务中查询到的服务信息,当组织数据更新时,组织服务同步http请求通知许可证服务数据过期。这种方式有以下几个问题:
- 组织服务和许可证服务紧密耦合
- 这种方式不够灵活,如果要为组织服务添加新的消费者,必须修改组织服务代码,以让其通知新的服务数据变动。
使用消息传递方式
同样的许可证服务在redis中缓存从组织服务中查询到的服务信息,当组织数据更新时,组织服务将更新信息写入到队列中。许可证服务监听消息队列。使用消息传递有一下4个好处:
- 松耦合性:将服务间的依赖,变成了服务对队列的依赖,依赖关系变弱了。
- 耐久性:即使服务消费者已经关闭了,也可以继续往里发送消息,等消费者开启后处理
- 可伸缩性:消息发送者不用等待消息消费者的响应,它们可以继续做各自的工作
- 灵活性:消息发送者不用知道谁会消费这个消息,因此在有新的消息消费者时无需修改消息发送代码
springcloud中使用消息传递
springcloud项目中可以通过springcloudstream框架来轻松集成消息传递。该框架最大的特点是抽象了消息传递平台的细节,因此可以在支持的消息队列中随意切换(包括ApacheKafka和RabbitMQ)。
springcloudstream架构
springcloudstream中有4个组件涉及到消息发布和消息消费,分别为:
-
发射器
当一个服务准备发送消息时,它将使用发射器发布消息。发射器是一个Spring注解接口,它接收一个普通Java对象,表示要发布的消息。发射器接收消息,然后序列化(默认序列化为JSON)后发布到通道中。
-
通道
通道是对队列的一个抽象。通道名称是与目标队列名称相关联的。但是队列名称并不会直接公开在代码中,代码永远只会使用通道名。
-
绑定器
绑定器是springcloudstream框架的一部分,它是与特定消息平台对话的Spring代码。通过绑定器,使得开发人员不必依赖于特定平台的库和API来发布和消费消息。
-
接收器
服务通过接收器来从队列中接收消息,并将消息反序列化。
处理逻辑如下:
实战
继续使用之前的项目,在许可证服务中缓存组织数据到redis中。
建立redis服务
为方便起见,使用docker创建redis,建立脚本如下:
dockerrun-itd--nameredis--nethostredis:
建立kafka服务
在组织服务中编写消息生产者
首先在organization服务中引入springcloudstream和kafka的依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
然后在events类中编写SimpleSouce
类,用于组织数据修改,产生一条消息到队列中。代码如下:
@EnableBinding(Source.class)
publicclassSimpleSource{
privateLoggerlogger=LoggerFactory.getLogger(SimpleSource.class);
privateSourcesource;
@Autowired
publicSimpleSource(Sourcesource){
this.source=source;
}
publicvoidpublishOrChange(Stringaction,StringorgId){
logger.info("在请求:{}中,发送kafka消息:{}forOrganizationId:{}",UserContextHolder.getContext().id,action,orgId);
OrganizationChangechange=newOrganizationChange(action,orgId,UserContextHolder.getContext().id);
source.output().send(MessageBuilder.withPayload(change).build());
}
}
这里使用的是默认通道,Source类定义的output通道发消息。后面通过Sink定义的input通道收消息。
然后在OrganizationController
类中定义一个delete方法,并注入SimpleSouce类,代码如下:
@Autowired
privateSimpleSourcesimpleSource;
@DeleteMapping(value="/organization/{orgId}")
publicvoiddeleteOne(@PathVariable("orgId")Stringid){
logger.debug("删除了组织:{}",id);
simpleSource.publishOrChange("delete",id);
}
最后在配置文件中加入消息队列的配置:
#省略了其他配置
spring:
cloud:
stream:
bindings:
output:
destination:orgChangeTopic
content-type:application/json
kafka:
binder:
#替换为部署kafka的ip和端口
zk-nodes:192.168.226.5:2181
brokers:192.168.226.5:9092
现在我们可以测试下访问localhost:5555/apis/org/organization/12,可以看到控制台打印消息生成的日志。
在许可证服务中编写消息消费者
集成redis的方法,参看。这里不作说明。
首先引入依赖,依赖项同上面组织服务。
然后在event包下创建OrgChange
的类,代码如下:
@EnableBinding(Sink.class)//使用Sink接口中定义的通道来监听传入消息
publicclassOrgChange{
privateLoggerlogger=LoggerFactory.getLogger(OrgChange.class);
@StreamListener(Sink.INPUT)
publicvoidloggerSink(OrganizationChangechange){
logger.info("收到一个消息,组织id为:{},关联id为:{}",change.getOrgId(),change.getId());
//删除失效缓存
RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
}
}
//下面两个都在util包下
//RedisKeyUtils.java代码如下
publicclassRedisKeyUtils{
privatestaticfinalStringORG_CACHE_PREFIX="orgCache_";
publicstaticStringgetOrgCacheKey(StringorgId){
returnORG_CACHE_PREFIX+orgId;
}
}
//RedisUtils.java代码如下
@Component
@SuppressWarnings("all")
publicclassRedisUtils{
publicstaticRedisTemplateredisTemplate;
@Autowired
publicvoidsetRedisTemplate(RedisTemplateredisTemplate){
RedisUtils.redisTemplate=redisTemplate;
}
publicstaticbooleansetObj(Stringkey,Objectvalue){
returnsetObj(key,value,0);
}
/**
*Description:
*
*@authorfanxb
*@date2019/2/2115:21
*@paramkey键
*@paramvalue值
*@paramtime过期时间,单位ms
*@returnboolean是否成功
*/
publicstaticbooleansetObj(Stringkey,Objectvalue,longtime){
try{
if(time<=0){
redisTemplate.opsForValue().set(key,value);
}else{
redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
}
returntrue;
}catch(Exceptione){
e.printStackTrace();
returnfalse;
}
}
publicstaticObjectget(Stringkey){
if(key==null){
returnnull;
}
try{
Objectobj=redisTemplate.opsForValue().get(key);
returnobj;
}catch(Exceptione){
e.printStackTrace();
returnnull;
}
}
publicstaticvoiddel(String...key){
if(key!=null&&key.length>0){
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
上面用到的是Sink.INPUT通道,这个和之前的Source.OUTPUT通道刚好一队,一个负责收,一个负责发。
然后修改OrganizationByRibbonService.java
文件中的getOrganizationWithRibbon
方法:
publicOrganizationgetOrganizationWithRibbon(Stringid){
Stringkey=RedisKeyUtils.getOrgCacheKey(id);
//先从redis缓存取数据
Objectres=RedisUtils.get(key);
if(res==null){
logger.info("当前数据无缓存:{}",id);
try{
ResponseEntity<Organization>responseEntity=restTemplate.exchange("http://organizationservice/organization/{id}",
HttpMethod.GET,null,Organization.class,id);
res=responseEntity.getBody();
RedisUtils.setObj(key,res);
}catch(Exceptione){
e.printStackTrace();
}
}else{
logger.info("当前数据为缓存数据:{}",id);
}
return(Organization)res;
}
最后修改配置文件,为input通道指定topic,配置如下:
spring:
cloud:
stream:
bindings:
input:
destination:orgChangeTopic
content-type:application/json
#定义将要消费消息的消费者组的名称
#可能多个服务监听同一个消息队列。如果定义了消费者组,那么同组中只要有一个消费了消息,剩余的不会再次消费该消息,保证只有消息的
#一个副本会被该组的某个实例所消费
group:licensingGroup
kafka:
binder:
zk-nodes:192.168.226.5:2181
brokers:192.168.226.5:9092
基本和发送的配置相同,只是这里是为input
通道映射队列,然后还定义了一个组名,避免一个消息被重复消费。
现在来多次访问localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到licensingservice控制台打印数据从缓存中读取,如下所示:
然后再以delete访问localhost:5555/apis/org/organization/12清除缓存,再次访问licensingservice服务,结果如下:
自定义通道
上面用的是SpringCloudStream
自带的input/output通道,那么要如何自定义通道呢?下面以自定义customInput/customOutput
通道为例。
自定义发数据通道
publicinterfaceCustomOutput{
@Output("customOutput")
MessageChannelout();
}
对于每个自定义的发数据通道,需使用@OutPut注解标记的返回MessageChannel类的方法。
自定义收数据通道
publicinterfaceCustomInput{
@Input("customInput")
SubscribableChannelin();
}
同上,对应自定义的收数据通道,需要使用@Input注解标记的返回SubscribableChannel类的方法。
结束
看完本篇你应该已经能够在SpringCloud中集成SpringCloudStream消息队列了,貌似这个也能用到普通的springboot项目中,比直接集成mq更加的优雅。
2019,Fighting!
本篇原创发布于:FleyX的个人博客
本篇所用全部代码:FleyX的github
本文内容总结:为什么使用消息传递,同步请求-响应方式,使用消息传递方式,springcloud中使用消息传递,springcloudstream架构,实战,建立redis服务,建立kafka服务,在组织服务中编写消息生产者,在许可证服务中编写消息消费者,自定义通道,结束,
原文链接:https://www.cnblogs.com/wuyoucao/p/11056841.html