深入理解Java定时调度(Timer)机制
简介
在实现定时调度功能的时候,我们往往会借助于第三方类库来完成,比如:quartz、SpringSchedule等等。JDK从1.3版本开始,就提供了基于Timer的定时调度功能。在Timer中,任务的执行是串行的。这种特性在保证了线程安全的情况下,往往带来了一些严重的副作用,比如任务间相互影响、任务执行效率低下等问题。为了解决Timer的这些问题,JDK从1.5版本开始,提供了基于ScheduledExecutorService的定时调度功能。
本节我们主要分析Timer的功能。对于ScheduledExecutorService的功能,我们将新开一篇文章来讲解。
如何使用
Timer需要和TimerTask配合使用,才能完成调度功能。Timer表示调度器,TimerTask表示调度器执行的任务。任务的调度分为两种:一次性调度和循环调度。下面,我们通过一些例子来了解他们是如何使用的。
1.一次性调度
publicstaticvoidmain(String[]args){
Timertimer=newTimer();
TimerTasktask=newTimerTask(){
@Overridepublicvoidrun(){
SimpleDateFormatformat=newSimpleDateFormat("HH:mm:ss");
System.out.println(format.format(scheduledExecutionTime())+",called");
}
};
//延迟一秒,打印一次
//打印结果如下:10:58:24,called
timer.schedule(task,1000);
}
2.循环调度-schedule()
publicstaticvoidmain(String[]args){
Timertimer=newTimer();
TimerTasktask=newTimerTask(){
@Overridepublicvoidrun(){
SimpleDateFormatformat=newSimpleDateFormat("HH:mm:ss");
System.out.println(format.format(scheduledExecutionTime())+",called");
}
};
//固定时间的调度方式,延迟一秒,之后每隔一秒打印一次
//打印结果如下:
//11:03:55,called
//11:03:56,called
//11:03:57,called
//11:03:58,called
//11:03:59,called
//...
timer.schedule(task,1000,1000);
}
3.循环调度-scheduleAtFixedRate()
publicstaticvoidmain(String[]args){
Timertimer=newTimer();
TimerTasktask=newTimerTask(){
@Overridepublicvoidrun(){
SimpleDateFormatformat=newSimpleDateFormat("HH:mm:ss");
System.out.println(format.format(scheduledExecutionTime())+",called");
}
};
//固定速率的调度方式,延迟一秒,之后每隔一秒打印一次
//打印结果如下:
//11:08:43,called
//11:08:44,called
//11:08:45,called
//11:08:46,called
//11:08:47,called
//...
timer.scheduleAtFixedRate(task,1000,1000);
}
4.schedule()和scheduleAtFixedRate()的区别
从2和3的结果来看,他们达到的效果似乎是一样的。既然效果一样,JDK为啥要实现为两个方法呢?他们应该有不一样的地方!
在正常的情况下,他们的效果是一模一样的。而在异常的情况下-任务执行的时间比间隔的时间更长,他们是效果是不一样的。
- schedule()方法,任务的下一次执行时间是相对于上一次实际执行完成的时间点,因此执行时间会不断延后
- scheduleAtFixedRate()方法,任务的下一次执行时间是相对于上一次开始执行的时间点,因此执行时间不会延后
- 由于Timer内部是通过单线程方式实现的,所以这两种方式都不存在线程安全的问题
我们先来看看schedule()的异常效果:
publicstaticvoidmain(String[]args){
Timertimer=newTimer();
TimerTasktask=newTimerTask(){
@Overridepublicvoidrun(){
SimpleDateFormatformat=newSimpleDateFormat("HH:mm:ss");
try{
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println(format.format(scheduledExecutionTime())+",called");
}
};
timer.schedule(task,1000,2000);
//执行结果如下:
//11:18:56,called
//11:18:59,called
//11:19:02,called
//11:19:05,called
//11:19:08,called
//11:19:11,called
}
接下来我们看看scheduleAtFixedRate()的异常效果:
publicstaticvoidmain(String[]args){
Timertimer=newTimer();
TimerTasktask=newTimerTask(){
@Overridepublicvoidrun(){
SimpleDateFormatformat=newSimpleDateFormat("HH:mm:ss");
try{
Thread.sleep(3000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
System.out.println(format.format(scheduledExecutionTime())+",called");
}
};
timer.scheduleAtFixedRate(task,1000,2000);
//执行结果如下:
//11:20:45,called
//11:20:47,called
//11:20:49,called
//11:20:51,called
//11:20:53,called
//11:20:55,called
}
楼主一直相信,实践是检验真理比较好的方式,上面的例子从侧面验证了我们最初的猜想。
但是,这儿引出了另外一个问题。既然Timer内部是单线程实现的,在执行间隔为2秒、任务实际执行为3秒的情况下,scheduleAtFixedRate是如何做到2秒输出一次的呢?
【特别注意】
这儿其实是一个障眼法。需要重点关注的是,打印方法输出的值是通过调用scheduledExecutionTime()来生成的,而这个方法并不一定是任务真实执行的时间,而是当前任务应该执行的时间。
源码阅读
楼主对于知识的理解是,除了知其然,还需要知其所以然。而阅读源码是打开知其所以然大门的一把强有力的钥匙。在JDK中,Timer主要由TimerTask、TaskQueue和TimerThread组成。
1.TimerTask
TimerTask表示任务调度器执行的任务,继承自Runnable,其内部维护着任务的状态,一共有4种状态
- VIRGIN,英文名为处女,表示任务还未调度
- SCHEDULED,已经调度,但还未执行
- EXECUTED,对于执行一次的任务,表示已经执行;对于重复执行的任务,该状态无效
- CANCELLED,任务被取消
TimerTask还有下面的成员变量
- nextExecutionTime,下次执行的时间
- period,任务执行的时间间隔。正数表示固定速率;负数表示固定时延;0表示只执行一次
分析完大致的功能之后,我们来看看其代码。
/** *Thestateofthistask,chosenfromtheconstantsbelow. */ intstate=VIRGIN; /** *Thistaskhasnotyetbeenscheduled. */ staticfinalintVIRGIN=0; /** *Thistaskisscheduledforexecution.Ifitisanon-repeatingtask, *ithasnotyetbeenexecuted. */ staticfinalintSCHEDULED=1; /** *Thisnon-repeatingtaskhasalreadyexecuted(oriscurrently *executing)andhasnotbeencancelled. */ staticfinalintEXECUTED=2; /** *Thistaskhasbeencancelled(withacalltoTimerTask.cancel). */ staticfinalintCANCELLED=3;
TimerTask有两个操作方法
- cancel()//取消任务
- scheduledExecutionTime()//获取任务执行时间
cancel()比较简单,主要对当前任务加锁,然后变更状态为已取消。
publicbooleancancel(){
synchronized(lock){
booleanresult=(state==SCHEDULED);
state=CANCELLED;
returnresult;
}
}
而在scheduledExecutionTime()中,任务执行时间是通过下一次执行时间减去间隔时间的方式计算出来的。
publiclongscheduledExecutionTime(){
synchronized(lock){
return(period<0?nextExecutionTime+period
:nextExecutionTime-period);
}
}
2.TaskQueue
TaskQueue是一个队列,在Timer中用于存放任务。其内部是使用【最小堆算法】来实现的,堆顶的任务将最先被执行。由于使用了【最小堆】,TaskQueue判断执行时间是否已到的效率极高。我们来看看其内部是怎么实现的。
classTaskQueue{
/**
*Priorityqueuerepresentedasabalancedbinaryheap:thetwochildren
*ofqueue[n]arequeue[2*n]andqueue[2*n+1].Thepriorityqueueis
*orderedonthenextExecutionTimefield:TheTimerTaskwiththelowest
*nextExecutionTimeisinqueue[1](assumingthequeueisnonempty).For
*eachnodenintheheap,andeachdescendantofn,d,
*n.nextExecutionTime<=d.nextExecutionTime.
*
*使用数组来存放任务
*/
privateTimerTask[]queue=newTimerTask[128];
/**
*Thenumberoftasksinthepriorityqueue.(Thetasksarestoredin
*queue[1]uptoqueue[size]).
*
*用于表示队列中任务的个数,需要注意的是,任务数并不等于数组长度
*/
privateintsize=0;
/**
*Returnsthenumberoftaskscurrentlyonthequeue.
*/
intsize(){
returnsize;
}
/**
*Addsanewtasktothepriorityqueue.
*
*往队列添加一个任务
*/
voidadd(TimerTasktask){
//Growbackingstoreifnecessary
//在任务数超过数组长度,则通过数组拷贝的方式进行动态扩容
if(size+1==queue.length)
queue=Arrays.copyOf(queue,2*queue.length);
//将当前任务项放入队列
queue[++size]=task;
//向上调整,重新形成一个最小堆
fixUp(size);
}
/**
*Returnthe"headtask"ofthepriorityqueue.(Theheadtaskisan
*taskwiththelowestnextExecutionTime.)
*
*队列的第一个元素就是最先执行的任务
*/
TimerTaskgetMin(){
returnqueue[1];
}
/**
*Returntheithtaskinthepriorityqueue,whereirangesfrom1(the
*headtask,whichisreturnedbygetMin)tothenumberoftasksonthe
*queue,inclusive.
*
*获取队列指定下标的元素
*/
TimerTaskget(inti){
returnqueue[i];
}
/**
*Removetheheadtaskfromthepriorityqueue.
*
*移除堆顶元素,移除之后需要向下调整,使之重新形成最小堆
*/
voidremoveMin(){
queue[1]=queue[size];
queue[size--]=null;//Dropextrareferencetopreventmemoryleak
fixDown(1);
}
/**
*Removestheithelementfromqueuewithoutregardformaintaining
*theheapinvariant.Recallthatqueueisone-based,so
*1<=i<=size.
*
*快速移除指定位置元素,不会重新调整堆
*/
voidquickRemove(inti){
asserti<=size;
queue[i]=queue[size];
queue[size--]=null;//Dropextrareftopreventmemoryleak
}
/**
*SetsthenextExecutionTimeassociatedwiththeheadtasktothe
*specifiedvalue,andadjustspriorityqueueaccordingly.
*
*重新调度,向下调整使之重新形成最小堆
*/
voidrescheduleMin(longnewTime){
queue[1].nextExecutionTime=newTime;
fixDown(1);
}
/**
*Returnstrueifthepriorityqueuecontainsnoelements.
*
*队列是否为空
*/
booleanisEmpty(){
returnsize==0;
}
/**
*Removesallelementsfromthepriorityqueue.
*
*清除队列中的所有元素
*/
voidclear(){
//Nullouttaskreferencestopreventmemoryleak
for(inti=1;i<=size;i++)
queue[i]=null;
size=0;
}
/**
*Establishestheheapinvariant(describedabove)assumingtheheap
*satisfiestheinvariantexceptpossiblyfortheleaf-nodeindexedbyk
*(whichmayhaveanextExecutionTimelessthanitsparent's).
*
*Thismethodfunctionsby"promoting"queue[k]upthehierarchy
*(byswappingitwithitsparent)repeatedlyuntilqueue[k]'s
*nextExecutionTimeisgreaterthanorequaltothatofitsparent.
*
*向上调整,使之重新形成最小堆
*/
privatevoidfixUp(intk){
while(k>1){
intj=k>>1;
if(queue[j].nextExecutionTime<=queue[k].nextExecutionTime)
break;
TimerTasktmp=queue[j];queue[j]=queue[k];queue[k]=tmp;
k=j;
}
}
/**
*Establishestheheapinvariant(describedabove)inthesubtree
*rootedatk,whichisassumedtosatisfytheheapinvariantexcept
*possiblyfornodekitself(whichmayhaveanextExecutionTimegreater
*thanitschildren's).
*
*Thismethodfunctionsby"demoting"queue[k]downthehierarchy
*(byswappingitwithitssmallerchild)repeatedlyuntilqueue[k]'s
*nextExecutionTimeislessthanorequaltothoseofitschildren.
*
*向下调整,使之重新形成最小堆
*/
privatevoidfixDown(intk){
intj;
while((j=k<<1)<=size&&j>0){
if(jqueue[j+1].nextExecutionTime)
j++;//jindexessmallestkid
if(queue[k].nextExecutionTime<=queue[j].nextExecutionTime)
break;
TimerTasktmp=queue[j];queue[j]=queue[k];queue[k]=tmp;
k=j;
}
}
/**
*Establishestheheapinvariant(describedabove)intheentiretree,
*assumingnothingabouttheorderoftheelementspriortothecall.
*/
voidheapify(){
for(inti=size/2;i>=1;i--)
fixDown(i);
}
}
3.TimerThread
TimerThread作为Timer的成员变量,扮演着调度器的校色。我们先来看看它的构造方法,作用主要就是持有任务队列。
TimerThread(TaskQueuequeue){
this.queue=queue;
}
接下来看看run()方法,也就是线程执行的入口。
publicvoidrun(){
try{
mainLoop();
}finally{
//SomeonekilledthisThread,behaveasifTimercancelled
synchronized(queue){
newTasksMayBeScheduled=false;
queue.clear();//Eliminateobsoletereferences
}
}
}
主逻辑全在mainLoop()方法。在mainLoop方法执行完之后,会进行资源的清理操作。我们来看看mainLoop()方法。
privatevoidmainLoop(){
//while死循环
while(true){
try{
TimerTasktask;
booleantaskFired;
//对queue进行加锁,保证一个队列里所有的任务都是串行执行的
synchronized(queue){
//Waitforqueuetobecomenon-empty
//操作1,队列为空,需要等待新任务被调度,这时进行wait操作
while(queue.isEmpty()&&newTasksMayBeScheduled)
queue.wait();
//这儿再次判断队列是否为空,是因为【操作1】有任务进来了,同时任务又被取消了(进行了`cancel`操作),
//这时如果队列再次为空,那么需要退出线程,避免循环被卡死
if(queue.isEmpty())
break;//Queueisemptyandwillforeverremain;die
//Queuenonempty;lookatfirstevtanddotherightthing
longcurrentTime,executionTime;
//取出队列中的堆顶元素(下次执行时间最小的那个任务)
task=queue.getMin();
//这儿对堆元素进行加锁,是为了保证任务的可见性和原子性
synchronized(task.lock){
//取消的任务将不再被执行,需要从队列中移除
if(task.state==TimerTask.CANCELLED){
queue.removeMin();
continue;//Noactionrequired,pollqueueagain
}
//获取系统当前时间和任务下次执行的时间
currentTime=System.currentTimeMillis();
executionTime=task.nextExecutionTime;
//任务下次执行的时间<=系统当前时间,则执行此任务(设置状态标记`taskFired`为true)
if(taskFired=(executionTime<=currentTime)){
//`peroid`为0,表示此任务只需执行一次
if(task.period==0){//Non-repeating,remove
queue.removeMin();
task.state=TimerTask.EXECUTED;
}
//period不为0,表示此任务需要重复执行
//在这儿就体现出了`schedule()`方法和`scheduleAtFixedRate()`的区别
else{//Repeatingtask,reschedule
queue.rescheduleMin(
task.period<0?currentTime-task.period
:executionTime+task.period);
}
}
}
//任务没有被触发,队列挂起(带超时时间)
if(!taskFired)//Taskhasn'tyetfired;wait
queue.wait(executionTime-currentTime);
}
//任务被触发,执行任务。执行完后进入下一轮循环
if(taskFired)//Taskfired;runit,holdingnolocks
task.run();
}catch(InterruptedExceptione){
}
}
}
4.Timer
Timer通过构造方法做了下面的事情:
- 填充TimerThread对象的各项属性(比如线程名字、是否守护线程)
- 启动线程
/**
*Thetimerthread.
*/
privatefinalTimerThreadthread=newTimerThread(queue);
publicTimer(Stringname,booleanisDaemon){
thread.setName(name);
thread.setDaemon(isDaemon);
thread.start();
}
在Timer中,真正的暴露给用户使用的调度方法只有两个,schedule()和scheduleAtFixedRate(),我们来看看。
publicvoidschedule(TimerTasktask,longdelay){
if(delay<0)
thrownewIllegalArgumentException("Negativedelay.");
sched(task,System.currentTimeMillis()+delay,0);
}
publicvoidschedule(TimerTasktask,Datetime){
sched(task,time.getTime(),0);
}
publicvoidschedule(TimerTasktask,longdelay,longperiod){
if(delay<0)
thrownewIllegalArgumentException("Negativedelay.");
if(period<=0)
thrownewIllegalArgumentException("Non-positiveperiod.");
sched(task,System.currentTimeMillis()+delay,-period);
}
publicvoidschedule(TimerTasktask,DatefirstTime,longperiod){
if(period<=0)
thrownewIllegalArgumentException("Non-positiveperiod.");
sched(task,firstTime.getTime(),-period);
}
publicvoidscheduleAtFixedRate(TimerTasktask,longdelay,longperiod){
if(delay<0)
thrownewIllegalArgumentException("Negativedelay.");
if(period<=0)
thrownewIllegalArgumentException("Non-positiveperiod.");
sched(task,System.currentTimeMillis()+delay,period);
}
publicvoidscheduleAtFixedRate(TimerTasktask,DatefirstTime,
longperiod){
if(period<=0)
thrownewIllegalArgumentException("Non-positiveperiod.");
sched(task,firstTime.getTime(),period);
}
从上面的代码我们看出下面几点。
- 这两个方法最终都调用了sched()私有方法
- schedule()传入的period为负数,scheduleAtFixedRate()传入的period为正数
接下来我们看看sched()方法。
privatevoidsched(TimerTasktask,longtime,longperiod){
//1.`time`不能为负数的校验
if(time<0)
thrownewIllegalArgumentException("Illegalexecutiontime.");
//Constrainvalueofperiodsufficientlytopreventnumeric
//overflowwhilestillbeingeffectivelyinfinitelylarge.
//2.`period`不能超过`Long.MAX_VALUE>>1`
if(Math.abs(period)>(Long.MAX_VALUE>>1))
period>>=1;
synchronized(queue){
//3.Timer被取消时,不能被调度
if(!thread.newTasksMayBeScheduled)
thrownewIllegalStateException("Timeralreadycancelled.");
//4.对任务加锁,然后设置任务的下次执行时间、执行周期和任务状态,保证任务调度和任务取消是线程安全的
synchronized(task.lock){
if(task.state!=TimerTask.VIRGIN)
thrownewIllegalStateException(
"Taskalreadyscheduledorcancelled");
task.nextExecutionTime=time;
task.period=period;
task.state=TimerTask.SCHEDULED;
}
//5.将任务添加进队列
queue.add(task);
//6.队列中如果堆顶元素是当前任务,则唤醒队列,让`TimerThread`可以进行任务调度
if(queue.getMin()==task)
queue.notify();
}
}
sched()方法经过了下述步骤:
- time不能为负数的校验
- period不能超过Long.MAX_VALUE>>1
- Timer被取消时,不能被调度
- 对任务加锁,然后设置任务的下次执行时间、执行周期和任务状态,保证任务调度和任务取消是线程安全的
- 将任务添加进队列
- 队列中如果堆顶元素是当前任务,则唤醒队列,让TimerThread可以进行任务调度
【说明】:我们需要特别关注一下第6点。为什么堆顶元素必须是当前任务时才唤醒队列呢?原因在于堆顶元素所代表的意义,即:堆顶元素表示离当前时间最近的待执行任务!
【例子1】:假如当前时间为1秒,队列里有一个任务A需要在3秒执行,我们新加入的任务B需要在5秒执行。这时,因为TimerThread有wait(timeout)操作,时间到了会自己唤醒。所以为了性能考虑,不需要在sched()操作的时候进行唤醒。
【例子2】:假如当前时间为1秒,队列里有一个任务A需要在3秒执行,我们新加入的任务B需要在2秒执行。这时,如果不在sched()中进行唤醒操作,那么任务A将在3秒时执行。而任务B因为需要在2秒执行,已经过了它应该执行的时间,从而出现问题。
任务调度方法sched()分析完之后,我们继续分析其他方法。先来看一下cancel(),该方法用于取消Timer的执行。
publicvoidcancel(){
synchronized(queue){
thread.newTasksMayBeScheduled=false;
queue.clear();
queue.notify();//Incasequeuewasalreadyempty.
}
}
从上面源码分析来看,该方法做了下面几件事情:
- 设置TimerThread的newTasksMayBeScheduled标记为false
- 清空队列
- 唤醒队列
有的时候,在一个Timer中可能会存在多个TimerTask。如果我们只是取消其中几个TimerTask,而不是全部,除了对TimerTask执行cancel()方法调用,还需要对Timer进行清理操作。这儿的清理方法就是purge(),我们来看看其实现逻辑。
publicintpurge(){
intresult=0;
synchronized(queue){
//1.遍历所有任务,如果任务为取消状态,则将其从队列中移除,移除数做加一操作
for(inti=queue.size();i>0;i--){
if(queue.get(i).state==TimerTask.CANCELLED){
queue.quickRemove(i);
result++;
}
}
//2.将队列重新形成最小堆
if(result!=0)
queue.heapify();
}
returnresult;
}
5.唤醒队列的方法
通过前面源码的分析,我们看到队列的唤醒存在于下面几处:
- Timer.cancel()
- Timer.sched()
- Timer.threadReaper.finalize()
第一点和第二点其实已经分析过了,下面我们来看看第三点。
privatefinalObjectthreadReaper=newObject(){
protectedvoidfinalize()throwsThrowable{
synchronized(queue){
thread.newTasksMayBeScheduled=false;
queue.notify();//Incasequeueisempty.
}
}
};
该方法用于在GC阶段对任务队列进行唤醒,此处往往被读者所遗忘。
那么,我们回过头来想一下,为什么需要这段代码呢?
我们在分析TimerThread的时候看到:如果Timer创建之后,没有被调度的话,将一直wait,从而陷入假死状态。为了避免这种情况,并发大师DougLea机智地想到了在finalize()中设置状态标记newTasksMayBeScheduled,并对任务队列进行唤醒操作(queue.notify()),将TimerThread从死循环中解救出来。
总结
首先,本文演示了Timer是如何使用的,然后分析了调度方法schedule()和scheduleAtFixedRate()的区别和联系。
然后,为了加深我们对Timer的理解,我们通过阅读源码的方式进行了深入的分析。可以看得出,其内部实现得非常巧妙,考虑得也很完善。
但是因为Timer串行执行的特性,限制了其在高并发下的运用。后面我们将深入分析高并发、分布式环境下的任务调度是如何实现的,让我们拭目以待吧~
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。