Java并发编程Semaphore计数信号量详解
Semaphore是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量)。
简单示例:
packageme.socketthread;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.Semaphore;
publicclassSemaphoreLearn{
//信号量总数
privatestaticfinalintSEM_MAX=12;
publicstaticvoidmain(String[]args){
Semaphoresem=newSemaphore(SEM_MAX);
//创建线程池
ExecutorServicethreadPool=Executors.newFixedThreadPool(3);
//在线程池中执行任务
threadPool.execute(newMyThread(sem,7));
threadPool.execute(newMyThread(sem,4));
threadPool.execute(newMyThread(sem,2));
//关闭池
threadPool.shutdown();
}
}
classMyThreadextendsThread{
privatevolatileSemaphoresem;//信号量
privateintcount;//申请信号量的大小
MyThread(Semaphoresem,intcount){
this.sem=sem;
this.count=count;
}
publicvoidrun(){
try{
//从信号量中获取count个许可
sem.acquire(count);
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"acquirecount="+count);
}catch(InterruptedExceptione){
e.printStackTrace();
}finally{
//释放给定数目的许可,将其返回到信号量。
sem.release(count);
System.out.println(Thread.currentThread().getName()+"release"+count+"");
}
}
}
执行结果:
pool-1-thread-2acquirecount=4 pool-1-thread-1acquirecount=7 pool-1-thread-1release7 pool-1-thread-2release4 pool-1-thread-3acquirecount=2 pool-1-thread-3release2
线程1和线程2会并发执行,因为两者的信号量和没有超过总信号量,当前两个线程释放掉信号量之后线程3才能继续执行。
源码分析:
1、构造函数
在构造函数中会初始化信号量值,这值最终是作为锁标志位state的值
Semaphoresem=newSemaphore(12);//简单来说就是给锁标识位state赋值为12
2、Semaphore.acquire(n);简单理解为获取锁资源,如果获取不到线程阻塞
Semaphore.acquire(n);//从锁标识位state中获取n个信号量,简单来说是state=state-n此时state大于0表示可以获取信号量,如果小于0则将线程阻塞
publicvoidacquire(intpermits)throwsInterruptedException{
if(permits<0)thrownewIllegalArgumentException();
//获取锁
sync.acquireSharedInterruptibly(permits);
}
acquireSharedInterruptibly中的操作是获取锁资源,如果可以获取则将state=state-permits,否则将线程阻塞
publicfinalvoidacquireSharedInterruptibly(intarg)
throwsInterruptedException{
if(Thread.interrupted())
thrownewInterruptedException();
if(tryAcquireShared(arg)<0)//tryAcquireShared中尝试获取锁资源
doAcquireSharedInterruptibly(arg);//将线程阻塞
}
tryAcquireShared中的操作是尝试获取信号量值,简单来说就是state=state-acquires,如果此时小于0则返回负值,否则返回大于新值,再判断是否将当线程线程阻塞
protectedinttryAcquireShared(intacquires){
for(;;){
if(hasQueuedPredecessors())
return-1;
//获取state值
intavailable=getState();
//从state中获取信号量
intremaining=available-acquires;
if(remaining<0||
compareAndSetState(available,remaining))
//如果信号量小于0则直接返回,表示无法获取信号量,否则将state值修改为新值
returnremaining;
}
}
doAcquireSharedInterruptibly中的操作简单来说是将当前线程添加到FIFO队列中并将当前线程阻塞。
/会将线程添加到FIFO队列中,并阻塞
privatevoiddoAcquireSharedInterruptibly(intarg)
throwsInterruptedException{
//将线程添加到FIFO队列中
finalNodenode=addWaiter(Node.SHARED);
booleanfailed=true;
try{
for(;;){
finalNodep=node.predecessor();
if(p==head){
intr=tryAcquireShared(arg);
if(r>=0){
setHeadAndPropagate(node,r);
p.next=null;//helpGC
failed=false;
return;
}
}
//parkAndCheckInterrupt完成线程的阻塞操作
if(shouldParkAfterFailedAcquire(p,node)&&
parkAndCheckInterrupt())
thrownewInterruptedException();
}
}finally{
if(failed)
cancelAcquire(node);
}
}
3、Semaphore.release(intpermits),这个函数的实现操作是将state=state+permits并唤起处于FIFO队列中的阻塞线程。
publicvoidrelease(intpermits){
if(permits<0)thrownewIllegalArgumentException();
//state=state+permits,并将FIFO队列中的阻塞线程唤起
sync.releaseShared(permits);
}
releaseShared中的操作是将state=state+permits,并将FIFO队列中的阻塞线程唤起。
publicfinalbooleanreleaseShared(intarg){
//tryReleaseShared将state设置为state=state+arg
if(tryReleaseShared(arg)){
//唤起FIFO队列中的阻塞线程
doReleaseShared();
returntrue;
}
returnfalse;
}
tryReleaseShared将state设置为state=state+arg
protectedfinalbooleantryReleaseShared(intreleases){
for(;;){
intcurrent=getState();
intnext=current+releases;
if(next
doReleaseShared()唤起FIFO队列中的阻塞线程
privatevoiddoReleaseShared(){
for(;;){
Nodeh=head;
if(h!=null&&h!=tail){
intws=h.waitStatus;
if(ws==Node.SIGNAL){
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;//looptorecheckcases
//完成阻塞线程的唤起操作
unparkSuccessor(h);
}
elseif(ws==0&&
!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue;//looponfailedCAS
}
if(h==head)//loopifheadchanged
break;
}
}
总结:Semaphore简单来说设置了一个信号量池state,当线程执行时会从state中获取值,如果可以获取则线程执行,并且在执行后将获取的资源返回到信号量池中,并唤起其他阻塞线程;如果信号量池中的资源无法满足某个线程的需求则将此线程阻塞。
Semaphore源码:
publicclassSemaphoreimplementsjava.io.Serializable{
privatestaticfinallongserialVersionUID=-3222578661600680210L;
privatefinalSyncsync;
abstractstaticclassSyncextendsAbstractQueuedSynchronizer{
privatestaticfinallongserialVersionUID=1192457210091910933L;
//设置锁标识位state的初始值
Sync(intpermits){
setState(permits);
}
//获取锁标识位state的值,如果state值大于其需要的值则表示锁可以获取
finalintgetPermits(){
returngetState();
}
//获取state值减去acquires后的值,如果大于等于0则表示锁可以获取
finalintnonfairTryAcquireShared(intacquires){
for(;;){
intavailable=getState();
intremaining=available-acquires;
if(remaining<0||
compareAndSetState(available,remaining))
returnremaining;
}
}
//释放锁
protectedfinalbooleantryReleaseShared(intreleases){
for(;;){
intcurrent=getState();
//将state值加上release值
intnext=current+releases;
if(nextcurrent)//underflow
thrownewError("Permitcountunderflow");
if(compareAndSetState(current,next))
return;
}
}
finalintdrainPermits(){
for(;;){
intcurrent=getState();
if(current==0||compareAndSetState(current,0))
returncurrent;
}
}
}
//非公平锁
staticfinalclassNonfairSyncextendsSync{
privatestaticfinallongserialVersionUID=-2694183684443567898L;
NonfairSync(intpermits){
super(permits);
}
protectedinttryAcquireShared(intacquires){
returnnonfairTryAcquireShared(acquires);
}
}
//公平锁
staticfinalclassFairSyncextendsSync{
privatestaticfinallongserialVersionUID=2014338818796000944L;
FairSync(intpermits){
super(permits);
}
protectedinttryAcquireShared(intacquires){
for(;;){
if(hasQueuedPredecessors())
return-1;
intavailable=getState();
intremaining=available-acquires;
if(remaining<0||
compareAndSetState(available,remaining))
returnremaining;
}
}
}
//设置信号量
publicSemaphore(intpermits){
sync=newNonfairSync(permits);
}
publicSemaphore(intpermits,booleanfair){
sync=fair?newFairSync(permits):newNonfairSync(permits);
}
//获取锁
publicvoidacquire()throwsInterruptedException{
sync.acquireSharedInterruptibly(1);
}
publicvoidacquireUninterruptibly(){
sync.acquireShared(1);
}
publicbooleantryAcquire(){
returnsync.nonfairTryAcquireShared(1)>=0;
}
publicbooleantryAcquire(longtimeout,TimeUnitunit)
throwsInterruptedException{
returnsync.tryAcquireSharedNanos(1,unit.toNanos(timeout));
}
publicvoidrelease(){
sync.releaseShared(1);
}
//获取permits值锁
publicvoidacquire(intpermits)throwsInterruptedException{
if(permits<0)thrownewIllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
publicvoidacquireUninterruptibly(intpermits){
if(permits<0)thrownewIllegalArgumentException();
sync.acquireShared(permits);
}
publicbooleantryAcquire(intpermits){
if(permits<0)thrownewIllegalArgumentException();
returnsync.nonfairTryAcquireShared(permits)>=0;
}
publicbooleantryAcquire(intpermits,longtimeout,TimeUnitunit)
throwsInterruptedException{
if(permits<0)thrownewIllegalArgumentException();
returnsync.tryAcquireSharedNanos(permits,unit.toNanos(timeout));
}
//释放
publicvoidrelease(intpermits){
if(permits<0)thrownewIllegalArgumentException();
sync.releaseShared(permits);
}
publicintavailablePermits(){
returnsync.getPermits();
}
publicintdrainPermits(){
returnsync.drainPermits();
}
protectedvoidreducePermits(intreduction){
if(reduction<0)thrownewIllegalArgumentException();
sync.reducePermits(reduction);
}
publicbooleanisFair(){
returnsyncinstanceofFairSync;
}
publicfinalbooleanhasQueuedThreads(){
returnsync.hasQueuedThreads();
}
publicfinalintgetQueueLength(){
returnsync.getQueueLength();
}
protectedCollectiongetQueuedThreads(){
returnsync.getQueuedThreads();
}
publicStringtoString(){
returnsuper.toString()+"[Permits="+sync.getPermits()+"]";
}
}
总结
以上就是本文关于Java并发编程Semaphore计数信号量详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Java并发编程之重入锁与读写锁、Java系统的高并发解决方法详解、java高并发锁的3种实现示例代码等,有什么问题,可以留言交流讨论。感谢朋友们对本站的支持!