基于springboot 长轮询的实现操作
springboot长轮询实现
基于@EnableAsync,@Sync
@SpringBootApplication
@EnableAsync
publicclassDemoApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(DemoApplication.class,args);
}
}
@RequestMapping("/async")
@RestController
publicclassAsyncRequestDemo{
@Autowired
privateAsyncRequestServiceasyncRequestService;
@GetMapping("/value")
publicStringgetValue(){
Stringmsg=null;
Futureresult=null;
try{
result=asyncRequestService.getValue();
msg=result.get(10,TimeUnit.SECONDS);
}catch(Exceptione){
e.printStackTrace();
}finally{
if(result!=null){
result.cancel(true);
}
}
returnmsg;
}
@PostMapping("/value")
publicvoidpostValue(Stringmsg){
asyncRequestService.postValue(msg);
}
}
@Service
publicclassAsyncRequestService{
privateStringmsg=null;
@Async
publicFuturegetValue()throwsInterruptedException{
while(true){
synchronized(this){
if(msg!=null){
StringresultMsg=msg;
msg=null;
returnnewAsyncResult(resultMsg);
}
}
Thread.sleep(100);
}
}
publicsynchronizedvoidpostValue(Stringmsg){
this.msg=msg;
}
}
备注
@EnableAsync开启异步
@Sync标记异步方法
Future用于接收异步返回值
result.get(10,TimeUnit.SECONDS);阻塞,超时获取结果
Future.cancel()中断线程
补充:通过spring提供的DeferredResult实现长轮询服务端推送消息
DeferredResult字面意思就是推迟结果,是在servlet3.0以后引入了异步请求之后,spring封装了一下提供了相应的支持,也是一个很老的特性了。DeferredResult可以允许容器线程快速释放以便可以接受更多的请求提升吞吐量,让真正的业务逻辑在其他的工作线程中去完成。
最近再看apollo配置中心的实现原理,apollo的发布配置推送变更消息就是用DeferredResult实现的,apollo客户端会像服务端发送长轮训http请求,超时时间60秒,当超时后返回客户端一个304httpstatus,表明配置没有变更,客户端继续这个步骤重复发起请求,当有发布配置的时候,服务端会调用DeferredResult.setResult返回200状态码,然后轮训请求会立即返回(不会超时),客户端收到响应结果后,会发起请求获取变更后的配置信息。
下面我们自己写一个简单的demo来演示这个过程
springboot启动类:
@SpringBootApplication
publicclassDemoApplicationimplementsWebMvcConfigurer{
publicstaticvoidmain(String[]args){
SpringApplication.run(DemoApplication.class,args);
}
@Bean
publicThreadPoolTaskExecutormvcTaskExecutor(){
ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setQueueCapacity(100);
executor.setMaxPoolSize(25);
returnexecutor;
}
//配置异步支持,设置了一个用来异步执行业务逻辑的工作线程池,设置了默认的超时时间是60秒
@Override
publicvoidconfigureAsyncSupport(AsyncSupportConfigurerconfigurer){
configurer.setTaskExecutor(mvcTaskExecutor());
configurer.setDefaultTimeout(60000L);
}
}
importcom.google.common.collect.HashMultimap;
importcom.google.common.collect.Multimap;
importcom.google.common.collect.Multimaps;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.web.bind.annotation.PathVariable;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RequestMethod;
importorg.springframework.web.bind.annotation.RestController;
importorg.springframework.web.context.request.async.DeferredResult;
importjava.util.Collection;
@RestController
publicclassApolloController{
privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());
//guava中的Multimap,多值map,对map的增强,一个key可以保持多个value
privateMultimap>watchRequests=Multimaps.synchronizedSetMultimap(HashMultimap.create());
//模拟长轮询
@RequestMapping(value="/watch/{namespace}",method=RequestMethod.GET,produces="text/html")
publicDeferredResultwatch(@PathVariable("namespace")Stringnamespace){
logger.info("Requestreceived");
DeferredResultdeferredResult=newDeferredResult<>();
//当deferredResult完成时(不论是超时还是异常还是正常完成),移除watchRequests中相应的watchkey
deferredResult.onCompletion(newRunnable(){
@Override
publicvoidrun(){
System.out.println("removekey:"+namespace);
watchRequests.remove(namespace,deferredResult);
}
});
watchRequests.put(namespace,deferredResult);
logger.info("Servletthreadreleased");
returndeferredResult;
}
//模拟发布namespace配置
@RequestMapping(value="/publish/{namespace}",method=RequestMethod.GET,produces="text/html")
publicObjectpublishConfig(@PathVariable("namespace")Stringnamespace){
if(watchRequests.containsKey(namespace)){
Collection>deferredResults=watchRequests.get(namespace);
Longtime=System.currentTimeMillis();
//通知所有watch这个namespace变更的长轮训配置变更结果
for(DeferredResultdeferredResult:deferredResults){
deferredResult.setResult(namespace+"changed:"+time);
}
}
return"success";
}
}
当请求超时的时候会产生AsyncRequestTimeoutException,我们定义一个全局异常捕获类:
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.http.HttpStatus;
importorg.springframework.web.bind.annotation.ControllerAdvice;
importorg.springframework.web.bind.annotation.ExceptionHandler;
importorg.springframework.web.bind.annotation.ResponseBody;
importorg.springframework.web.bind.annotation.ResponseStatus;
importorg.springframework.web.context.request.async.AsyncRequestTimeoutException;
importjavax.servlet.http.HttpServletRequest;
importjavax.servlet.http.HttpServletResponse;
@ControllerAdvice
classGlobalControllerExceptionHandler{
protectedstaticfinalLoggerlogger=LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
@ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304状态码
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)//捕获特定异常
publicvoidhandleAsyncRequestTimeoutException(AsyncRequestTimeoutExceptione,HttpServletRequestrequest){
System.out.println("handleAsyncRequestTimeoutException");
}
}
然后我们通过postman工具发送请求http://localhost:8080/watch/mynamespace,请求会挂起,60秒后,DeferredResult超时,客户端正常收到了304状态码,表明在这个期间配置没有变更过。
然后我们在模拟配置变更的情况,再次发起请求http://localhost:8080/watch/mynamespace,等待个10秒钟(不要超过60秒),然后调用http://localhost:8080/publish/mynamespace,发布配置变更。这时postman会立刻收到response响应结果:
mynamespacechanged:1538880050147
表明在轮训期间有配置变更过。
这里我们用了一个MultiMap来存放所有轮训的请求,Key对应的是namespace,value对应的是所有watch这个namespace变更的异步请求DeferredResult,需要注意的是:在DeferredResult完成的时候记得移除MultiMap中相应的key,避免内存溢出请求。
采用这种长轮询的好处是,相比一直循环请求服务器,实例一多的话会对服务器产生很大的压力,http长轮询的方式会在服务器变更的时候主动推送给客户端,其他时间客户端是挂起请求的,这样同时满足了性能和实时性。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持毛票票。如有错误或未考虑完全的地方,望不吝赐教。