Java分布式锁的三种实现方案
方案一:数据库乐观锁
乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。
异常实现流程
--可能会发生的异常情况 --线程1查询,当前left_count为1,则有记录 select*fromt_bonuswhereid=10001andleft_count>0 --线程2查询,当前left_count为1,也有记录 select*fromt_bonuswhereid=10001andleft_count>0 --线程1完成领取记录,修改left_count为0, updatet_bonussetleft_count=left_count-1whereid=10001 --线程2完成领取记录,修改left_count为-1,产生脏数据 updatet_bonussetleft_count=left_count-1whereid=10001
通过乐观锁实现
--添加版本号控制字段 ALTERTABLEtableADDCOLUMNversionINTDEFAULT'0'NOTNULLAFTERt_bonus; --线程1查询,当前left_count为1,则有记录,当前版本号为1234 selectleft_count,versionfromt_bonuswhereid=10001andleft_count>0 --线程2查询,当前left_count为1,有记录,当前版本号为1234 selectleft_count,versionfromt_bonuswhereid=10001andleft_count>0 --线程1,更新完成后当前的version为1235,update状态为1,更新成功 updatet_bonussetversion=1235,left_count=left_count-1whereid=10001andversion=1234 --线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理 updatet_bonussetversion=1235,left_count=left_count-1whereid=10001andversion=1234
方案二:基于Redis的分布式锁
SETNX命令(SETifNoteXists)\
语法:SETNXkeyvalue\
功能:原子性操作,当且仅当key不存在,将key的值设为value,并返回1;若给定的key已经存在,则SETNX不做任何动作,并返回0。\
Expire命令\
语法:expire(key,expireTime)\
功能:key设置过期时间\
GETSET命令\
语法:GETSETkeyvalue\
功能:将给定key的值设为value,并返回key的旧值(oldvalue),当key存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。\
GET命令\
语法:GETkey\
功能:返回key所关联的字符串值,如果key不存在那么返回特殊值nil。\
DEL命令\
语法:DELkey[KEY…]\
功能:删除给定的一个或多个key,不存在的key会被忽略。
第一种:使用redis的setnx()、expire()方法,用于分布式锁
- setnx(lockkey,1)如果返回0,则说明占位失败;如果返回1,则说明占位成功
- expire()命令对lockkey设置超时时间,为的是避免死锁问题。
- 执行完业务代码后,可以通过delete命令删除key。
这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题
第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题
- setnx(lockkey,当前时间+过期超时时间),如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
- get(lockkey)获取值oldExpireTime,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
- 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey,newExpireTime)会返回当前lockkey的值currentExpireTime。
- 判断currentExpireTime与oldExpireTime是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
- 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。
importcn.com.tpig.cache.redis.RedisService;
importcn.com.tpig.utils.SpringUtils;
/**
*CreatedbyIDEA
*User:shma1664
*Date:2016-08-1614:01
*Desc:redis分布式锁
*/
publicfinalclassRedisLockUtil{
privatestaticfinalintdefaultExpire=60;
privateRedisLockUtil(){
//
}
/**
*加锁
*@paramkeyrediskey
*@paramexpire过期时间,单位秒
*@returntrue:加锁成功,false,加锁失败
*/
publicstaticbooleanlock(Stringkey,intexpire){
RedisServiceredisService=SpringUtils.getBean(RedisService.class);
longstatus=redisService.setnx(key,"1");
if(status==1){
redisService.expire(key,expire);
returntrue;
}
returnfalse;
}
publicstaticbooleanlock(Stringkey){
returnlock2(key,defaultExpire);
}
/**
*加锁
*@paramkeyrediskey
*@paramexpire过期时间,单位秒
*@returntrue:加锁成功,false,加锁失败
*/
publicstaticbooleanlock2(Stringkey,intexpire){
RedisServiceredisService=SpringUtils.getBean(RedisService.class);
longvalue=System.currentTimeMillis()+expire;
longstatus=redisService.setnx(key,String.valueOf(value));
if(status==1){
returntrue;
}
longoldExpireTime=Long.parseLong(redisService.get(key,"0"));
if(oldExpireTime<System.currentTimeMillis()){
//超时
longnewExpireTime=System.currentTimeMillis()+expire;
longcurrentExpireTime=Long.parseLong(redisService.getSet(key,String.valueOf(newExpireTime)));
if(currentExpireTime==oldExpireTime){
returntrue;
}
}
returnfalse;
}
publicstaticvoidunLock1(Stringkey){
RedisServiceredisService=SpringUtils.getBean(RedisService.class);
redisService.del(key);
}
publicstaticvoidunLock2(Stringkey){
RedisServiceredisService=SpringUtils.getBean(RedisService.class);
longoldExpireTime=Long.parseLong(redisService.get(key,"0"));
if(oldExpireTime>System.currentTimeMillis()){
redisService.del(key);
}
}
}
publicvoiddrawRedPacket(longuserId){
Stringkey="draw.redpacket.userid:"+userId;
booleanlock=RedisLockUtil.lock2(key,60);
if(lock){
try{
//领取操作
}finally{
//释放锁
RedisLockUtil.unLock(key);
}
}else{
newRuntimeException("重复领取奖励");
}
}
SpringAOP基于注解方式和SpEL实现开箱即用的redis分布式锁策略
importjava.lang.annotation.ElementType;
importjava.lang.annotation.Retention;
importjava.lang.annotation.RetentionPolicy;
importjava.lang.annotation.Target;
/**
*RUNTIME
*定义注解
*编译器将把注释记录在类文件中,在运行时VM将保留注释,因此可以反射性地读取。
*@authorshma1664
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public@interfaceRedisLockable{
String[]key()default"";
longexpiration()default60;
}
importjavax.annotation.Resource;
importjava.lang.reflect.Method;
importcom.autohome.api.dealer.util.cache.RedisClient;
importcom.google.common.base.Joiner;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.Signature;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.aspectj.lang.reflect.MethodSignature;
importorg.springframework.expression.EvaluationContext;
importorg.springframework.expression.Expression;
importorg.springframework.expression.ExpressionParser;
importorg.springframework.expression.spel.standard.SpelExpressionParser;
importorg.springframework.expression.spel.support.StandardEvaluationContext;
importorg.springframework.stereotype.Component;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-09-2818:08
*Desc:
*/
@Aspect
@Component
publicclassRedisLockAop{
@Resource
privateRedisClientredisClient;
@Pointcut("execution(*com.autohome.api.dealer.tuan.service.*.*(..))")
publicvoidpointcut(){}
@Around("pointcut()")
publicObjectdoAround(ProceedingJoinPointpoint)throwsThrowable{
Signaturesignature=point.getSignature();
MethodSignaturemethodSignature=(MethodSignature)signature;
Methodmethod=methodSignature.getMethod();
StringtargetName=point.getTarget().getClass().getName();
StringmethodName=point.getSignature().getName();
Object[]arguments=point.getArgs();
if(method!=null&&method.isAnnotationPresent(RedisLockable.class)){
RedisLockableredisLock=method.getAnnotation(RedisLockable.class);
longexpire=redisLock.expiration();
StringredisKey=getLockKey(targetName,methodName,redisLock.key(),arguments);
booleanisLock=RedisLockUtil.lock2(redisKey,expire);
if(!isLock){
try{
returnpoint.proceed();
}finally{
unLock2(redisKey);
}
}else{
thrownewRuntimeException("您的操作太频繁,请稍后再试");
}
}
returnpoint.proceed();
}
privateStringgetLockKey(StringtargetName,StringmethodName,String[]keys,Object[]arguments){
StringBuildersb=newStringBuilder();
sb.append("lock.").append(targetName).append(".").append(methodName);
if(keys!=null){
StringkeyStr=Joiner.on(".").skipNulls().join(keys);
String[]parameters=ReflectParamNames.getNames(targetName,methodName);
ExpressionParserparser=newSpelExpressionParser();
Expressionexpression=parser.parseExpression(keyStr);
EvaluationContextcontext=newStandardEvaluationContext();
intlength=parameters.length;
if(length>0){
for(inti=0;i<length;i++){
context.setVariable(parameters[i],arguments[i]);
}
}
StringkeysValue=expression.getValue(context,String.class);
sb.append("#").append(keysValue);
}
returnsb.toString();
}
<!--https://mvnrepository.com/artifact/javassist/javassist--> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.18.1-GA</version> </dependency>
importjavassist.*;
importjavassist.bytecode.CodeAttribute;
importjavassist.bytecode.LocalVariableAttribute;
importjavassist.bytecode.MethodInfo;
importorg.apache.log4j.Logger;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-09-2818:39
*Desc:
*/
publicclassReflectParamNames{
privatestaticLoggerlog=Logger.getLogger(ReflectParamNames.class);
privatestaticClassPoolpool=ClassPool.getDefault();
static{
ClassClassPathclassPath=newClassClassPath(ReflectParamNames.class);
pool.insertClassPath(classPath);
}
publicstaticString[]getNames(StringclassName,StringmethodName){
CtClasscc=null;
try{
cc=pool.get(className);
CtMethodcm=cc.getDeclaredMethod(methodName);
//使用javaassist的反射方法获取方法的参数名
MethodInfomethodInfo=cm.getMethodInfo();
CodeAttributecodeAttribute=methodInfo.getCodeAttribute();
LocalVariableAttributeattr=(LocalVariableAttribute)codeAttribute.getAttribute(LocalVariableAttribute.tag);
if(attr==null)returnnewString[0];
intbegin=0;
String[]paramNames=newString[cm.getParameterTypes().length];
intcount=0;
intpos=Modifier.isStatic(cm.getModifiers())?0:1;
for(inti=0;i<attr.tableLength();i++){
//为什么加这个判断,发现在windows跟linux执行时,参数顺序不一致,通过观察,实际的参数是从this后面开始的
if(attr.variableName(i).equals("this")){
begin=i;
break;
}
}
for(inti=begin+1;i<=begin+paramNames.length;i++){
paramNames[count]=attr.variableName(i);
count++;
}
returnparamNames;
}catch(Exceptione){
e.printStackTrace();
}finally{
try{
if(cc!=null)cc.detach();
}catch(Exceptione2){
log.error(e2.getMessage());
}
}
returnnewString[0];
}
}
在需要使用分布式锁的地方添加注解
/**
*抽奖接口
*添加redis分布式锁保证一个订单只有一个请求处理,防止用户刷礼物,支持SpEL表达式
*redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId
*@paramorderId订单id
*@return抽中的奖品信息
*/
@RedisLockable(key={"#orderId"},expiration=120)
@Override
publicBonusConvertBeandrawBonus(IntegerorderId)throwsBonusException{
//业务逻辑
}
第三种方案:基于Zookeeper的分布式锁
利用节点名称的唯一性来实现独占锁
ZooKeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。
packagecom.shma.example.zookeeper.lock;
importjava.io.IOException;
importjava.util.ArrayList;
importjava.util.Collections;
importjava.util.List;
importjava.util.concurrent.CountDownLatch;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.Lock;
importorg.apache.zookeeper.*;
importorg.apache.zookeeper.data.Stat;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-09-3016:09
*Desc:
*/
publicclassZookeeperLockimplementsLock,Watcher{
privateZooKeeperzk;
privateStringroot="/locks";//根
privateStringlockName;//竞争资源的标志
privateStringmyZnode;//当前锁
privateintsessionTimeout=30000;
privateList<Exception>exception=newArrayList<Exception>();
/**
*创建分布式锁,使用前请确认config配置的zookeeper服务可用
*@paramconfig127.0.0.1:2181
*@paramlockName竞争资源标志,lockName中不能包含单词lock
*/
publicZookeeperLock(Stringconfig,StringlockName){
this.lockName=lockName;
//创建一个与服务器的连接
try{
zk=newZooKeeper(config,sessionTimeout,this);
Statstat=zk.exists(root,false);
if(stat==null){
//创建根节点
zk.create(root,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
}catch(IOExceptione){
exception.add(e);
}catch(KeeperExceptione){
exception.add(e);
}catch(InterruptedExceptione){
exception.add(e);
}
}
@Override
publicvoidlock(){
if(exception.size()>0){
thrownewLockException(exception.get(0));
}
if(!tryLock()){
thrownewLockException("您的操作太频繁,请稍后再试");
}
}
@Override
publicvoidlockInterruptibly()throwsInterruptedException{
this.lock();
}
@Override
publicbooleantryLock(){
try{
myZnode=zk.create(root+"/"+lockName,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
returntrue;
}catch(KeeperExceptione){
e.printStackTrace();
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnfalse;
}
@Override
publicbooleantryLock(longtime,TimeUnitunit)throwsInterruptedException{
returntryLock();
}
@Override
publicvoidunlock(){
try{
zk.delete(myZnode,-1);
myZnode=null;
zk.close();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(KeeperExceptione){
e.printStackTrace();
}
}
@Override
publicConditionnewCondition(){
returnnull;
}
@Override
publicvoidprocess(WatchedEventwatchedEvent){
//
}
}
ZookeeperLocklock=null;
try{
lock=newZookeeperLock("127.0.0.1:2182","test1");
lock.lock();
//业务逻辑处理
}catch(LockExceptione){
throwe;
}finally{
if(lock!=null)
lock.unlock();
}
利用临时顺序节点控制时序实现
/lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。\
算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。
对于解锁操作,只需要将自身创建的节点删除即可。
packagecom.shma.example.zookeeper.lock;
importjava.io.IOException;
importjava.util.ArrayList;
importjava.util.Collections;
importjava.util.List;
importjava.util.concurrent.CountDownLatch;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.Lock;
importorg.apache.zookeeper.CreateMode;
importorg.apache.zookeeper.KeeperException;
importorg.apache.zookeeper.WatchedEvent;
importorg.apache.zookeeper.Watcher;
importorg.apache.zookeeper.ZooDefs;
importorg.apache.zookeeper.ZooKeeper;
importorg.apache.zookeeper.data.Stat;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-09-3016:09
*Desc:
*/
publicclassDistributedLockimplementsLock,Watcher{
privateZooKeeperzk;
privateStringroot="/locks";//根
privateStringlockName;//竞争资源的标志
privateStringwaitNode;//等待前一个锁
privateStringmyZnode;//当前锁
privateCountDownLatchlatch;//计数器
privateintsessionTimeout=30000;
privateList<Exception>exception=newArrayList<Exception>();
/**
*创建分布式锁,使用前请确认config配置的zookeeper服务可用
*@paramconfig127.0.0.1:2181
*@paramlockName竞争资源标志,lockName中不能包含单词lock
*/
publicDistributedLock(Stringconfig,StringlockName){
this.lockName=lockName;
//创建一个与服务器的连接
try{
zk=newZooKeeper(config,sessionTimeout,this);
Statstat=zk.exists(root,false);
if(stat==null){
//创建根节点
zk.create(root,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
}catch(IOExceptione){
exception.add(e);
}catch(KeeperExceptione){
exception.add(e);
}catch(InterruptedExceptione){
exception.add(e);
}
}
/**
*zookeeper节点的监视器
*/
publicvoidprocess(WatchedEventevent){
if(this.latch!=null){
this.latch.countDown();
}
}
publicvoidlock(){
if(exception.size()>0){
thrownewLockException(exception.get(0));
}
try{
if(this.tryLock()){
System.out.println("Thread"+Thread.currentThread().getId()+""+myZnode+"getlocktrue");
return;
}
else{
waitForLock(waitNode,sessionTimeout);//等待锁
}
}catch(KeeperExceptione){
thrownewLockException(e);
}catch(InterruptedExceptione){
thrownewLockException(e);
}
}
publicbooleantryLock(){
try{
StringsplitStr="_lock_";
if(lockName.contains(splitStr))
thrownewLockException("lockNamecannotcontains\\u000B");
//创建临时子节点
myZnode=zk.create(root+"/"+lockName+splitStr,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode+"iscreated");
//取出所有子节点
List<String>subNodes=zk.getChildren(root,false);
//取出所有lockName的锁
List<String>lockObjNodes=newArrayList<String>();
for(Stringnode:subNodes){
String_node=node.split(splitStr)[0];
if(_node.equals(lockName)){
lockObjNodes.add(node);
}
}
Collections.sort(lockObjNodes);
System.out.println(myZnode+"=="+lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//如果是最小的节点,则表示取得锁
returntrue;
}
//如果不是最小的节点,找到比自己小1的节点
StringsubMyZnode=myZnode.substring(myZnode.lastIndexOf("/")+1);
waitNode=lockObjNodes.get(Collections.binarySearch(lockObjNodes,subMyZnode)-1);
}catch(KeeperExceptione){
thrownewLockException(e);
}catch(InterruptedExceptione){
thrownewLockException(e);
}
returnfalse;
}
publicbooleantryLock(longtime,TimeUnitunit){
try{
if(this.tryLock()){
returntrue;
}
returnwaitForLock(waitNode,time);
}catch(Exceptione){
e.printStackTrace();
}
returnfalse;
}
privatebooleanwaitForLock(Stringlower,longwaitTime)throwsInterruptedException,KeeperException{
Statstat=zk.exists(root+"/"+lower,true);
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if(stat!=null){
System.out.println("Thread"+Thread.currentThread().getId()+"waitingfor"+root+"/"+lower);
this.latch=newCountDownLatch(1);
this.latch.await(waitTime,TimeUnit.MILLISECONDS);
this.latch=null;
}
returntrue;
}
publicvoidunlock(){
try{
System.out.println("unlock"+myZnode);
zk.delete(myZnode,-1);
myZnode=null;
zk.close();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(KeeperExceptione){
e.printStackTrace();
}
}
publicvoidlockInterruptibly()throwsInterruptedException{
this.lock();
}
publicConditionnewCondition(){
returnnull;
}
publicclassLockExceptionextendsRuntimeException{
privatestaticfinallongserialVersionUID=1L;
publicLockException(Stringe){
super(e);
}
publicLockException(Exceptione){
super(e);
}
}
}
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持毛票票!