SpringCloud Finchley Gateway 缓存请求Body和Form表单的实现
在接入Spring-Cloud-Gateway时,可能有需求进行缓存Json-Body数据或者Form-Urlencoded数据的情况。
由于Spring-Cloud-Gateway是以WebFlux为基础的响应式架构设计,所以在原有Zuul基础上迁移过来的过程中,传统的编程思路,并不适合于ReactorStream的开发。
网络上有许多缓存案例,但是在测试过程中出现各种Bug问题,在缓存Body时,需要考虑整体的响应式操作,才能更合理的缓存数据
下面提供缓存Json-Body数据或者Form-Urlencoded数据的具体实现方案,该方案经测试,满足各方面需求,以及避免了网络上其他缓存方案所出现的问题
定义一个GatewayContext类,用于存储请求中缓存的数据
importlombok.Getter; importlombok.Setter; importlombok.ToString; importorg.springframework.util.LinkedMultiValueMap; importorg.springframework.util.MultiValueMap; @Getter @Setter @ToString publicclassGatewayContext{ publicstaticfinalStringCACHE_GATEWAY_CONTEXT="cacheGatewayContext"; /** *cachejsonbody */ privateStringcacheBody; /** *cacheformdata */ privateMultiValueMapformData; /** *cachereqeustpath */ privateStringpath; }
实现GlobalFilter和Ordered接口用于缓存请求数据
1.该示例只支持缓存下面3种MediaType
- APPLICATION_JSON--Json数据
- APPLICATION_JSON_UTF8--Json数据
- APPLICATION_FORM_URLENCODED--FormData表单数据
2.经验总结:
- 在缓存Body时,不能够在Filter内部直接进行缓存,需要按照响应式的处理方式,在异步操作路途上进行缓存Body,由于Body只能读取一次,所以要读取完成后要重新封装新的request和exchange才能保证请求正常传递到下游
- 在缓存FormData时,FormData也只能读取一次,所以在读取完毕后,需要重新封装request和exchange,这里要注意,如果对FormData内容进行了修改,则必须重新定义Header中的content-length已保证传输数据的大小一致
importcom.choice.cloud.architect.usergate.option.FilterOrderEnum; importcom.choice.cloud.architect.usergate.support.GatewayContext; importio.netty.buffer.ByteBufAllocator; importlombok.extern.slf4j.Slf4j; importorg.springframework.cloud.gateway.filter.GatewayFilterChain; importorg.springframework.cloud.gateway.filter.GlobalFilter; importorg.springframework.core.Ordered; importorg.springframework.core.io.ByteArrayResource; importorg.springframework.core.io.buffer.DataBuffer; importorg.springframework.core.io.buffer.DataBufferUtils; importorg.springframework.core.io.buffer.NettyDataBufferFactory; importorg.springframework.http.HttpHeaders; importorg.springframework.http.MediaType; importorg.springframework.http.codec.HttpMessageReader; importorg.springframework.http.server.reactive.ServerHttpRequest; importorg.springframework.http.server.reactive.ServerHttpRequestDecorator; importorg.springframework.util.MultiValueMap; importorg.springframework.web.reactive.function.server.HandlerStrategies; importorg.springframework.web.reactive.function.server.ServerRequest; importorg.springframework.web.server.ServerWebExchange; importreactor.core.publisher.Flux; importreactor.core.publisher.Mono; importjava.io.UnsupportedEncodingException; importjava.net.URLEncoder; importjava.nio.charset.Charset; importjava.nio.charset.StandardCharsets; importjava.util.List; importjava.util.Map; @Slf4j publicclassGatewayContextFilterimplementsGlobalFilter,Ordered{ /** *defaultHttpMessageReader */ privatestaticfinalList>messageReaders=HandlerStrategies.withDefaults().messageReaders(); @Override publicMono filter(ServerWebExchangeexchange,GatewayFilterChainchain){ /** *saverequestpathandserviceIdintogatewaycontext */ ServerHttpRequestrequest=exchange.getRequest(); Stringpath=request.getPath().pathWithinApplication().value(); GatewayContextgatewayContext=newGatewayContext(); gatewayContext.getAllRequestData().addAll(request.getQueryParams()); gatewayContext.setPath(path); /** *savegatewaycontextintoexchange */ exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,gatewayContext); HttpHeadersheaders=request.getHeaders(); MediaTypecontentType=headers.getContentType(); longcontentLength=headers.getContentLength(); if(contentLength>0){ if(MediaType.APPLICATION_JSON.equals(contentType)||MediaType.APPLICATION_JSON_UTF8.equals(contentType)){ returnreadBody(exchange,chain,gatewayContext); } if(MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)){ returnreadFormData(exchange,chain,gatewayContext); } } log.debug("[GatewayContext]ContentType:{},Gatewaycontextissetwith{}",contentType,gatewayContext); returnchain.filter(exchange); } @Override publicintgetOrder(){ returnInteger.MIN_VALUE; } /** *ReadFormData *@paramexchange *@paramchain *@return */ privateMono readFormData(ServerWebExchangeexchange,GatewayFilterChainchain,GatewayContextgatewayContext){ HttpHeadersheaders=exchange.getRequest().getHeaders(); returnexchange.getFormData() .doOnNext(multiValueMap->{ gatewayContext.setFormData(multiValueMap); log.debug("[GatewayContext]ReadFormData:{}",multiValueMap); }) .then(Mono.defer(()->{ Charsetcharset=headers.getContentType().getCharset(); charset=charset==null?StandardCharsets.UTF_8:charset; StringcharsetName=charset.name(); MultiValueMap formData=gatewayContext.getFormData(); /** *formDataisemptyjustreturn */ if(null==formData||formData.isEmpty()){ returnchain.filter(exchange); } StringBuilderformDataBodyBuilder=newStringBuilder(); StringentryKey; List entryValue; try{ /** *removesystemparam,repackageformdata */ for(Map.Entry >entry:formData.entrySet()){ entryKey=entry.getKey(); entryValue=entry.getValue(); if(entryValue.size()>1){ for(Stringvalue:entryValue){ formDataBodyBuilder.append(entryKey).append("=").append(URLEncoder.encode(value,charsetName)).append("&"); } }else{ formDataBodyBuilder.append(entryKey).append("=").append(URLEncoder.encode(entryValue.get(0),charsetName)).append("&"); } } }catch(UnsupportedEncodingExceptione){ //ignoreURLEncodeException } /** *substringwiththelastchar'&' */ StringformDataBodyString=""; if(formDataBodyBuilder.length()>0){ formDataBodyString=formDataBodyBuilder.substring(0,formDataBodyBuilder.length()-1); } /** *getdatabytes */ byte[]bodyBytes=formDataBodyString.getBytes(charset); intcontentLength=bodyBytes.length; ServerHttpRequestDecoratordecorator=newServerHttpRequestDecorator( exchange.getRequest()){ /** *changecontent-length *@return */ @Override publicHttpHeadersgetHeaders(){ HttpHeadershttpHeaders=newHttpHeaders(); httpHeaders.putAll(super.getHeaders()); if(contentLength>0){ httpHeaders.setContentLength(contentLength); }else{ httpHeaders.set(HttpHeaders.TRANSFER_ENCODING,"chunked"); } returnhttpHeaders; } /** *readbytestoFlux *@return */ @Override publicFlux getBody(){ returnDataBufferUtils.read(newByteArrayResource(bodyBytes),newNettyDataBufferFactory(ByteBufAllocator.DEFAULT),contentLength); } }; ServerWebExchangemutateExchange=exchange.mutate().request(decorator).build(); log.debug("[GatewayContext]RewriteFormData:{}",formDataBodyString); returnchain.filter(mutateExchange); })); } /** *ReadJsonBody *@paramexchange *@paramchain *@return */ privateMono readBody(ServerWebExchangeexchange,GatewayFilterChainchain,GatewayContextgatewayContext){ /** *jointhebody */ returnDataBufferUtils.join(exchange.getRequest().getBody()) .flatMap(dataBuffer->{ /** *readthebodyFlux */ DataBufferUtils.retain(dataBuffer); Flux cachedFlux=Flux.defer(()->Flux.just(dataBuffer.slice(0,dataBuffer.readableByteCount()))); /** *repackageServerHttpRequest */ ServerHttpRequestmutatedRequest=newServerHttpRequestDecorator(exchange.getRequest()){ @Override publicFlux getBody(){ returncachedFlux; } }; /** *mutateexchagewithnewServerHttpRequest */ ServerWebExchangemutatedExchange=exchange.mutate().request(mutatedRequest).build(); /** *readbodystringwithdefaultmessageReaders */ returnServerRequest.create(mutatedExchange,messageReaders) .bodyToMono(String.class) .doOnNext(objectValue->{ gatewayContext.setCacheBody(objectValue); log.debug("[GatewayContext]ReadJsonBody:{}",objectValue); }).then(chain.filter(mutatedExchange)); }); } }
在后续Filter中,可以直接从ServerExchange中获取GatewayContext,就可以获取到缓存的数据,如果需要缓存其他数据,则可以根据自己的需求,添加到GatewayContext中即可
GatewayContextgatewayContext=exchange.getAttribute(GatewayContext.CACHE_GATEWAY_CONTEXT);
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。