Java中Future、FutureTask原理以及与线程池的搭配使用
Java中的Future和Future通常和线程池搭配使用,用来获取线程池返回执行后的返回值。我们假设通过Executors工厂方法构建一个线程池es,es要执行某个任务有两种方式,一种是执行es.execute(runnable),这种情况是没有返回值的;另外一种情况是执行es.submit(runnale)或者es.submit(callable),这种情况会返回一个Future的对象,然后调用Future的get()来获取返回值。
Future
publicinterfaceFuture{ booleancancel(booleanmayInterruptIfRunning); booleanisCancelled(); booleanisDone(); Vget()throwsInterruptedException,ExecutionException; Vget(longtimeout,TimeUnitunit) throwsInterruptedException,ExecutionException,TimeoutException; }
Future是一个接口,他提供给了我们方法来检测当前的任务是否已经结束,还可以等待任务结束并且拿到一个结果,通过调用Future的get()方法可以当任务结束后返回一个结果值,如果工作没有结束,则会阻塞当前线程,直到任务执行完毕,我们可以通过调用cancel()方法来停止一个任务,如果任务已经停止,则cancel()方法会返回true;如果任务已经完成或者已经停止了或者这个任务无法停止,则cancel()会返回一个false。当一个任务被成功停止后,他无法再次执行。isDone()和isCancel()方法可以判断当前工作是否完成和是否取消。
线程池中有以下几个方法:
publicFuture>submit(Runnabletask){ if(task==null)thrownewNullPointerException(); RunnableFutureftask=newTaskFor(task,null); execute(ftask); returnftask; }
publicFuture submit(Runnabletask,Tresult){ if(task==null)thrownewNullPointerException(); RunnableFuture ftask=newTaskFor(task,result); execute(ftask); returnftask; }
publicFuture submit(Callable task){ if(task==null)thrownewNullPointerException(); RunnableFuture ftask=newTaskFor(task); execute(ftask); returnftask; }
都返回一个Future对象,仔细查看发现,所有的方法最终都将runnable或者callable转变成一个RunnableFuture的对象,这个RunnableFutre的对象是一个同时继承了Runnable和Future的接口
publicinterfaceRunnableFutureextendsRunnable,Future { voidrun(); }
然后调用executor(runnable)方法,关于executor(runnable)的具体实现我们后面再讲。最后返回一个RunnableFuture对象。RunnableFuture这个接口直有一个具体的实现类,那就时我们接下来要讲的FutureTask。
FutureTask
publicclassFutureTaskimplementsRunnableFuture
FutureTask实现了RunnableFuture的接口,既然我们知道最终返回的是一个FutureTask对象ftask,而且我们可以通过ftask.get()可以的来得到execute(task)的返回值,这个过程具体事怎么实现的了??这个也是本篇文章的所要讲的。
我们可以先来猜测一下它的实现过程,首先Runnable的run()是没有返回值的,所以当es.submit()的参数只有一个Runnable对象的时候,通过ftask.get()得到的也是一个null值,当参数还有一个result的时候,就返回这个result;如果参数是一个Callable的对象的时候,Callable的call()是有返回值的,同时这个call()方法会在转换的Runable对象ftask的run()方法中被调用,然后将它的返回值赋值给一个全局变量,最后在ftask的get()方法中得到这个值。猜想对不对了?我们直接看源码。
将Runnable对象转为RunnableFuture的方法:
protectedRunnableFuture newTaskFor(Runnablerunnable,Tvalue){ returnnewFutureTask (runnable,value); }
publicFutureTask(Runnablerunnable,Vresult){ this.callable=Executors.callable(runnable,result); this.state=NEW;//ensurevisibilityofcallable }
Executors::callable()
publicstaticCallable callable(Runnabletask,Tresult){ if(task==null) thrownewNullPointerException(); returnnewRunnableAdapter (task,result); }
Executors的内部类RunnableAdapter
staticfinalclassRunnableAdapterimplementsCallable { finalRunnabletask; finalTresult; RunnableAdapter(Runnabletask,Tresult){ this.task=task; this.result=result; } publicTcall(){ task.run(); returnresult; } }
将Callable对象转为RunnableFuture的方法:
protectedRunnableFuture newTaskFor(Callable callable){ returnnewFutureTask (callable); }
publicFutureTask(Callablecallable){ if(callable==null) thrownewNullPointerException(); this.callable=callable; this.state=NEW;//ensurevisibilityofcallable }
接下来我们来看execute(runnbale)的执行过程:
execute(runnable)最终的实现是在ThreadPoolExecutor,基本上所有的线程池都是通过ThreadPoolExecutor的构造方法传入不同的参数来构造的。
ThreadPoolExecutor::executor(runnable):
publicvoidexecute(Runnablecommand){ if(command==null) thrownewNullPointerException(); intc=ctl.get(); if(workerCountOf(c)如上所示,这个过程分为三步:
Step1:
如果当前线程池的的线程小于核心线程的数量的时候,就会调用addWorker检查运行状态和正在运行的线程数量,通过返回false来防止错误地添加线程,然后执行当前任务。
Step2:
否则当前线程池的的线程大于核心线程的数量的时候,我们仍然需要先判断是否需要添加一个新的线程来执行这个任务,因为可能已经存在的线程此刻任务执行完毕处于空闲状态,这个时候可以直接复用。否则创建一个新的线程来执行此任务。
Step3:
如果不能再添加新的任务,就拒绝。
执行execute(runnable)最终会回调runnable的run()方法,也就是FutureTask的对象ftask的run()方法,源码如下:
publicvoidrun(){ if(state!=NEW|| !UNSAFE.compareAndSwapObject(this,runnerOffset, null,Thread.currentThread())) return; try{ Callablec=callable; if(c!=null&&state==NEW){ Vresult; booleanran; try{ result=c.call(); ran=true; }catch(Throwableex){ result=null; ran=false; setException(ex); } if(ran) set(result); } }finally{ //runnermustbenon-nulluntilstateissettledto //preventconcurrentcallstorun() runner=null; //statemustbere-readafternullingrunnertoprevent //leakedinterrupts ints=state; if(s>=INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 通过执行result=c.call()拿到返回值,然后set(result),因此get()方法获得的值正是这个result。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。