java 中ThreadPoolExecutor原理分析
java中ThreadPoolExecutor原理分析
线程池简介
Java线程池是开发中常用的工具,当我们有异步、并行的任务要处理时,经常会用到线程池,或者在实现一个服务器时,也需要使用线程池来接收连接处理请求。
线程池使用
JDK中提供的线程池实现位于java.util.concurrent.ThreadPoolExecutor。在使用时,通常使用ExecutorService接口,它提供了submit,invokeAll,shutdown等通用的方法。
在线程池配置方面,Executors类中提供了一些静态方法能够提供一些常用场景的线程池,如newFixedThreadPool,newCachedThreadPool,newSingleThreadExecutor等,这些方法最终都是调用到了ThreadPoolExecutor的构造函数。
ThreadPoolExecutor的包含所有参数的构造函数是
/**
*@paramcorePoolSizethenumberofthreadstokeepinthepool,even
*iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset
*@parammaximumPoolSizethemaximumnumberofthreadstoallowinthe
*pool
*@paramkeepAliveTimewhenthenumberofthreadsisgreaterthan
*thecore,thisisthemaximumtimethatexcessidlethreads
*willwaitfornewtasksbeforeterminating.
*@paramunitthetimeunitforthe{@codekeepAliveTime}argument
*@paramworkQueuethequeuetouseforholdingtasksbeforetheyare
*executed.Thisqueuewillholdonlythe{@codeRunnable}
*taskssubmittedbythe{@codeexecute}method.
*@paramthreadFactorythefactorytousewhentheexecutor
*createsanewthread
*@paramhandlerthehandlertousewhenexecutionisblocked
*becausethethreadboundsandqueuecapacitiesarereached
publicThreadPoolExecutor(intcorePoolSize,
intmaximumPoolSize,
longkeepAliveTime,
TimeUnitunit,
BlockingQueueworkQueue,
ThreadFactorythreadFactory,
RejectedExecutionHandlerhandler){
if(corePoolSize<0||
maximumPoolSize<=0||
maximumPoolSize
- corePoolSize设置线程池的核心线程数,当添加新任务时,如果线程池中的线程数小于corePoolSize,则不管当前是否有线程闲置,都会创建一个新的线程来执行任务。
- maximunPoolSize是线程池中允许的最大的线程数
- workQueue用于存放排队的任务
- keepAliveTime是大于corePoolSize的线程闲置的超时时间
- handler用于在任务逸出、线程池关闭时的任务处理,线程池的线程增长策略为,当前线程数小于corePoolSize时,新增线程,当线程数=corePoolSize且corePoolSize时,只有在workQueue不能存放新的任务时创建新线程,超出的线程在闲置keepAliveTime后销毁。
实现(基于JDK1.8)
ThreadPoolExecutor中保存的状态有
当前线程池状态,包括RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
当前有效的运行线程的数量。
将这两个状态放到一个int变量中,前三位作为线程池状态,后29位作为线程数量。
例如0b11100000000000000000000000000001,表示RUNNING,一个线程。
通过HashSet来存储工作者集合,访问该HashSet前必须先获取保护状态的mainLock:ReentrantLock
submit、execute
execute的执行方式为,首先检查当前worker数量,如果小于corePoolSize,则尝试add一个coreWorker。线程池在维护线程数量以及状态检查上做了大量检测。
publicvoidexecute(Runnablecommand){
intc=ctl.get();
//如果当期数量小于corePoolSize
if(workerCountOf(c)
addWorker方法实现
privatebooleanaddWorker(RunnablefirstTask,booleancore){
retry:
for(;;){
intc=ctl.get();
intrs=runStateOf(c);
//Checkifqueueemptyonlyifnecessary.
if(rs>=SHUTDOWN&&
!(rs==SHUTDOWN&&
firstTask==null&&
!workQueue.isEmpty()))
returnfalse;
for(;;){
intwc=workerCountOf(c);
if(wc>=CAPACITY||
wc>=(core?corePoolSize:maximumPoolSize))
returnfalse;
if(compareAndIncrementWorkerCount(c))
breakretry;
c=ctl.get();//Re-readctl
if(runStateOf(c)!=rs)
continueretry;
//elseCASfailedduetoworkerCountchange;retryinnerloop
}
}
booleanworkerStarted=false;
booleanworkerAdded=false;
Workerw=null;
try{
w=newWorker(firstTask);
finalThreadt=w.thread;
if(t!=null){
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
//Recheckwhileholdinglock.
//BackoutonThreadFactoryfailureorif
//shutdownbeforelockacquired.
intrs=runStateOf(ctl.get());
if(rslargestPoolSize)
largestPoolSize=s;
workerAdded=true;
}
}finally{
mainLock.unlock();
}
if(workerAdded){
//如果添加成功,则启动该线程,执行Worker的run方法,Worker的run方法执行外部的runWorker(Worker)
t.start();
workerStarted=true;
}
}
}finally{
if(!workerStarted)
addWorkerFailed(w);
}
returnworkerStarted;
}
Worker类继承了AbstractQueuedSynchronizer获得了同步等待这样的功能。
privatefinalclassWorker
extendsAbstractQueuedSynchronizer
implementsRunnable
{
/**
*Thisclasswillneverbeserialized,butweprovidea
*serialVersionUIDtosuppressajavacwarning.
*/
privatestaticfinallongserialVersionUID=6138294804551838833L;
/**Threadthisworkerisrunningin.Nulliffactoryfails.*/
finalThreadthread;
/**Initialtasktorun.Possiblynull.*/
RunnablefirstTask;
/**Per-threadtaskcounter*/
volatilelongcompletedTasks;
/**
*CreateswithgivenfirsttaskandthreadfromThreadFactory.
*@paramfirstTaskthefirsttask(nullifnone)
*/
Worker(RunnablefirstTask){
setState(-1);//inhibitinterruptsuntilrunWorker
this.firstTask=firstTask;
this.thread=getThreadFactory().newThread(this);
}
/**DelegatesmainrunlooptoouterrunWorker*/
publicvoidrun(){
runWorker(this);
}
//Lockmethods
//
//Thevalue0representstheunlockedstate.
//Thevalue1representsthelockedstate.
protectedbooleanisHeldExclusively(){
returngetState()!=0;
}
protectedbooleantryAcquire(intunused){
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
returntrue;
}
returnfalse;
}
protectedbooleantryRelease(intunused){
setExclusiveOwnerThread(null);
setState(0);
returntrue;
}
publicvoidlock(){acquire(1);}
publicbooleantryLock(){returntryAcquire(1);}
publicvoidunlock(){release(1);}
publicbooleanisLocked(){returnisHeldExclusively();}
voidinterruptIfStarted(){
Threadt;
if(getState()>=0&&(t=thread)!=null&&!t.isInterrupted()){
try{
t.interrupt();
}catch(SecurityExceptionignore){
}
}
}
runWorker(Worker)是Worker的轮询执行逻辑,不断地从工作队列中获取任务并执行它们。Worker每次执行任务前需要进行lock,防止在执行任务时被interrupt。
finalvoidrunWorker(Workerw){
Threadwt=Thread.currentThread();
Runnabletask=w.firstTask;
w.firstTask=null;
w.unlock();//allowinterrupts
booleancompletedAbruptly=true;
try{
while(task!=null||(task=getTask())!=null){
w.lock();
//Ifpoolisstopping,ensurethreadisinterrupted;
//ifnot,ensurethreadisnotinterrupted.This
//requiresarecheckinsecondcasetodealwith
//shutdownNowracewhileclearinginterrupt
if((runStateAtLeast(ctl.get(),STOP)||
(Thread.interrupted()&&
runStateAtLeast(ctl.get(),STOP)))&&
!wt.isInterrupted())
wt.interrupt();
try{
beforeExecute(wt,task);
Throwablethrown=null;
try{
task.run();
}catch(RuntimeExceptionx){
thrown=x;throwx;
}catch(Errorx){
thrown=x;throwx;
}catch(Throwablex){
thrown=x;thrownewError(x);
}finally{
afterExecute(task,thrown);
}
}finally{
task=null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly=false;
}finally{
processWorkerExit(w,completedAbruptly);
}
}
ThreadPoolExecutor的submit方法中将Callable包装成FutureTask后交给execute方法。
FutureTask
FutureTask继承于Runnable和Future,FutureTask定义的几个状态为
NEW,尚未执行
COMPLETING,正在执行
NORMAL,正常执行完成得到结果
EXCEPTIONAL,执行抛出异常
CANCELLED,执行被取消
INTERRUPTING,执行正在被中断
INTERRUPTED,已经中断。
其中关键的get方法
publicVget()throwsInterruptedException,ExecutionException{
ints=state;
if(s<=COMPLETING)
s=awaitDone(false,0L);
returnreport(s);
}
先获取当前状态,如果还未执行完成并且正常,则进入等待结果流程。在awaitDone不断循环获取当前状态,如果没有结果,则将自己通过CAS的方式添加到等待链表的头部,如果设置了超时,则LockSupport.parkNanos到指定的时间。
staticfinalclassWaitNode{
volatileThreadthread;
volatileWaitNodenext;
WaitNode(){thread=Thread.currentThread();}
}
privateintawaitDone(booleantimed,longnanos)
throwsInterruptedException{
finallongdeadline=timed?System.nanoTime()+nanos:0L;
WaitNodeq=null;
booleanqueued=false;
for(;;){
if(Thread.interrupted()){
removeWaiter(q);
thrownewInterruptedException();
}
ints=state;
if(s>COMPLETING){
if(q!=null)
q.thread=null;
returns;
}
elseif(s==COMPLETING)//cannottimeoutyet
Thread.yield();
elseif(q==null)
q=newWaitNode();
elseif(!queued)
queued=UNSAFE.compareAndSwapObject(this,waitersOffset,
q.next=waiters,q);
elseif(timed){
nanos=deadline-System.nanoTime();
if(nanos<=0L){
removeWaiter(q);
returnstate;
}
LockSupport.parkNanos(this,nanos);
}
else
LockSupport.park(this);
}
}
FutureTask的run方法是执行任务并设置结果的位置,首先判断当前状态是否为NEW并且将当前线程设置为执行线程,然后调用Callable的call获取结果后设置结果修改FutureTask状态。
publicvoidrun(){
if(state!=NEW||
!UNSAFE.compareAndSwapObject(this,runnerOffset,
null,Thread.currentThread()))
return;
try{
Callablec=callable;
if(c!=null&&state==NEW){
Vresult;
booleanran;
try{
result=c.call();
ran=true;
}catch(Throwableex){
result=null;
ran=false;
setException(ex);
}
if(ran)
set(result);
}
}finally{
//runnermustbenon-nulluntilstateissettledto
//preventconcurrentcallstorun()
runner=null;
//statemustbere-readafternullingrunnertoprevent
//leakedinterrupts
ints=state;
if(s>=INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!