浅谈Java 并发的底层实现
并发编程的目的是让程序运行更快,但是使用并发并不定会使得程序运行更快,只有当程序的并发数量达到一定的量级的时候才能体现并发编程的优势。所以谈并发编程在高并发量的时候才有意义。虽然目前还没有开发过高并发量的程序,但是学习并发是为了更好理解一些分布式架构。那么当程序的并发量不高,比如是单线程的程序,单线程的执行效率反而比多线程更高。这又是为什么呢?熟悉操作系统的应该知道,CPU是通过给每个线程分配时间片的方式实现多线程的。这样,当CPU从一个任务切换到另一个任务的时候,会保存上一个任务的状态,当执行完这个任务的时候CPU就会继续上一个任务的状态继续执行。这个过程称为上下文切换。
在Java多线程中,volatile关键字个synchronized关键字扮演了重要的角色,它们都可以实现线程的同步,但是在底层是如何实现的呢?
volatile
volatile只能保证变量对各个线程的可见性,但不能保证原子性。关于Java语言volatile的使用方法就不多说了,我的建议是除了配合packagejava.util.concurrent.atomic中的类库,其他情况一概别用。更多的解释参见这篇文章。
引子
参见如下代码
packageorg.go; publicclassGo{ volatileinti=0; privatevoidinc(){ i++; } publicstaticvoidmain(String[]args){ Gogo=newGo(); for(inti=0;i<10;i++){ newThread(()->{ for(intj=0;j<1000;j++) go.inc(); }).start(); } while(Thread.activeCount()>1){ Thread.yield(); } System.out.println(go.i); } }
每次执行上述代码结果都不同,输出的数字总是小于10000.这是因为在进行inc()的时候,i++并不是原子操作。或许有些人会提议说用synchronized来同步inc(),或者用packagejava.util.concurrent.locks下的锁去控制线程同步。但它们都不如下面的解决方案:
packageorg.go; importjava.util.concurrent.atomic.AtomicInteger; publicclassGo{ AtomicIntegeri=newAtomicInteger(0); privatevoidinc(){ i.getAndIncrement(); } publicstaticvoidmain(String[]args){ Gogo=newGo(); for(inti=0;i<10;i++){ newThread(()->{ for(intj=0;j<1000;j++) go.inc(); }).start(); } while(Thread.activeCount()>1){ Thread.yield(); } System.out.println(go.i); } }
这时,如果你不了解atomic的实现,你一定会不屑的怀疑说不定AtomicInteger底层就是使用锁来实现的所以也未必高效。那么究竟是什么,我们来看看。
原子类的内部实现
无论是AtomicInteger或者是ConcurrentLinkedQueue的节点类ConcurrentLinkedQueue.Node,他们都有个静态变量
privatestaticfinalsun.misc.UnsafeUNSAFE;,这个类是实现原子语义的C++对象sun::misc::Unsafe的Java封装。想看看底层实现,正好我手边有gcc4.8的源代码,对照本地路径,很方便找到Github的路径,看这里。
以接口getAndIncrement()的实现举例
AtomicInteger.java
privatestaticfinalUnsafeunsafe=Unsafe.getUnsafe(); publicfinalintgetAndIncrement(){ for(;;){ intcurrent=get(); intnext=current+1; if(compareAndSet(current,next)) returncurrent; } } publicfinalbooleancompareAndSet(intexpect,intupdate){ returnunsafe.compareAndSwapInt(this,valueOffset,expect,update); }
留意这个for循环,只有在compareAndSet成功时才会返回。否则就一直compareAndSet。
调用了compareAndSet实现。此处,我注意到OracleJDK的实现是略有不同的,如果你查看JDK下的src,你可以看到OracleJDK是调用的Unsafe的getAndIncrement(),但我相信OracleJDK实现Unsafe.java的时候应该也是只调用compareAndSet,因为一个compareAndSet就可以实现增加、减少、设值的原子操作了。
Unsafe.java
publicnativebooleancompareAndSwapInt(Objectobj,longoffset, intexpect,intupdate);
通过JNI调用的C++的实现。
natUnsafe.cc
jboolean sun::misc::Unsafe::compareAndSwapInt(jobjectobj,jlongoffset, jintexpect,jintupdate) { jint*addr=(jint*)((char*)obj+offset); returncompareAndSwap(addr,expect,update); } staticinlinebool compareAndSwap(volatilejint*addr,jintold,jintnew_val) { jbooleanresult=false; spinlocklock; if((result=(*addr==old))) *addr=new_val; returnresult; }
Unsafe::compareAndSwapInt调用static函数compareAndSwap。而compareAndSwap又使用spinlock作为锁。这里的spinlock有LockGuard的意味,构造时加锁,析构时释放。
我们需要聚焦在spinlock里。这里是保证spinlock释放之前都是原子操作的真正实现。
什么是spinlock
spinlock,即自旋锁,一种循环等待(busywaiting)以获取资源的锁。不同于mutex的阻塞当前线程、释放CPU资源以等待需求的资源,spinlock不会进入挂起、等待条件满足、重新竞争CPU的过程。这意味着只有在等待锁的代价小于线程执行上下文切换的代价时,Spinlock才优于mutex。
natUnsafe.cc
classspinlock { staticvolatileobj_addr_tlock; public: spinlock() { while(!compare_and_swap(&lock,0,1)) _Jv_ThreadYield(); } ~spinlock() { release_set(&lock,0); } };
以一个静态变量staticvolatileobj_addr_tlock;作为标志位,通过C++RAII实现一个Guard,所以所谓的锁其实是静态成员变量obj_addr_tlock,C++中volatile并不能保证同步,保证同步的是构造函数里调用的compare_and_swap和一个static变量lock.这个lock变量是1的时候,就需要等;是0的时候,就通过原子操作把它置为1,表示自己获得了锁。
这里会用一个static变量实在是一个意外,如此相当于所有的无锁结构都共用同一个变量(实际就是size_t)来区分是否加锁。当这个变量置为1时,其他用到spinlock的都需要等。为什么不在sun::misc::Unsafe添加一个私有变量volatileobj_addr_tlock;,并作为构造参数传给spinlock?这样相当于每个UnSafe共享一个标志位,效果会不会好一些?
_Jv_ThreadYield在下面的文件里,通过系统调用sched_yield(man2sched_yield)让出CPU资源。宏HAVE_SCHED_YIELD在configure里定义,意味着编译时如果取消定义,spinlock就称为真正意义的自旋锁了。
posix-threads.h
inlinevoid _Jv_ThreadYield(void) { #ifdefHAVE_SCHED_YIELD sched_yield(); #endif/*HAVE_SCHED_YIELD*/ }
这个lock.h在不同平台有着不同的实现,我们以ia64(IntelAMDx64)平台举例,其他的实现可以在这里看到。
ia64/locks.h
typedefsize_tobj_addr_t; inlinestaticbool compare_and_swap(volatileobj_addr_t*addr, obj_addr_told, obj_addr_tnew_val) { return__sync_bool_compare_and_swap(addr,old,new_val); } inlinestaticvoid release_set(volatileobj_addr_t*addr,obj_addr_tnew_val) { __asm____volatile__("":::"memory"); *(addr)=new_val; }
__sync_bool_compare_and_swap是gcc内建函数,汇编指令"memory"完成内存屏障。
- 一般地,如果CPU硬件支持指令cmpxchg(该指令从硬件保障原子性,毫无疑问十分高效),那么__sync_bool_compare_and_swap就应该是用cmpxchg来实现的。
- 不支持cmpxchg的CPU架构可以用lock指令前缀,通过锁CPU总线的方式实现。
- 如果连lock指令都不支持,有可能通过APIC实现
总之,硬件上保证多核CPU同步,而Unsafe的实现也是尽可能的高效。GCC-java的还算高效,相信Oracle和OpenJDK不会更差。
原子操作和GCC内建的原子操作
原子操作
Java的表达式以及C++的表达式,都不是原子操作,也就是说你在代码里:
//假设i是线程间共享的变量 i++;
在多线程环境下,i的访问是非原子性的,实际分成如下三个操作数:
- 从缓存取到寄存器
- 在寄存器加1
- 存入缓存
编译器会改变执行的时序,因此执行结果可能并非所期望的。
GCC内建的原子操作
gcc内建了如下的原子操作,这些原子操作从4.1.2被加入。而之前,他们是使用内联的汇编实现的。
type__sync_fetch_and_add(type*ptr,typevalue,...) type__sync_fetch_and_sub(type*ptr,typevalue,...) type__sync_fetch_and_or(type*ptr,typevalue,...) type__sync_fetch_and_and(type*ptr,typevalue,...) type__sync_fetch_and_xor(type*ptr,typevalue,...) type__sync_fetch_and_nand(type*ptr,typevalue,...) type__sync_add_and_fetch(type*ptr,typevalue,...) type__sync_sub_and_fetch(type*ptr,typevalue,...) type__sync_or_and_fetch(type*ptr,typevalue,...) type__sync_and_and_fetch(type*ptr,typevalue,...) type__sync_xor_and_fetch(type*ptr,typevalue,...) type__sync_nand_and_fetch(type*ptr,typevalue,...) bool__sync_bool_compare_and_swap(type*ptr,typeoldvaltypenewval,...) type__sync_val_compare_and_swap(type*ptr,typeoldvaltypenewval,...) __sync_synchronize(...) type__sync_lock_test_and_set(type*ptr,typevalue,...) void__sync_lock_release(type*ptr,...)
需要注意的是:
- __sync_fetch_and_add和__sync_add_and_fetch的关系对应于i++和++i。其他类推
- CAS的两种实现,bool版本的如果对比oldval与ptr成功并给ptr设值newval返回true;另一个返回原本*ptr的值
- __sync_synchronize添加一个完全的内存屏障
OpenJDK的相关文件
下面列出一些Github上OpenJDK9的原子操作实现,希望能帮助需要了解的人。毕竟OpenJDK比Gcc的实现应用更广泛一些。————但终究没有OracleJDK的源码,虽然据说OpenJDK与Oracle的源码差距很小。
AtomicInteger.java
Unsafe.java::compareAndExchangeObject
unsafe.cpp::Unsafe_CompareAndExchangeObject
oop.inline.hpp::oopDesc::atomic_compare_exchange_oop
atomic_linux_x86.hpp::Atomic::cmpxchg
inlinejlongAtomic::cmpxchg(jlongexchange_value,volatilejlong*dest,jlongcompare_value,cmpxchg_memory_orderorder){ boolmp=os::is_MP(); __asm____volatile__(LOCK_IF_MP(%4)"cmpxchgq%1,(%3)" :"=a"(exchange_value) :"r"(exchange_value),"a"(compare_value),"r"(dest),"r"(mp) :"cc","memory"); returnexchange_value; }
这里需要给不熟悉C/C++的Java程序员提示一下,嵌入汇编指令的格式如下
__asm__[__volatile__](assemblytemplate//汇编模板 :[outputoperandlist]//输入列表 :[inputoperandlist]//输出列表 :[clobberlist])//破坏列表
汇编模板中的%1,%3,%4对应于后面的参数列表{"r"(exchange_value),"r"(dest),"r"(mp)},参数列表以逗号分隔,从0排序。输出参数放第一个冒号右边,输出参数放第二个冒号右边。"r"表示放到通用寄存器,"a"表示寄存器EAX,有"="表示用于输出(写还)。cmpxchg指令隐含使用EAX寄存器即参数%2.
其他细节就不在此罗列了,Gcc的实现是把要交换的指针传下来,对比成功后直接赋值(赋值非原子),原子性通过spinlock保证。
OpenJDK的实现是把要交换的指针传下来,直接通过汇编指令cmpxchgq赋值,原子性通过汇编指令保证。当然gcc的spinlock底层也是通过cmpxchgq保证的。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。