Java多线程之异步Future机制的原理和实现
项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable看下面的代码:
importjava.util.concurrent.Callable; importjava.util.concurrent.ExecutionException; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; importjava.util.concurrent.Future; publicclassAddTaskimplementsCallable<Integer>{ privateinta,b; publicAddTask(inta,intb){ this.a=a; this.b=b; } @Override publicIntegercallthrowsException{ Integerresult=a+b; returnresult; } publicstaticvoidmain(String[]args)throwsInterruptedException,ExecutionException{ ExecutorServiceexecutor=Executors.newSingleThreadExecutor; //JDK目前为止返回的都是FutureTask的实例 Future<Integer>future=executor.submit(newAddTask(1,2)); Integerresult=future.get;//只有当future的状态是已完成时(future.isDone=true),get方法才会返回 } }
虽然可以实现获取异步执行结果的需求,但是我们发现这个Future其实很不好用,因为它没有提供通知的机制,也就是说我们不知道future什么时候完成(如果我们需要轮询isDone()来判断的话感觉就没有用这个的必要了)。看下java.util.concurrent.future.Future的接口方法:
publicinterfaceFuture<V>{ booleancancel(booleanmayInterruptIfRunning); booleanisCancelled; booleanisDone; VgetthrowsInterruptedException,ExecutionException; Vget(longtimeout,TimeUnitunit) throwsInterruptedException,ExecutionException,TimeoutException; }
由此可见JDK的Future机制其实并不好用,如果能给这个future加个监听器,让它在完成时通知监听器的话就比较好用了,就像下面这个IFuture:
packagefuture; importjava.util.concurrent.CancellationException; importjava.util.concurrent.Future; importjava.util.concurrent.TimeUnit; /** *Theresultofanasynchronousoperation. * *@authorlixiaohui *@param<V>执行结果的类型参数 */ publicinterfaceIFuture<V>extendsFuture<V>{ booleanisSuccess;//是否成功 VgetNow;//立即返回结果(不管Future是否处于完成状态) Throwablecause;//若执行失败时的原因 booleanisCancellable;//是否可以取消 IFuture<V>awaitthrowsInterruptedException;//等待future的完成 booleanawait(longtimeoutMillis)throwsInterruptedException;//超时等待future的完成 booleanawait(longtimeout,TimeUnittimeunit)throwsInterruptedException; IFuture<V>awaitUninterruptibly;//等待future的完成,不响应中断 booleanawaitUninterruptibly(longtimeoutMillis);//超时等待future的完成,不响应中断 booleanawaitUninterruptibly(longtimeout,TimeUnittimeunit); IFuture<V>addListener(IFutureListener<V>l);//当future完成时,会通知这些加进来的监听器 IFuture<V>removeListener(IFutureListener<V>l); }
接下来就一起来实现这个IFuture,在这之前要说明下Object.wait,Object.notifyAll方法,因为整个Future实现的原���的核心就是这两个方法.看看JDK里面的解释:
publicclassObject{ /** *Causesthecurrentthreadtowaituntilanotherthreadinvokesthe *{@linkjava.lang.Object#notify}methodorthe *{@linkjava.lang.Object#notifyAll}methodforthisobject. *Inotherwords,thismethodbehavesexactlyasifitsimply *performsthecall{@codewait(0)}. *调用该方法后,当前线程会释放对象监视器锁,并让出CPU使用权。直到别的线程调用notify/notifyAll */ publicfinalvoidwaitthrowsInterruptedException{ wait(0); } /** *Wakesupallthreadsthatarewaitingonthisobject'smonitor.A *threadwaitsonanobject'smonitorbycallingoneofthe *{@codewait}methods. *<p> *Theawakenedthreadswillnotbeabletoproceeduntilthecurrent *threadrelinquishesthelockonthisobject.Theawakenedthreads *willcompeteintheusualmannerwithanyotherthreadsthatmight *beactivelycompetingtosynchronizeonthisobject;forexample, *theawakenedthreadsenjoynoreliableprivilegeordisadvantagein *beingthenextthreadtolockthisobject. */ publicfinalnativevoidnotifyAll; }
知道这个后,我们要自己实现Future也就有了思路,当线程调用了IFuture.await等一系列的方法时,如果Future还未完成,那么就调用future.wait方法使线程进入WAITING状态。而当别的线程设置Future为完成状态(注意这里的完成状态包括正常结束和异常结束)时,就需要调用future.notifyAll方法来唤醒之前因为调用过wait方法而处于WAITING状态的那些线程。完整的实现如下(代码应该没有很难理解的地方,我是参考netty的Future机制的。有兴趣的可以去看看netty的源码):
packagefuture; importjava.util.Collection; importjava.util.concurrent.CancellationException; importjava.util.concurrent.CopyOnWriteArrayList; importjava.util.concurrent.ExecutionException; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.TimeoutException; /** *<pre> *正常结束时,若执行的结果不为null,则result为执行结果;若执行结果为null,则result={@linkAbstractFuture#SUCCESS_SIGNAL} *异常结束时,result为{@linkCauseHolder}的实例;若是被取消而导致的异常结束,则result为{@linkCancellationException}的实例,否则为其它异常的实例 *以下情况会使异步操作由未完成状态转至已完成状态,也就是在以下情况发生时调用notifyAll方法: *<ul> *<li>异步操作被取消时(cancel方法)</li> *<li>异步操作正常结束时(setSuccess方法)</li> *<li>异步操作异常结束时(setFailure方法)</li> *</ul> *</pre> * *@authorlixiaohui * *@param<V> *异步执行结果的类型 */ publicclassAbstractFuture<V>implementsIFuture<V>{ protectedvolatileObjectresult;//需要保证其可见性 /** *监听器集 */ protectedCollection<IFutureListener<V>>listeners=newCopyOnWriteArrayList<IFutureListener<V>>; /** *当任务正常执行结果为null时,即客户端调用{@linkAbstractFuture#setSuccess(null)}时, *result引用该对象 */ privatestaticfinalSuccessSignalSUCCESS_SIGNAL=newSuccessSignal; @Override publicbooleancancel(booleanmayInterruptIfRunning){ if(isDone){//已完成了不能取消 returnfalse; } synchronized(this){ if(isDone){//doublecheck returnfalse; } result=newCauseHolder(newCancellationException); notifyAll;//isDone=true,通知等待在该对象的wait的线程 } notifyListeners;//通知监听器该异步操作已完成 returntrue; } @Override publicbooleanisCancellable{ returnresult==null; } @Override publicbooleanisCancelled{ returnresult!=null&&resultinstanceofCauseHolder&&((CauseHolder)result).causeinstanceofCancellationException; } @Override publicbooleanisDone{ returnresult!=null; } @Override publicVgetthrowsInterruptedException,ExecutionException{ await;//等待执行结果 Throwablecause=cause; if(cause==null){//没有发生异常,异步操作正常结束 returngetNow; } if(causeinstanceofCancellationException){//异步操作被取消了 throw(CancellationException)cause; } thrownewExecutionException(cause);//其他异常 } @Override publicVget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{ if(await(timeout,unit)){//超时等待执行结果 Throwablecause=cause; if(cause==null){//没有发生异常,异步操作正常结束 returngetNow; } if(causeinstanceofCancellationException){//异步操作被取消了 throw(CancellationException)cause; } thrownewExecutionException(cause);//其他异常 } //时间到了异步操作还没有结束,抛出超时异常 thrownewTimeoutException; } @Override publicbooleanisSuccess{ returnresult==null?false:!(resultinstanceofCauseHolder); } @SuppressWarnings("unchecked") @Override publicVgetNow{ return(V)(result==SUCCESS_SIGNAL?null:result); } @Override publicThrowablecause{ if(result!=null&&resultinstanceofCauseHolder){ return((CauseHolder)result).cause; } returnnull; } @Override publicIFuture<V>addListener(IFutureListener<V>listener){ if(listener==null){ thrownewNullPointerException("listener"); } if(isDone){//若已完成直接通知该监听器 notifyListener(listener); returnthis; } synchronized(this){ if(!isDone){ listeners.add(listener); returnthis; } } notifyListener(listener); returnthis; } @Override publicIFuture<V>removeListener(IFutureListener<V>listener){ if(listener==null){ thrownewNullPointerException("listener"); } if(!isDone){ listeners.remove(listener); } returnthis; } @Override publicIFuture<V>awaitthrowsInterruptedException{ returnawait0(true); } privateIFuture<V>await0(booleaninterruptable)throwsInterruptedException{ if(!isDone){//若已完成就直接返回了 //若允许终端且被中断了则抛出中断异常 if(interruptable&&Thread.interrupted){ thrownewInterruptedException("thread"+Thread.currentThread.getName+"hasbeeninterrupted."); } booleaninterrupted=false; synchronized(this){ while(!isDone){ try{ wait;//释放锁进入waiting状态,等待其它线程调用本对象的notify/notifyAll方法 }catch(InterruptedExceptione){ if(interruptable){ throwe; }else{ interrupted=true; } } } } if(interrupted){ //为什么这里要设中断标志位?因为从wait方法返回后,中断标志是被clear了的, //这里重新设置以便让其它代码知道这里被中断了。 Thread.currentThread.interrupt; } } returnthis; } @Override publicbooleanawait(longtimeoutMillis)throwsInterruptedException{ returnawait0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis),true); } @Override publicbooleanawait(longtimeout,TimeUnitunit)throwsInterruptedException{ returnawait0(unit.toNanos(timeout),true); } privatebooleanawait0(longtimeoutNanos,booleaninterruptable)throwsInterruptedException{ if(isDone){ returntrue; } if(timeoutNanos<=0){ returnisDone; } if(interruptable&&Thread.interrupted){ thrownewInterruptedException(toString); } longstartTime=timeoutNanos<=0?0:System.nanoTime; longwaitTime=timeoutNanos; booleaninterrupted=false; try{ synchronized(this){ if(isDone){ returntrue; } if(waitTime<=0){ returnisDone; } for(;;){ try{ wait(waitTime/1000000,(int)(waitTime%1000000)); }catch(InterruptedExceptione){ if(interruptable){ throwe; }else{ interrupted=true; } } if(isDone){ returntrue; }else{ waitTime=timeoutNanos-(System.nanoTime-startTime); if(waitTime<=0){ returnisDone; } } } } }finally{ if(interrupted){ Thread.currentThread.interrupt; } } } @Override publicIFuture<V>awaitUninterruptibly{ try{ returnawait0(false); }catch(InterruptedExceptione){//这里若抛异常了就无法处理了 thrownewjava.lang.InternalError; } } @Override publicbooleanawaitUninterruptibly(longtimeoutMillis){ try{ returnawait0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis),false); }catch(InterruptedExceptione){ thrownewjava.lang.InternalError; } } @Override publicbooleanawaitUninterruptibly(longtimeout,TimeUnitunit){ try{ returnawait0(unit.toNanos(timeout),false); }catch(InterruptedExceptione){ thrownewjava.lang.InternalError; } } protectedIFuture<V>setFailure(Throwablecause){ if(setFailure0(cause)){ notifyListeners; returnthis; } thrownewIllegalStateException("completealready:"+this); } privatebooleansetFailure0(Throwablecause){ if(isDone){ returnfalse; } synchronized(this){ if(isDone){ returnfalse; } result=newCauseHolder(cause); notifyAll; } returntrue; } protectedIFuture<V>setSuccess(Objectresult){ if(setSuccess0(result)){//设置成功后通知监听器 notifyListeners; returnthis; } thrownewIllegalStateException("completealready:"+this); } privatebooleansetSuccess0(Objectresult){ if(isDone){ returnfalse; } synchronized(this){ if(isDone){ returnfalse; } if(result==null){//异步操作正常执行完毕的结果是null this.result=SUCCESS_SIGNAL; }else{ this.result=result; } notifyAll; } returntrue; } privatevoidnotifyListeners{ for(IFutureListener<V>l:listeners){ notifyListener(l); } } privatevoidnotifyListener(IFutureListener<V>l){ try{ l.operationCompleted(this); }catch(Exceptione){ e.printStackTrace; } } privatestaticclassSuccessSignal{ } privatestaticfinalclassCauseHolder{ finalThrowablecause; CauseHolder(Throwablecause){ this.cause=cause; } } }
那么要怎么使用这个呢,有了上面的骨架实现,我们就可以定制各种各样的异步结果了。下面模拟一下一个延时的任务:
packagefuture.test; importfuture.IFuture; importfuture.IFutureListener; /** *延时加法 *@authorlixiaohui * */ publicclassDelayAdder{ publicstaticvoidmain(String[]args){ newDelayAdder.add(3*1000,1,2).addListener(newIFutureListener<Integer>{ @Override publicvoidoperationCompleted(IFuture<Integer>future)throwsException{ System.out.println(future.getNow); } }); } /** *延迟加 *@paramdelay延时时长milliseconds *@parama加数 *@paramb加数 *@return异步结果 */ publicDelayAdditionFutureadd(longdelay,inta,intb){ DelayAdditionFuturefuture=newDelayAdditionFuture; newThread(newDelayAdditionTask(delay,a,b,future)).start; returnfuture; } privateclassDelayAdditionTaskimplementsRunnable{ privatelongdelay; privateinta,b; privateDelayAdditionFuturefuture; publicDelayAdditionTask(longdelay,inta,intb,DelayAdditionFuturefuture){ super; this.delay=delay; this.a=a; this.b=b; this.future=future; } @Override publicvoidrun{ try{ Thread.sleep(delay); Integeri=a+b; //TODO这里设置future为完成状态(正常执行完毕) future.setSuccess(i); }catch(InterruptedExceptione){ //TODO这里设置future为完成状态(异常执行完毕) future.setFailure(e.getCause); } } } }packagefuture.test; importfuture.AbstractFuture; importfuture.IFuture; //只是把两个方法对外暴露 publicclassDelayAdditionFutureextendsAbstractFuture<Integer>{ @Override publicIFuture<Integer>setSuccess(Objectresult){ returnsuper.setSuccess(result); } @Override publicIFuture<Integer>setFailure(Throwablecause){ returnsuper.setFailure(cause); } }
可以看到客户端不用主动去询问future是否完成,而是future完成时自动回调operationcompleted方法,客户端只需在回调里实现逻辑即可。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。