Spring Cloud Feign组件实例解析
这篇文章主要介绍了SpringCloudFeign组件实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
采用SpringCloud微服务框架后,经常会涉及到服务间调用,服务间调用采用了Feign组件。
由于之前有使用dubbo经验。dubbo的负载均衡策略(轮训、最小连接数、随机轮训、加权轮训),dubbo失败策略(快速失败、失败重试等等),
所以Feign负载均衡策略的是什么?失败后是否会重试,重试策略又是什么?带这个疑问,查了一些资料,最后还是看了下代码。毕竟代码就是一切
Springboot集成Feign的大概流程:
1、利用FeignAutoConfiguration自动配置。并根据EnableFeignClients自动注册产生Feign的代理类。
2、注册方式利用FeignClientFactoryBean,熟悉Spring知道FactoryBean产生bean的工厂,有个重要方法getObject产生FeignClient容器bean
3、同时代理类中使用hystrix做资源隔离,Feign代理类中构造RequestTemplate,RequestTemlate要做的向负载均衡选中的server发送http请求,并进行编码和解码一系列操作。
下面只是粗略的看了下整体流程,先有整体再有细节吧,下面利用IDEA看下细节:
一、Feign失败重试
SynchronousMethodHandler的方法中的处理逻辑:
@Override
publicObjectinvoke(Object[]argv)throwsThrowable{
RequestTemplatetemplate=buildTemplateFromArgs.create(argv);
Retryerretryer=this.retryer.clone();
while(true){
try{
returnexecuteAndDecode(template);
}catch(RetryableExceptione){
retryer.continueOrPropagate(e);
if(logLevel!=Logger.Level.NONE){
logger.logRetry(metadata.configKey(),logLevel);
}
continue;
}
}
}
- 上面的逻辑很简单。构造template并去进行服务间的http调用,然后对返回结果进行解码
- 当抛出RetryableException后,异常逻辑是否重试?重试多少次?带这个问题,看了retryer.continueOrPropagate(e);
具体逻辑如下:
publicvoidcontinueOrPropagate(RetryableExceptione){
if(attempt++>=maxAttempts){
throwe;
}
longinterval;
if(e.retryAfter()!=null){
interval=e.retryAfter().getTime()-currentTimeMillis();
if(interval>maxPeriod){
interval=maxPeriod;
}
if(interval<0){
return;
}
}else{
interval=nextMaxInterval();
}
try{
Thread.sleep(interval);
}catch(InterruptedExceptionignored){
Thread.currentThread().interrupt();
}
sleptForMillis+=interval;
}
- 当重试次数大于默认次数5时候,直接抛出异常,不在重试
- 否则每隔一段时间默认值最大1ms后重试一次。
这就Feign这块的重试这块的粗略逻辑,由于之前工作中一直使用dubbo。同样是否需要将生产环境中重试操作关闭?
思考:之前dubbo生产环境的重试操作都会关闭。原因有几个:
- 一般第一次失败,重试也会失败,极端情况下不断的重试,会占用大量dubbo连接池,造成连接池被打满,影响核心功能
- 也是比较重要的一点原因,重试带来的业务逻辑的影响,即如果接口不是幂等的,重试会带来业务逻辑的错误,引发问题
二、Feign负载均衡策略
那么负载均衡的策略又是什么呢?由上图中可知executeAndDecode(template)
ObjectexecuteAndDecode(RequestTemplatetemplate)throwsThrowable{
Requestrequest=targetRequest(template);
if(logLevel!=Logger.Level.NONE){
logger.logRequest(metadata.configKey(),logLevel,request);
}
Responseresponse;
longstart=System.nanoTime();
try{
response=client.execute(request,options);
//ensuretherequestisset.TODO:removeinFeign10
response.toBuilder().request(request).build();
}catch(IOExceptione){
if(logLevel!=Logger.Level.NONE){
logger.logIOException(metadata.configKey(),logLevel,e,elapsedTime(start));
}
throwerrorExecuting(request,e);
}
longelapsedTime=TimeUnit.NANOSECONDS.toMillis(System.nanoTime()-start);
booleanshouldClose=true;
try{
if(logLevel!=Logger.Level.NONE){
response=
logger.logAndRebufferResponse(metadata.configKey(),logLevel,response,elapsedTime);
//ensuretherequestisset.TODO:removeinFeign10
response.toBuilder().request(request).build();
}
if(Response.class==metadata.returnType()){
if(response.body()==null){
returnresponse;
}
if(response.body().length()==null||
response.body().length()>MAX_RESPONSE_BUFFER_SIZE){
shouldClose=false;
returnresponse;
}
//Ensuretheresponsebodyisdisconnected
byte[]bodyData=Util.toByteArray(response.body().asInputStream());
returnresponse.toBuilder().body(bodyData).build();
}
if(response.status()>=200&&response.status()<300){
if(void.class==metadata.returnType()){
returnnull;
}else{
returndecode(response);
}
}elseif(decode404&&response.status()==404&&void.class!=metadata.returnType()){
returndecode(response);
}else{
throwerrorDecoder.decode(metadata.configKey(),response);
}
}catch(IOExceptione){
if(logLevel!=Logger.Level.NONE){
logger.logIOException(metadata.configKey(),logLevel,e,elapsedTime);
}
throwerrorReading(request,response,e);
}finally{
if(shouldClose){
ensureClosed(response.body());
}
}
}
概括的说主要做了两件事:发送HTTP请求,解码响应数据
想看的负载均衡应该在11行response=client.execute(request,options);而client的实现方式有两种Default、LoadBalancerFeignClient
猜的话应该是LoadBalancerFeignClient,带这个问题去看源码(其实个人更喜欢带着问题看源码,没有目的一是看很难将复杂的源码关联起来,二是很容易迷失其中)
果然通过一番查找发现Client实例就是LoadBalancerFeignClient,而设置这个Client就是通过上面说的FeignClientFactoryBean的getObject方法中设置的,具体不说了
下面重点看LoadBalancerFeignClientexecute(request,options)
@Override
publicResponseexecute(Requestrequest,Request.Optionsoptions)throwsIOException{
try{
URIasUri=URI.create(request.url());
StringclientName=asUri.getHost();
URIuriWithoutHost=cleanUrl(request.url(),clientName);
FeignLoadBalancer.RibbonRequestribbonRequest=newFeignLoadBalancer.RibbonRequest(
this.delegate,request,uriWithoutHost);
IClientConfigrequestConfig=getClientConfig(options,clientName);
returnlbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch(ClientExceptione){
IOExceptionio=findIOException(e);
if(io!=null){
throwio;
}
thrownewRuntimeException(e);
}
}
通过几行代码比较重要的点RibbonRequest,原来Feign负载均衡还是通过Ribbon实现的,那么Ribbo又是如何实现负载均衡的呢?
publicObservablesubmit(finalServerOperation operation){ finalExecutionInfoContextcontext=newExecutionInfoContext(); if(listenerInvoker!=null){ try{ listenerInvoker.onExecutionStart(); }catch(AbortExecutionExceptione){ returnObservable.error(e); } } finalintmaxRetrysSame=retryHandler.getMaxRetriesOnSameServer(); finalintmaxRetrysNext=retryHandler.getMaxRetriesOnNextServer(); //Usetheloadbalancer Observable o= (server==null?selectServer():Observable.just(server)) .concatMap(newFunc1 >(){ @Override //Calledforeachserverbeingselected publicObservable call(Serverserver){ context.setServer(server); finalServerStatsstats=loadBalancerContext.getServerStats(server); //Calledforeachattemptandretry Observable o=Observable .just(server) .concatMap(newFunc1 >(){ @Override publicObservable call(finalServerserver){ context.incAttemptCount(); loadBalancerContext.noteOpenConnection(stats); if(listenerInvoker!=null){ try{ listenerInvoker.onStartWithServer(context.toExecutionInfo()); }catch(AbortExecutionExceptione){ returnObservable.error(e); } } finalStopwatchtracer=loadBalancerContext.getExecuteTracer().start(); returnoperation.call(server).doOnEach(newObserver (){ privateTentity; @Override publicvoidonCompleted(){ recordStats(tracer,stats,entity,null); //TODO:WhattodoifonNextoronErrorarenevercalled? } @Override publicvoidonError(Throwablee){ recordStats(tracer,stats,null,e); logger.debug("Goterror{}whenexecutedonserver{}",e,server); if(listenerInvoker!=null){ listenerInvoker.onExceptionWithServer(e,context.toExecutionInfo()); } } @Override publicvoidonNext(Tentity){ this.entity=entity; if(listenerInvoker!=null){ listenerInvoker.onExecutionSuccess(entity,context.toExecutionInfo()); } } privatevoidrecordStats(Stopwatchtracer,ServerStatsstats,Objectentity,Throwableexception){ tracer.stop(); loadBalancerContext.noteRequestCompletion(stats,entity,exception,tracer.getDuration(TimeUnit.MILLISECONDS),retryHandler); } }); } }); if(maxRetrysSame>0) o=o.retry(retryPolicy(maxRetrysSame,true)); returno; } }); if(maxRetrysNext>0&&server==null) o=o.retry(retryPolicy(maxRetrysNext,false)); returno.onErrorResumeNext(newFunc1 >(){ @Override publicObservable call(Throwablee){ if(context.getAttemptCount()>0){ if(maxRetrysNext>0&&context.getServerAttemptCount()==(maxRetrysNext+1)){ e=newClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Numberofretriesonnextserverexceededmax"+maxRetrysNext +"retries,whilemakingacallfor:"+context.getServer(),e); } elseif(maxRetrysSame>0&&context.getAttemptCount()==(maxRetrysSame+1)){ e=newClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED, "Numberofretriesexceededmax"+maxRetrysSame +"retries,whilemakingacallfor:"+context.getServer(),e); } } if(listenerInvoker!=null){ listenerInvoker.onExecutionFailed(e,context.toFinalExecutionInfo()); } returnObservable.error(e); } }); }
通过上面代码分析,发现Ribbon和Hystrix一样都是利用了rxjava看来有必要掌握下rxjava了又。这里面比较重要的就是17行,
selectServer()方法选择指定的Server,负载均衡的策略主要是有ILoadBalancer接口不同实现方式:
- BaseLoadBalancer采用的规则为RoundRobinRule轮训规则
- DynamicServerListLoadBalancer继承了BaseLoadBalancer,主要运行时改变Server列表
- NoOpLoadBalancer什么操作都不做
- ZoneAwareLoadBalancer功能主要是根据区域Zone分组的实例列表
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。