springboot执行延时任务之DelayQueue的使用详解
DelayQueue简介
DelayQueue是一个无界阻塞队列,只有在延迟期满时,才能从中提取元素。
队列的头部,是延迟期满后保存时间最长的delay元素。
在很多场景我们需要用到延时任务,比如给客户异步转账操作超时后发通知告知用户,还有客户下单后多长时间内没支付则取消订单等等,这些都可以使用延时任务来实现。
jdk中DelayQueue可以实现上述需求,顾名思义DelayQueue就是延时队列。
DelayQueue提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。
没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。
延时队列不能存放空元素。
一般使用take()方法阻塞等待,有过期元素时继续。
队列元素说明
DelayQueue
publicinterfaceDelayedextendsComparable{ /** *Returnstheremainingdelayassociatedwiththisobject,inthe *giventimeunit. * *@paramunitthetimeunit *@returntheremainingdelay;zeroornegativevaluesindicate *thatthedelayhasalreadyelapsed */ longgetDelay(TimeUnitunit); }
所以DelayQueue的元素需要实现getDelay方法和Comparable接口的compareTo方法,getDelay方法来判定元素是否过期,compareTo方法来确定先后顺序。
springboot中实例运用
DelayTask就是队列中的元素
importjava.util.Date; importjava.util.concurrent.Delayed; importjava.util.concurrent.TimeUnit; publicclassDelayTaskimplementsDelayed{ finalprivateTaskBasedata; finalprivatelongexpire; /** *构造延时任务 *@paramdata业务数据 *@paramexpire任务延时时间(ms) */ publicDelayTask(TaskBasedata,longexpire){ super(); this.data=data; this.expire=expire+System.currentTimeMillis(); } publicTaskBasegetData(){ returndata; } publiclonggetExpire(){ returnexpire; } @Override publicbooleanequals(Objectobj){ if(objinstanceofDelayTask){ returnthis.data.getIdentifier().equals(((DelayTask)obj).getData().getIdentifier()); } returnfalse; } @Override publicStringtoString(){ return"{"+"data:"+data.toString()+","+"expire:"+newDate(expire)+"}"; } @Override publiclonggetDelay(TimeUnitunit){ returnunit.convert(this.expire-System.currentTimeMillis(),unit); } @Override publicintcompareTo(Delayedo){ longdelta=getDelay(TimeUnit.NANOSECONDS)-o.getDelay(TimeUnit.NANOSECONDS); return(int)delta; } }
TaskBase类是用户自定义的业务数据基类,其中有一个identifier字段来标识任务的id,方便进行索引
importcom.alibaba.fastjson.JSON; publicclassTaskBase{ privateStringidentifier; publicTaskBase(Stringidentifier){ this.identifier=identifier; } publicStringgetIdentifier(){ returnidentifier; } publicvoidsetIdentifier(Stringidentifier){ this.identifier=identifier; } @Override publicStringtoString(){ returnJSON.toJSONString(this); } }
定义一个延时任务管理类DelayQueueManager,通过@Component注解加入到spring中管理,在需要使用的地方通过@Autowire注入
importcom.alibaba.fastjson.JSON; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.boot.CommandLineRunner; importorg.springframework.stereotype.Component; importjava.text.SimpleDateFormat; importjava.util.Date; importjava.util.concurrent.DelayQueue; importjava.util.concurrent.Executors; @Component publicclassDelayQueueManagerimplementsCommandLineRunner{ privatefinalLoggerlogger=LoggerFactory.getLogger(DelayQueueManager.class); privateDelayQueuedelayQueue=newDelayQueue<>(); /** *加入到延时队列中 *@paramtask */ publicvoidput(DelayTasktask){ logger.info("加入延时任务:{}",task); delayQueue.put(task); } /** *取消延时任务 *@paramtask *@return */ publicbooleanremove(DelayTasktask){ logger.info("取消延时任务:{}",task); returndelayQueue.remove(task); } /** *取消延时任务 *@paramtaskid *@return */ publicbooleanremove(Stringtaskid){ returnremove(newDelayTask(newTaskBase(taskid),0)); } @Override publicvoidrun(String...args)throwsException{ logger.info("初始化延时队列"); Executors.newSingleThreadExecutor().execute(newThread(this::excuteThread)); } /** *延时任务执行线程 */ privatevoidexcuteThread(){ while(true){ try{ DelayTasktask=delayQueue.take(); processTask(task); }catch(InterruptedExceptione){ break; } } } /** *内部执行延时任务 *@paramtask */ privatevoidprocessTask(DelayTasktask){ logger.info("执行延时任务:{}",task); //根据task中的data自定义数据来处理相关逻辑,例if(task.getData()instanceofXXX){} } }
DelayQueueManager实现了CommandLineRunner接口,在springboot启动完成后就会自动调用run方法。
总结
以上所述是小编给大家介绍的springboot执行延时任务DelayQueue的使用详解,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对毛票票网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。