JUC之Semaphore源码分析
Semaphore是一个用于限制并发执行代码数量的工具类,其内部通过permit(许可)来定义允许同时执行的线程数量。
/**
 * Creates a semaphore backed by the non-fair synchronizer.
 *
 * @param permits the permit count handed to {@code NonfairSync}
 */
public KSemaphore(int permits) {
    this.sync = new NonfairSync(permits);
}
/**
 * Creates a semaphore with the requested fairness policy.
 *
 * @param permits the permit count handed to the chosen synchronizer
 * @param fair    {@code true} to use {@code FairSync}, {@code false} to use
 *                {@code NonfairSync}
 */
public KSemaphore(int permits, boolean fair) {
    if (fair) {
        sync = new FairSync(permits);
    } else {
        sync = new NonfairSync(permits);
    }
}
/**AQS的子类主要定义获取释放lock*/
abstractstaticclassSyncextendsKAbstractQueuedSynchronizer{
privatestaticfinallongserialVersionUID=1192457210091910933L;
/**
*指定permit初始化Semaphore
*/
Sync(intpermits){
setState(permits);
}
/**
*返回剩余permit
*/
finalintgetPermits(){
returngetState();
}
/**
*获取permit
*/
finalintnonfairTryAcquireShared(intacquires){
for(;;){
intavailable=getState();
intremaining=available-acquires;//判断获取acquires的剩余permit数目
if(remaining<0||
compareAndSetState(available,remaining)){//cas改变state
returnremaining;
}
}
}
/**
*释放lock
*/
protectedfinalbooleantryReleaseShared(intreleases){
for(;;){
intcurrent=getState();
intnext=current+releases;
if(nextcurrent){//underflow
thrownewError("Permitcountunderflow");
}
if(compareAndSetState(current,next)){
return;
}
}
}
/**将permit置为0*/
finalintdrainPermits(){
for(;;){
intcurrent=getState();
if(current==0||compareAndSetState(current,0)){
returncurrent;
}
}
}
}
/**
*调用acquireSharedInterruptibly响应中断的方式获取permit
*/
publicvoidacquire()throwsInterruptedException{
sync.acquireSharedInterruptibly(1);
}
/**
*调用acquireUninterruptibly非响应中断的方式获取permit
*/
publicvoidacquireUninterruptibly(){
sync.acquireShared(1);
}
/**
*尝试获取permit
*/
publicbooleantryAcquire(){
returnsync.nonfairTryAcquireShared(1)>=0;
}
/**
*尝试的获取permit,支持超时与中断
*/
publicbooleantryAcquire(longtimeout,TimeUnitunit)throwsInterruptedException{
returnsync.tryAcquireSharedNanos(1,unit.toNanos(timeout));
}
/**
*支持中断的获取permit
*/
publicvoidacquire(intpermits)throwsInterruptedException{
if(permits<0){
thrownewIllegalArgumentException();
}
sync.acquireSharedInterruptibly(permits);
}
/**
*不响应中断的获取permit
*/
publicvoidacquireUninterruptibly(intpermits){
if(permits<0)thrownewIllegalArgumentException();
sync.acquireShared(permits);
}
/**
*尝试获取permit
*/
publicbooleantryAcquire(intpermits){
if(permits<0)thrownewIllegalArgumentException();
returnsync.nonfairTryAcquireShared(permits)>=0;
}
/**
*尝试支持超时机制,支持中断的获取permit
*/
publicbooleantryAcquire(intpermits,longtimout,TimeUnitunit)throwsInterruptedException{
if(permits<0)thrownewIllegalArgumentException();
returnsync.tryAcquireSharedNanos(permits,unit.toNanos(timout));
}
/**
*释放permit
*/
publicvoidrelease(){
sync.releaseShared(1);
}
/**
*释放permit
*/
publicvoidrelease(intpermits){
if(permits<0)thrownewIllegalArgumentException();
sync.releaseShared(permits);
}
/**
*返回可用的permit
*/
publicintavailablePermits(){
returnsync.getPermits();
}
/**
*消耗光permit
*/
publicintdrainPermits(){
returnsync.drainPermits();
}
/**
*减少reduction个permit
*/
protectedvoidreducePermits(intreduction){
if(reduction<0)thrownewIllegalArgumentException();
sync.reducePermits(reduction);
}
/**
*判断是否是公平版本
*/
publicbooleanisFair(){
returnsyncinstanceofFairSync;
}
/**
*返回AQS中SyncQueue里面的等待线程
*/
publicfinalbooleanhasQueuedThreads(){
returnsync.hasQueuedThreads();
}
/**
*返回AQS中SyncQueue里面的等待线程长度
*/
publicfinalintgetQueueLength(){
returnsync.getQueueLength();
}
/**
*返回AQS中SyncQueue里面的等待线程
*/
protectedCollectiongetQueueThreads(){
returnsync.getQueuedThreads();
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。