JVM优先级线程池做任务队列的实现方法
前言
我们都知道web服务的工作大多是接受http请求,并返回处理后的结果。服务器接受的每一个请求又可以看是一个任务。一般而言这些请求任务会根据请求的先后有序处理,如果请求任务的处理比较耗时,往往就需要排队了。而同时不同的任务直接可能会存在一些优先级的变化,这时候就需要引入任务队列并进行管理了。可以做任务队列的东西有很多,Java自带的线程池,以及其他的消息中间件都可以。
同步与异步
这个问题在之前已经提过很多次了,有些任务是需要请求后立即返回结果的,而有的则不需要。设想一下你下单购物的场景,付完钱后,系统只需要返回一个支付成功即可,后续的积分增加、优惠券发放、安排发货等等业务都不需要实时返回给用户的,这些就是异步的任务。大量的异步任务到达我们部署的服务上,由于处理效率的瓶颈,无法达到实时处理,因此与需要用队列将他们暂时保存起来,排队处理。
线程池
在Java中提到队列,我们除了想到基本的数据结构之外,应该还有线程池。线程池自带一套机制可以实现任务的排队和执行,可以满足单点环境下绝大多数异步化的场景。下面是典型的一个处理流程:
//注入合适类型的线程池 @Autowired privatefinalThreadPoolExecutorasyncPool; @RequestMapping(value="/async/someOperate",method=RequestMethod.POST) publicRestResultsomeOperate(HttpServletRequestrequest,Stringparams,StringcallbackUrl{ //接受请求后submit到线程池排队处理 asyncPool.submit(newTask(params,callbackUrl); returnnewRestResult(ResultCode.SUCCESS.getCode(),null){{ setMsg("successful!"+prop.getShowMsg()); }}; } //异步任务处理 @Slf4j publicclassTaskextendsCallable{ privateStringparams; privateStringcallbackUrl; privatefinalIAlgorithmServicealgorithmService=SpringUtil.getBean(IAlgorithmServiceImpl.class); privatefinalServiceUtilsserviceUtils=SpringUtil.getBean(ServiceUtils.class); publicImageTask(Stringparams,StringcallbackUrl){ this.params=params; this.callbackUrl=callbackUrl; } @Override publicRestResultcall(){ try{ //业务处理 CarDamageResultresult=algorithmService.someOperate(this.params); //回调 returnserviceUtils.callback(this.callbackUrl,this.caseNum,ResultCode.SUCCESS.getCode(),result,this.isAsync); }catch(ServiceExceptione){ returnserviceUtils.callback(this.callbackUrl,this.caseNum,e.getCode(),null,this.isAsync); } } }
对于线程池这里就不具体展开讲了,仅仅简单理了下具体的流程:
- 收到请求后,参数校验后传入线程池排队。
- 返回结果:“请求成功,正在处理”。
- 任务排到后由相应的线程处理,处理完后进行接口回调。
上面的例子描述了一个生产速度远远大于消费速度的模型,普通面向数据库开发的企业级应用,由于数据库的连接池开发的连接数较大,一般不需要这样通过线程池来处理,而一些GPU密集型的应用场景,由于显存的瓶颈导致消费速度慢时,就需要队列来作出调整了。
带优先级的线程池
更复杂的,例如考虑到任务的优先级,还需要对线程池进行重写,通过PriorityBlockingQueue来替换默认的阻塞队列。直接上代码。
importlombok.Data; importjava.util.concurrent.Callable; /** *@authorFururur *@create2020-01-14-10:37 */ @Data publicabstractclassPriorityCallableimplementsCallable { privateintpriority; }
importlombok.Getter; importlombok.Setter; importjava.util.concurrent.*; importjava.util.concurrent.atomic.AtomicLong; /** *优先级线程池的实现 * *@authorFururur *@create2019-07-23-10:19 */ publicclassPriorityThreadPoolExecutorextendsThreadPoolExecutor{ privateThreadLocallocal=ThreadLocal.withInitial(()->0); publicPriorityThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit){ super(corePoolSize,maximumPoolSize,keepAliveTime,unit,getWorkQueue()); } publicPriorityThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize, longkeepAliveTime,TimeUnitunit,ThreadFactorythreadFactory){ super(corePoolSize,maximumPoolSize,keepAliveTime,unit,getWorkQueue(),threadFactory); } publicPriorityThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize, longkeepAliveTime,TimeUnitunit,RejectedExecutionHandlerhandler){ super(corePoolSize,maximumPoolSize,keepAliveTime,unit,getWorkQueue(),handler); } publicPriorityThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize, longkeepAliveTime,TimeUnitunit,ThreadFactorythreadFactory, RejectedExecutionHandlerhandler){ super(corePoolSize,maximumPoolSize,keepAliveTime,unit,getWorkQueue(),threadFactory,handler); } privatestaticPriorityBlockingQueuegetWorkQueue(){ returnnewPriorityBlockingQueue(); } @Override publicvoidexecute(Runnablecommand){ intpriority=local.get(); try{ this.execute(command,priority); }finally{ local.set(0); } } publicvoidexecute(Runnablecommand,intpriority){ super.execute(newPriorityRunnable(command,priority)); } public Future submit(PriorityCallable task){ local.set(task.getPriority()); returnsuper.submit(task); } public Future submit(Runnabletask,Tresult,intpriority){ local.set(priority); returnsuper.submit(task,result); } publicFuture>submit(Runnabletask,intpriority){ local.set(priority); returnsuper.submit(task); } @Getter @Setter protectedstaticclassPriorityRunnableimplementsRunnable,Comparable { privatefinalstaticAtomicLongseq=newAtomicLong(); privatefinallongseqNum; privateRunnablerun; privateintpriority; PriorityRunnable(Runnablerun,intpriority){ seqNum=seq.getAndIncrement(); this.run=run; this.priority=priority; } @Override publicvoidrun(){ this.run.run(); } @Override publicintcompareTo(PriorityRunnableother){ intres=0; if(this.priority==other.priority){ if(other.run!=this.run){ //ASC res=(seqNum other.priority?-1:1; } returnres; } } }
要点如下:
- 替换线程池默认的阻塞队列为PriorityBlockingQueue,响应的传入的线程类需要实现Comparable
才能进行比较。 - PriorityBlockingQueue的数据结构决定了,优先级相同的任务无法保证FIFO,需要自己控制顺序。
- 需要重写线程池的execute()方法。看过线程池源码的会发现,执行submit(task)方法后,都会转化成RunnableFuture
再进一步执行,由于传入的task虽然实现了Comparable 到,但是内部转换成的RunnableFuture 并未实现,因此直接submit会抛出Causedby:java.lang.ClassCastException:java.util.concurrent.FutureTaskcannotbecasttojava.lang.Comparable这样一个异常,所以需要重写execute()方法,构造一个PriorityRunnable作为中转。
总结
JVM线程池是实现异步任务队列最简单最原生的一种方式,本文介绍了基本的使用流程和带有优先队列需求的用法。这种方法可有满足到一些简单的业务场景,但也存在一定的局限性:
- JVM线程池是单机的,横向扩展多个服务下做负载均衡时,就会存在多个线程池了他们是分开工作的,无法很好的统一和管理,不太适合分布式场景。
- JVM线程池是基于内存的,一旦服务挂了,会出现任务丢失的情况,可靠性低。
- 缺少作为任务队列的ack机制,一旦任务失败不会重新执行,且无法很好地对线程池队列进行监控。
显然简单的JVM线程池是无法handle到负载的业务场景的,这就需要引入其他中间件了,在接下来的文章中我们会继续探讨。
参考文献
ThreadPoolExecutor优先级的线程池
implementingPriorityQueueonThreadPoolExecutor
ThreadPoolExecutor的PriorityBlockingQueue类型转化问题
大搜车异步任务队列中间件的建设实践
到此这篇关于JVM优先级线程池做任务队列的实现方法的文章就介绍到这了,更多相关java线程池优先级内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!