futuretask源码分析(推荐)
FutureTask只实现RunnableFuture接口:
该接口继承了java.lang.Runnable和Future接口,也就是继承了这两个接口的特性。
1.可以不必直接继承Thread来生成子类,只要实现run方法,且把实例传入到Thread构造函数,Thread就可以执行该实例的run方法了(Thread(Runnable))。
2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成。也可以中断执行,判断执行状态等。
FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。
如:1.取消任务执行
2.查询任务是否执行完成
3.获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。
注意:一旦任务执行完成,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)
FutureTask支持执行两种任务,Callable或者Runnable的实现类。且可把FutureTask实例交由Executor执行。
源码部分(很简单):
publicclassFutureTaskimplementsRunnableFuture { /* *Revisionnotes:Thisdiffersfrompreviousversionsofthis *classthatreliedonAbstractQueuedSynchronizer,mainlyto *avoidsurprisingusersaboutretaininginterruptstatusduring *cancellationraces.Synccontrolinthecurrentdesignrelies *ona"state"fieldupdatedviaCAStotrackcompletion,along *withasimpleTreiberstacktoholdwaitingthreads. * *Stylenote:Asusual,webypassoverheadofusing *AtomicXFieldUpdatersandinsteaddirectlyuseUnsafeintrinsics. */ /** *Therunstateofthistask,initiallyNEW.Therunstate *transitionstoaterminalstateonlyinmethodsset, *setException,andcancel.Duringcompletion,statemaytakeon *transientvaluesofCOMPLETING(whileoutcomeisbeingset)or *INTERRUPTING(onlywhileinterruptingtherunnertosatisfya *cancel(true)).Transitionsfromtheseintermediatetofinal *statesusecheaperordered/lazywritesbecausevaluesareunique *andcannotbefurthermodified. * *Possiblestatetransitions: *NEW->COMPLETING->NORMAL *NEW->COMPLETING->EXCEPTIONAL *NEW->CANCELLED *NEW->INTERRUPTING->INTERRUPTED */ privatevolatileintstate; privatestaticfinalintNEW=0; privatestaticfinalintCOMPLETING=1; privatestaticfinalintNORMAL=2; privatestaticfinalintEXCEPTIONAL=3; privatestaticfinalintCANCELLED=4; privatestaticfinalintINTERRUPTING=5; privatestaticfinalintINTERRUPTED=6; /**Theunderlyingcallable;nulledoutafterrunning*/ privateCallable callable; /**用来存储任务执行结果或者异常对象,根据任务state在get时候选择返回执行结果还是抛出异常*/ privateObjectoutcome;//non-volatile,protectedbystatereads/writes /**当前运行Run方法的线程*/ privatevolatileThreadrunner; /**Treiberstackofwaitingthreads*/ privatevolatileWaitNodewaiters; /** *Returnsresultorthrowsexceptionforcompletedtask. * *@paramscompletedstatevalue */ @SuppressWarnings("unchecked") privateVreport(ints)throwsExecutionException{ Objectx=outcome; if(s==NORMAL) return(V)x; if(s>=CANCELLED) thrownewCancellationException(); thrownewExecutionException((Throwable)x); } /** *Createsa{@codeFutureTask}thatwill,uponrunning,executethe *given{@codeCallable}. * *@paramcallablethecallabletask *@throwsNullPointerExceptionifthecallableisnull */ publicFutureTask(Callable callable){ if(callable==null) thrownewNullPointerException(); this.callable=callable; this.state=NEW;//ensurevisibilityofcallable } /** *Createsa{@codeFutureTask}thatwill,uponrunning,executethe *given{@codeRunnable},andarrangethat{@codeget}willreturnthe *givenresultonsuccessfulcompletion. * *@paramrunnabletherunnabletask *@paramresulttheresulttoreturnonsuccessfulcompletion.If *youdon'tneedaparticularresult,considerusing *constructionsoftheform: *{@codeFuture>f=newFutureTask (runnable,null)} *@throwsNullPointerExceptioniftherunnableisnull */ publicFutureTask(Runnablerunnable,Vresult){ this.callable=Executors.callable(runnable,result); this.state=NEW;//ensurevisibilityofcallable } //判断任务是否已取消(异常中断、取消等) publicbooleanisCancelled(){ returnstate>=CANCELLED; } /** 判断任务是否已结束(取消、异常、完成、NORMAL都等于结束) ** publicbooleanisDone(){ returnstate!=NEW; } /** mayInterruptIfRunning用来决定任务的状态。 true:任务状态=INTERRUPTING=5。如果任务已经运行,则强行中断。如果任务未运行,那么则不会再运行 false:CANCELLED=4。如果任务已经运行,则允许运行完成(但不能通过get获取结果)。如果任务未运行,那么则不会再运行 **/ publicbooleancancel(booleanmayInterruptIfRunning){ if(state!=NEW) returnfalse; if(mayInterruptIfRunning){ if(!UNSAFE.compareAndSwapInt(this,stateOffset,NEW,INTERRUPTING)) returnfalse; Threadt=runner; if(t!=null) t.interrupt(); UNSAFE.putOrderedInt(this,stateOffset,INTERRUPTED);//finalstate } elseif(!UNSAFE.compareAndSwapInt(this,stateOffset,NEW,CANCELLED)) returnfalse; finishCompletion(); returntrue; } /** *@throwsCancellationException{@inheritDoc} */ publicVget()throwsInterruptedException,ExecutionException{ ints=state; //如果任务未彻底完成,那么则阻塞直至任务完成后唤醒该线程 if(s<=COMPLETING) s=awaitDone(false,0L); returnreport(s); } /** *@throwsCancellationException{@inheritDoc} */ publicVget(longtimeout,TimeUnitunit) throwsInterruptedException,ExecutionException,TimeoutException{ if(unit==null) thrownewNullPointerException(); ints=state; if(s<=COMPLETING&& (s=awaitDone(true,unit.toNanos(timeout)))<=COMPLETING) thrownewTimeoutException(); returnreport(s); } /** *Protectedmethodinvokedwhenthistasktransitionstostate *{@codeisDone}(whethernormallyorviacancellation).The *defaultimplementationdoesnothing.Subclassesmayoverride *thismethodtoinvokecompletioncallbacksorperform *bookkeeping.Notethatyoucanquerystatusinsidethe *implementationofthismethodtodeterminewhetherthistask *hasbeencancelled. */ protectedvoiddone(){} /** 该方法在FutureTask里只有run方法在任务完成后调用。 主要保存任务执行结果到成员变量outcome中,和切换任务执行状态。 由该方法可以得知: COMPLETING:任务已执行完成(也可能是异常完成),但还未设置结果到成员变量outcome中,也意味着还不能get NORMAL:任务彻底执行完成 **/ protectedvoidset(Vv){ if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){ outcome=v; UNSAFE.putOrderedInt(this,stateOffset,NORMAL);//finalstate finishCompletion(); } } /** *Causesthisfuturetoreportan{@linkExecutionException} *withthegiventhrowableasitscause,unlessthisfuturehas *alreadybeensetorhasbeencancelled. * * Thismethodisinvokedinternallybythe{@link#run}method *uponfailureofthecomputation. * *@paramtthecauseoffailure */ protectedvoidsetException(Throwablet){ if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){ outcome=t; UNSAFE.putOrderedInt(this,stateOffset,EXCEPTIONAL);//finalstate finishCompletion(); } } /** 由于实现了Runnable接口的缘故,该方法可由执行线程所调用。 **/ publicvoidrun(){ //只有当任务状态=new时才被运行继续执行 if(state!=NEW|| !UNSAFE.compareAndSwapObject(this,runnerOffset, null,Thread.currentThread())) return; try{ Callable
c=callable; if(c!=null&&state==NEW){ Vresult; booleanran; try{ //调用Callable的Call方法 result=c.call(); ran=true; }catch(Throwableex){ result=null; ran=false; setException(ex); } if(ran) set(result); } }finally{ //runnermustbenon-nulluntilstateissettledto //preventconcurrentcallstorun() runner=null; //statemustbere-readafternullingrunnertoprevent //leakedinterrupts ints=state; if(s>=INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** 如果该任务在执行过程中不被取消或者异常结束,那么该方法不记录任务的执行结果,且不修改任务执行状态。 所以该方法可以重复执行N次。不过不能直接调用,因为是protected权限。 **/ protectedbooleanrunAndReset(){ if(state!=NEW|| !UNSAFE.compareAndSwapObject(this,runnerOffset, null,Thread.currentThread())) returnfalse; booleanran=false; ints=state; try{ Callable c=callable; if(c!=null&&s==NEW){ try{ c.call();//don'tsetresult ran=true; }catch(Throwableex){ setException(ex); } } }finally{ //runnermustbenon-nulluntilstateissettledto //preventconcurrentcallstorun() runner=null; //statemustbere-readafternullingrunnertoprevent //leakedinterrupts s=state; if(s>=INTERRUPTING) handlePossibleCancellationInterrupt(s); } returnran&&s==NEW; } /** *Ensuresthatanyinterruptfromapossiblecancel(true)isonly *deliveredtoataskwhileinrunorrunAndReset. */ privatevoidhandlePossibleCancellationInterrupt(ints){ //Itispossibleforourinterruptertostallbeforegettinga //chancetointerruptus.Let'sspin-waitpatiently. if(s==INTERRUPTING) while(state==INTERRUPTING) Thread.yield();//waitoutpendinginterrupt //assertstate==INTERRUPTED; //Wewanttoclearanyinterruptwemayhavereceivedfrom //cancel(true).However,itispermissibletouseinterrupts //asanindependentmechanismforatasktocommunicatewith //itscaller,andthereisnowaytoclearonlythe //cancellationinterrupt. // //Thread.interrupted(); } /** *SimplelinkedlistnodestorecordwaitingthreadsinaTreiber *stack.SeeotherclassessuchasPhaserandSynchronousQueue *formoredetailedexplanation. */ staticfinalclassWaitNode{ volatileThreadthread; volatileWaitNodenext; WaitNode(){thread=Thread.currentThread();} } /** 该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable. **/ privatevoidfinishCompletion(){ //assertstate>COMPLETING; for(WaitNodeq;(q=waiters)!=null;){ if(UNSAFE.compareAndSwapObject(this,waitersOffset,q,null)){ for(;;){ Threadt=q.thread; if(t!=null){ q.thread=null; LockSupport.unpark(t); } WaitNodenext=q.next; if(next==null) break; q.next=null;//unlinktohelpgc q=next; } break; } } done(); callable=null;//toreducefootprint } /** 阻塞等待任务执行完成(中断、正常完成、超时) **/ privateintawaitDone(booleantimed,longnanos) throwsInterruptedException{ finallongdeadline=timed?System.nanoTime()+nanos:0L; WaitNodeq=null; booleanqueued=false; for(;;){ /** 这里的ifelse的顺序也是有讲究的。 1.先判断线程是否中断,中断则从队列中移除(也可能该线程不存在于队列中) 2.判断当前任务是否执行完成,执行完成则不再阻塞,直接返回。 3.如果任务状态=COMPLETING,证明该任务处于已执行完成,正在切换任务执行状态,CPU让出片刻即可 4.q==null,则证明还未创建节点,则创建节点 5.q节点入队 6和7.阻塞 **/ if(Thread.interrupted()){ removeWaiter(q); thrownewInterruptedException(); } ints=state; if(s>COMPLETING){ if(q!=null) q.thread=null; returns; } elseif(s==COMPLETING)//cannottimeoutyet Thread.yield(); elseif(q==null) q=newWaitNode(); elseif(!queued) queued=UNSAFE.compareAndSwapObject(this,waitersOffset, q.next=waiters,q); elseif(timed){ nanos=deadline-System.nanoTime(); if(nanos<=0L){ removeWaiter(q); returnstate; } LockSupport.parkNanos(this,nanos); } else LockSupport.park(this); } } /** *Triestounlinkatimed-outorinterruptedwaitnodetoavoid *accumulatinggarbage.Internalnodesaresimplyunspliced *withoutCASsinceitisharmlessiftheyaretraversedanyway *byreleasers.Toavoideffectsofunsplicingfromalready *removednodes,thelistisretraversedincaseofanapparent *race.Thisisslowwhentherearealotofnodes,butwedon't *expectliststobelongenoughtooutweighhigher-overhead *schemes. */ privatevoidremoveWaiter(WaitNodenode){ if(node!=null){ node.thread=null; retry: for(;;){//restartonremoveWaiterrace for(WaitNodepred=null,q=waiters,s;q!=null;q=s){ s=q.next; if(q.thread!=null) pred=q; elseif(pred!=null){ pred.next=s; if(pred.thread==null)//checkforrace continueretry; } elseif(!UNSAFE.compareAndSwapObject(this,waitersOffset, q,s)) continueretry; } break; } } } //Unsafemechanics privatestaticfinalsun.misc.UnsafeUNSAFE; privatestaticfinallongstateOffset; privatestaticfinallongrunnerOffset; privatestaticfinallongwaitersOffset; static{ try{ UNSAFE=sun.misc.Unsafe.getUnsafe(); Class>k=FutureTask.class; stateOffset=UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset=UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset=UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); }catch(Exceptione){ thrownewError(e); } } }
总结
以上就是本文关于futuretask源码分析(推荐)的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:Java利用future及时获取多线程运行结果、浅谈Java多线程处理中Future的妙用(附源码)、futuretask用法及使用场景介绍等,有什么问题可以随时留言,欢迎大家一起交流讨论。