Spring Cloud Feign组件实例解析
这篇文章主要介绍了SpringCloudFeign组件实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
采用SpringCloud微服务框架后,经常会涉及到服务间调用,服务间调用采用了Feign组件。
由于之前有使用dubbo经验。dubbo的负载均衡策略(轮训、最小连接数、随机轮训、加权轮训),dubbo失败策略(快速失败、失败重试等等),
所以Feign负载均衡策略的是什么?失败后是否会重试,重试策略又是什么?带这个疑问,查了一些资料,最后还是看了下代码。毕竟代码就是一切
Springboot集成Feign的大概流程:
1、利用FeignAutoConfiguration自动配置。并根据EnableFeignClients自动注册产生Feign的代理类。
2、注册方式利用FeignClientFactoryBean,熟悉Spring知道FactoryBean产生bean的工厂,有个重要方法getObject产生FeignClient容器bean
3、同时代理类中使用hystrix做资源隔离,Feign代理类中构造RequestTemplate,RequestTemlate要做的向负载均衡选中的server发送http请求,并进行编码和解码一系列操作。
下面只是粗略的看了下整体流程,先有整体再有细节吧,下面利用IDEA看下细节:
一、Feign失败重试
SynchronousMethodHandler的方法中的处理逻辑:
@Override publicObjectinvoke(Object[]argv)throwsThrowable{ RequestTemplatetemplate=buildTemplateFromArgs.create(argv); Retryerretryer=this.retryer.clone(); while(true){ try{ returnexecuteAndDecode(template); }catch(RetryableExceptione){ retryer.continueOrPropagate(e); if(logLevel!=Logger.Level.NONE){ logger.logRetry(metadata.configKey(),logLevel); } continue; } } }
- 上面的逻辑很简单。构造template并去进行服务间的http调用,然后对返回结果进行解码
- 当抛出RetryableException后,异常逻辑是否重试?重试多少次?带这个问题,看了retryer.continueOrPropagate(e);
具体逻辑如下:
publicvoidcontinueOrPropagate(RetryableExceptione){ if(attempt++>=maxAttempts){ throwe; } longinterval; if(e.retryAfter()!=null){ interval=e.retryAfter().getTime()-currentTimeMillis(); if(interval>maxPeriod){ interval=maxPeriod; } if(interval<0){ return; } }else{ interval=nextMaxInterval(); } try{ Thread.sleep(interval); }catch(InterruptedExceptionignored){ Thread.currentThread().interrupt(); } sleptForMillis+=interval; }
- 当重试次数大于默认次数5时候,直接抛出异常,不在重试
- 否则每隔一段时间默认值最大1ms后重试一次。
这就Feign这块的重试这块的粗略逻辑,由于之前工作中一直使用dubbo。同样是否需要将生产环境中重试操作关闭?
思考:之前dubbo生产环境的重试操作都会关闭。原因有几个:
- 一般第一次失败,重试也会失败,极端情况下不断的重试,会占用大量dubbo连接池,造成连接池被打满,影响核心功能
- 也是比较重要的一点原因,重试带来的业务逻辑的影响,即如果接口不是幂等的,重试会带来业务逻辑的错误,引发问题
二、Feign负载均衡策略
那么负载均衡的策略又是什么呢?由上图中可知executeAndDecode(template)
ObjectexecuteAndDecode(RequestTemplatetemplate)throwsThrowable{ Requestrequest=targetRequest(template); if(logLevel!=Logger.Level.NONE){ logger.logRequest(metadata.configKey(),logLevel,request); } Responseresponse; longstart=System.nanoTime(); try{ response=client.execute(request,options); //ensuretherequestisset.TODO:removeinFeign10 response.toBuilder().request(request).build(); }catch(IOExceptione){ if(logLevel!=Logger.Level.NONE){ logger.logIOException(metadata.configKey(),logLevel,e,elapsedTime(start)); } throwerrorExecuting(request,e); } longelapsedTime=TimeUnit.NANOSECONDS.toMillis(System.nanoTime()-start); booleanshouldClose=true; try{ if(logLevel!=Logger.Level.NONE){ response= logger.logAndRebufferResponse(metadata.configKey(),logLevel,response,elapsedTime); //ensuretherequestisset.TODO:removeinFeign10 response.toBuilder().request(request).build(); } if(Response.class==metadata.returnType()){ if(response.body()==null){ returnresponse; } if(response.body().length()==null|| response.body().length()>MAX_RESPONSE_BUFFER_SIZE){ shouldClose=false; returnresponse; } //Ensuretheresponsebodyisdisconnected byte[]bodyData=Util.toByteArray(response.body().asInputStream()); returnresponse.toBuilder().body(bodyData).build(); } if(response.status()>=200&&response.status()<300){ if(void.class==metadata.returnType()){ returnnull; }else{ returndecode(response); } }elseif(decode404&&response.status()==404&&void.class!=metadata.returnType()){ returndecode(response); }else{ throwerrorDecoder.decode(metadata.configKey(),response); } }catch(IOExceptione){ if(logLevel!=Logger.Level.NONE){ logger.logIOException(metadata.configKey(),logLevel,e,elapsedTime); } throwerrorReading(request,response,e); }finally{ if(shouldClose){ ensureClosed(response.body()); } } }
概括的说主要做了两件事:发送HTTP请求,解码响应数据
想看的负载均衡应该在11行response=client.execute(request,options);而client的实现方式有两种Default、LoadBalancerFeignClient
猜的话应该是LoadBalancerFeignClient,带这个问题去看源码(其实个人更喜欢带着问题看源码,没有目的一是看很难将复杂的源码关联起来,二是很容易迷失其中)
果然通过一番查找发现Client实例就是LoadBalancerFeignClient,而设置这个Client就是通过上面说的FeignClientFactoryBean的getObject方法中设置的,具体不说了
下面重点看LoadBalancerFeignClientexecute(request,options)
@Override publicResponseexecute(Requestrequest,Request.Optionsoptions)throwsIOException{ try{ URIasUri=URI.create(request.url()); StringclientName=asUri.getHost(); URIuriWithoutHost=cleanUrl(request.url(),clientName); FeignLoadBalancer.RibbonRequestribbonRequest=newFeignLoadBalancer.RibbonRequest( this.delegate,request,uriWithoutHost); IClientConfigrequestConfig=getClientConfig(options,clientName); returnlbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch(ClientExceptione){ IOExceptionio=findIOException(e); if(io!=null){ throwio; } thrownewRuntimeException(e); } }
通过几行代码比较重要的点RibbonRequest,原来Feign负载均衡还是通过Ribbon实现的,那么Ribbo又是如何实现负载均衡的呢?
publicObservablesubmit(finalServerOperation operation){ finalExecutionInfoContextcontext=newExecutionInfoContext(); if(listenerInvoker!=null){ try{ listenerInvoker.onExecutionStart(); }catch(AbortExecutionExceptione){ returnObservable.error(e); } } finalintmaxRetrysSame=retryHandler.getMaxRetriesOnSameServer(); finalintmaxRetrysNext=retryHandler.getMaxRetriesOnNextServer(); //Usetheloadbalancer Observable o= (server==null?selectServer():Observable.just(server)) .concatMap(newFunc1 >(){ @Override //Calledforeachserverbeingselected publicObservable call(Serverserver){ context.setServer(server); finalServerStatsstats=loadBalancerContext.getServerStats(server); //Calledforeachattemptandretry Observable o=Observable .just(server) .concatMap(newFunc1 >(){ @Override publicObservable call(finalServerserver){ context.incAttemptCount(); loadBalancerContext.noteOpenConnection(stats); if(listenerInvoker!=null){ try{ listenerInvoker.onStartWithServer(context.toExecutionInfo()); }catch(AbortExecutionExceptione){ returnObservable.error(e); } } finalStopwatchtracer=loadBalancerContext.getExecuteTracer().start(); returnoperation.call(server).doOnEach(newObserver (){ privateTentity; @Override publicvoidonCompleted(){ recordStats(tracer,stats,entity,null); //TODO:WhattodoifonNextoronErrorarenevercalled? } @Override publicvoidonError(Throwablee){ recordStats(tracer,stats,null,e); logger.debug("Goterror{}whenexecutedonserver{}",e,server); if(listenerInvoker!=null){ listenerInvoker.onExceptionWithServer(e,context.toExecutionInfo()); } } @Override publicvoidonNext(Tentity){ this.entity=entity; if(listenerInvoker!=null){ listenerInvoker.onExecutionSuccess(entity,context.toExecutionInfo()); } } privatevoidrecordStats(Stopwatchtracer,ServerStatsstats,Objectentity,Throwableexception){ tracer.stop(); loadBalancerContext.noteRequestCompletion(stats,entity,exception,tracer.getDuration(TimeUnit.MILLISECONDS),retryHandler); } }); } }); if(maxRetrysSame>0) o=o.retry(retryPolicy(maxRetrysSame,true)); returno; } }); if(maxRetrysNext>0&&server==null) o=o.retry(retryPolicy(maxRetrysNext,false)); returno.onErrorResumeNext(newFunc1 >(){ @Override publicObservable call(Throwablee){ if(context.getAttemptCount()>0){ if(maxRetrysNext>0&&context.getServerAttemptCount()==(maxRetrysNext+1)){ e=newClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Numberofretriesonnextserverexceededmax"+maxRetrysNext +"retries,whilemakingacallfor:"+context.getServer(),e); } elseif(maxRetrysSame>0&&context.getAttemptCount()==(maxRetrysSame+1)){ e=newClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED, "Numberofretriesexceededmax"+maxRetrysSame +"retries,whilemakingacallfor:"+context.getServer(),e); } } if(listenerInvoker!=null){ listenerInvoker.onExecutionFailed(e,context.toFinalExecutionInfo()); } returnObservable.error(e); } }); }
通过上面代码分析,发现Ribbon和Hystrix一样都是利用了rxjava看来有必要掌握下rxjava了又。这里面比较重要的就是17行,
selectServer()方法选择指定的Server,负载均衡的策略主要是有ILoadBalancer接口不同实现方式:
- BaseLoadBalancer采用的规则为RoundRobinRule轮训规则
- DynamicServerListLoadBalancer继承了BaseLoadBalancer,主要运行时改变Server列表
- NoOpLoadBalancer什么操作都不做
- ZoneAwareLoadBalancer功能主要是根据区域Zone分组的实例列表
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。