Java分布式锁的三种实现方案
方案一:数据库乐观锁
乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。
异常实现流程
--可能会发生的异常情况 --线程1查询,当前left_count为1,则有记录 select*fromt_bonuswhereid=10001andleft_count>0 --线程2查询,当前left_count为1,也有记录 select*fromt_bonuswhereid=10001andleft_count>0 --线程1完成领取记录,修改left_count为0, updatet_bonussetleft_count=left_count-1whereid=10001 --线程2完成领取记录,修改left_count为-1,产生脏数据 updatet_bonussetleft_count=left_count-1whereid=10001
通过乐观锁实现
--添加版本号控制字段 ALTERTABLEtableADDCOLUMNversionINTDEFAULT'0'NOTNULLAFTERt_bonus; --线程1查询,当前left_count为1,则有记录,当前版本号为1234 selectleft_count,versionfromt_bonuswhereid=10001andleft_count>0 --线程2查询,当前left_count为1,有记录,当前版本号为1234 selectleft_count,versionfromt_bonuswhereid=10001andleft_count>0 --线程1,更新完成后当前的version为1235,update状态为1,更新成功 updatet_bonussetversion=1235,left_count=left_count-1whereid=10001andversion=1234 --线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理 updatet_bonussetversion=1235,left_count=left_count-1whereid=10001andversion=1234
方案二:基于Redis的分布式锁
SETNX命令(SETifNoteXists)\
语法:SETNXkeyvalue\
功能:原子性操作,当且仅当key不存在,将key的值设为value,并返回1;若给定的key已经存在,则SETNX不做任何动作,并返回0。\
Expire命令\
语法:expire(key,expireTime)\
功能:key设置过期时间\
GETSET命令\
语法:GETSETkeyvalue\
功能:将给定key的值设为value,并返回key的旧值(oldvalue),当key存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。\
GET命令\
语法:GETkey\
功能:返回key所关联的字符串值,如果key不存在那么返回特殊值nil。\
DEL命令\
语法:DELkey[KEY…]\
功能:删除给定的一个或多个key,不存在的key会被忽略。
第一种:使用redis的setnx()、expire()方法,用于分布式锁
- setnx(lockkey,1)如果返回0,则说明占位失败;如果返回1,则说明占位成功
- expire()命令对lockkey设置超时时间,为的是避免死锁问题。
- 执行完业务代码后,可以通过delete命令删除key。
这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题
第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题
- setnx(lockkey,当前时间+过期超时时间),如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
- get(lockkey)获取值oldExpireTime,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
- 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey,newExpireTime)会返回当前lockkey的值currentExpireTime。
- 判断currentExpireTime与oldExpireTime是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
- 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。
importcn.com.tpig.cache.redis.RedisService; importcn.com.tpig.utils.SpringUtils; /** *CreatedbyIDEA *User:shma1664 *Date:2016-08-1614:01 *Desc:redis分布式锁 */ publicfinalclassRedisLockUtil{ privatestaticfinalintdefaultExpire=60; privateRedisLockUtil(){ // } /** *加锁 *@paramkeyrediskey *@paramexpire过期时间,单位秒 *@returntrue:加锁成功,false,加锁失败 */ publicstaticbooleanlock(Stringkey,intexpire){ RedisServiceredisService=SpringUtils.getBean(RedisService.class); longstatus=redisService.setnx(key,"1"); if(status==1){ redisService.expire(key,expire); returntrue; } returnfalse; } publicstaticbooleanlock(Stringkey){ returnlock2(key,defaultExpire); } /** *加锁 *@paramkeyrediskey *@paramexpire过期时间,单位秒 *@returntrue:加锁成功,false,加锁失败 */ publicstaticbooleanlock2(Stringkey,intexpire){ RedisServiceredisService=SpringUtils.getBean(RedisService.class); longvalue=System.currentTimeMillis()+expire; longstatus=redisService.setnx(key,String.valueOf(value)); if(status==1){ returntrue; } longoldExpireTime=Long.parseLong(redisService.get(key,"0")); if(oldExpireTime<System.currentTimeMillis()){ //超时 longnewExpireTime=System.currentTimeMillis()+expire; longcurrentExpireTime=Long.parseLong(redisService.getSet(key,String.valueOf(newExpireTime))); if(currentExpireTime==oldExpireTime){ returntrue; } } returnfalse; } publicstaticvoidunLock1(Stringkey){ RedisServiceredisService=SpringUtils.getBean(RedisService.class); redisService.del(key); } publicstaticvoidunLock2(Stringkey){ RedisServiceredisService=SpringUtils.getBean(RedisService.class); longoldExpireTime=Long.parseLong(redisService.get(key,"0")); if(oldExpireTime>System.currentTimeMillis()){ redisService.del(key); } } }
publicvoiddrawRedPacket(longuserId){ Stringkey="draw.redpacket.userid:"+userId; booleanlock=RedisLockUtil.lock2(key,60); if(lock){ try{ //领取操作 }finally{ //释放锁 RedisLockUtil.unLock(key); } }else{ newRuntimeException("重复领取奖励"); } }
SpringAOP基于注解方式和SpEL实现开箱即用的redis分布式锁策略
importjava.lang.annotation.ElementType; importjava.lang.annotation.Retention; importjava.lang.annotation.RetentionPolicy; importjava.lang.annotation.Target; /** *RUNTIME *定义注解 *编译器将把注释记录在类文件中,在运行时VM将保留注释,因此可以反射性地读取。 *@authorshma1664 * */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public@interfaceRedisLockable{ String[]key()default""; longexpiration()default60; }
importjavax.annotation.Resource; importjava.lang.reflect.Method; importcom.autohome.api.dealer.util.cache.RedisClient; importcom.google.common.base.Joiner; importorg.aspectj.lang.ProceedingJoinPoint; importorg.aspectj.lang.Signature; importorg.aspectj.lang.annotation.Around; importorg.aspectj.lang.annotation.Aspect; importorg.aspectj.lang.annotation.Pointcut; importorg.aspectj.lang.reflect.MethodSignature; importorg.springframework.expression.EvaluationContext; importorg.springframework.expression.Expression; importorg.springframework.expression.ExpressionParser; importorg.springframework.expression.spel.standard.SpelExpressionParser; importorg.springframework.expression.spel.support.StandardEvaluationContext; importorg.springframework.stereotype.Component; /** *CreatedbyIDEA *User:mashaohua *Date:2016-09-2818:08 *Desc: */ @Aspect @Component publicclassRedisLockAop{ @Resource privateRedisClientredisClient; @Pointcut("execution(*com.autohome.api.dealer.tuan.service.*.*(..))") publicvoidpointcut(){} @Around("pointcut()") publicObjectdoAround(ProceedingJoinPointpoint)throwsThrowable{ Signaturesignature=point.getSignature(); MethodSignaturemethodSignature=(MethodSignature)signature; Methodmethod=methodSignature.getMethod(); StringtargetName=point.getTarget().getClass().getName(); StringmethodName=point.getSignature().getName(); Object[]arguments=point.getArgs(); if(method!=null&&method.isAnnotationPresent(RedisLockable.class)){ RedisLockableredisLock=method.getAnnotation(RedisLockable.class); longexpire=redisLock.expiration(); StringredisKey=getLockKey(targetName,methodName,redisLock.key(),arguments); booleanisLock=RedisLockUtil.lock2(redisKey,expire); if(!isLock){ try{ returnpoint.proceed(); }finally{ unLock2(redisKey); } }else{ thrownewRuntimeException("您的操作太频繁,请稍后再试"); } } returnpoint.proceed(); } privateStringgetLockKey(StringtargetName,StringmethodName,String[]keys,Object[]arguments){ StringBuildersb=newStringBuilder(); sb.append("lock.").append(targetName).append(".").append(methodName); if(keys!=null){ StringkeyStr=Joiner.on(".").skipNulls().join(keys); String[]parameters=ReflectParamNames.getNames(targetName,methodName); ExpressionParserparser=newSpelExpressionParser(); Expressionexpression=parser.parseExpression(keyStr); EvaluationContextcontext=newStandardEvaluationContext(); intlength=parameters.length; if(length>0){ for(inti=0;i<length;i++){ context.setVariable(parameters[i],arguments[i]); } } StringkeysValue=expression.getValue(context,String.class); sb.append("#").append(keysValue); } returnsb.toString(); }
<!--https://mvnrepository.com/artifact/javassist/javassist--> <dependency> <groupId>org.javassist</groupId> <artifactId>javassist</artifactId> <version>3.18.1-GA</version> </dependency>
importjavassist.*; importjavassist.bytecode.CodeAttribute; importjavassist.bytecode.LocalVariableAttribute; importjavassist.bytecode.MethodInfo; importorg.apache.log4j.Logger; /** *CreatedbyIDEA *User:mashaohua *Date:2016-09-2818:39 *Desc: */ publicclassReflectParamNames{ privatestaticLoggerlog=Logger.getLogger(ReflectParamNames.class); privatestaticClassPoolpool=ClassPool.getDefault(); static{ ClassClassPathclassPath=newClassClassPath(ReflectParamNames.class); pool.insertClassPath(classPath); } publicstaticString[]getNames(StringclassName,StringmethodName){ CtClasscc=null; try{ cc=pool.get(className); CtMethodcm=cc.getDeclaredMethod(methodName); //使用javaassist的反射方法获取方法的参数名 MethodInfomethodInfo=cm.getMethodInfo(); CodeAttributecodeAttribute=methodInfo.getCodeAttribute(); LocalVariableAttributeattr=(LocalVariableAttribute)codeAttribute.getAttribute(LocalVariableAttribute.tag); if(attr==null)returnnewString[0]; intbegin=0; String[]paramNames=newString[cm.getParameterTypes().length]; intcount=0; intpos=Modifier.isStatic(cm.getModifiers())?0:1; for(inti=0;i<attr.tableLength();i++){ //为什么加这个判断,发现在windows跟linux执行时,参数顺序不一致,通过观察,实际的参数是从this后面开始的 if(attr.variableName(i).equals("this")){ begin=i; break; } } for(inti=begin+1;i<=begin+paramNames.length;i++){ paramNames[count]=attr.variableName(i); count++; } returnparamNames; }catch(Exceptione){ e.printStackTrace(); }finally{ try{ if(cc!=null)cc.detach(); }catch(Exceptione2){ log.error(e2.getMessage()); } } returnnewString[0]; } }
在需要使用分布式锁的地方添加注解
/** *抽奖接口 *添加redis分布式锁保证一个订单只有一个请求处理,防止用户刷礼物,支持SpEL表达式 *redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId *@paramorderId订单id *@return抽中的奖品信息 */ @RedisLockable(key={"#orderId"},expiration=120) @Override publicBonusConvertBeandrawBonus(IntegerorderId)throwsBonusException{ //业务逻辑 }
第三种方案:基于Zookeeper的分布式锁
利用节点名称的唯一性来实现独占锁
ZooKeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。
packagecom.shma.example.zookeeper.lock;
importjava.io.IOException; importjava.util.ArrayList; importjava.util.Collections; importjava.util.List; importjava.util.concurrent.CountDownLatch; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.locks.Condition; importjava.util.concurrent.locks.Lock; importorg.apache.zookeeper.*; importorg.apache.zookeeper.data.Stat; /** *CreatedbyIDEA *User:mashaohua *Date:2016-09-3016:09 *Desc: */ publicclassZookeeperLockimplementsLock,Watcher{ privateZooKeeperzk; privateStringroot="/locks";//根 privateStringlockName;//竞争资源的标志 privateStringmyZnode;//当前锁 privateintsessionTimeout=30000; privateList<Exception>exception=newArrayList<Exception>(); /** *创建分布式锁,使用前请确认config配置的zookeeper服务可用 *@paramconfig127.0.0.1:2181 *@paramlockName竞争资源标志,lockName中不能包含单词lock */ publicZookeeperLock(Stringconfig,StringlockName){ this.lockName=lockName; //创建一个与服务器的连接 try{ zk=newZooKeeper(config,sessionTimeout,this); Statstat=zk.exists(root,false); if(stat==null){ //创建根节点 zk.create(root,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } }catch(IOExceptione){ exception.add(e); }catch(KeeperExceptione){ exception.add(e); }catch(InterruptedExceptione){ exception.add(e); } } @Override publicvoidlock(){ if(exception.size()>0){ thrownewLockException(exception.get(0)); } if(!tryLock()){ thrownewLockException("您的操作太频繁,请稍后再试"); } } @Override publicvoidlockInterruptibly()throwsInterruptedException{ this.lock(); } @Override publicbooleantryLock(){ try{ myZnode=zk.create(root+"/"+lockName,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); returntrue; }catch(KeeperExceptione){ e.printStackTrace(); }catch(InterruptedExceptione){ e.printStackTrace(); } returnfalse; } @Override publicbooleantryLock(longtime,TimeUnitunit)throwsInterruptedException{ returntryLock(); } @Override publicvoidunlock(){ try{ zk.delete(myZnode,-1); myZnode=null; zk.close(); }catch(InterruptedExceptione){ e.printStackTrace(); }catch(KeeperExceptione){ e.printStackTrace(); } } @Override publicConditionnewCondition(){ returnnull; } @Override publicvoidprocess(WatchedEventwatchedEvent){ // } }
ZookeeperLocklock=null; try{ lock=newZookeeperLock("127.0.0.1:2182","test1"); lock.lock(); //业务逻辑处理 }catch(LockExceptione){ throwe; }finally{ if(lock!=null) lock.unlock(); }
利用临时顺序节点控制时序实现
/lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。\
算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。
对于解锁操作,只需要将自身创建的节点删除即可。
packagecom.shma.example.zookeeper.lock; importjava.io.IOException; importjava.util.ArrayList; importjava.util.Collections; importjava.util.List; importjava.util.concurrent.CountDownLatch; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.locks.Condition; importjava.util.concurrent.locks.Lock; importorg.apache.zookeeper.CreateMode; importorg.apache.zookeeper.KeeperException; importorg.apache.zookeeper.WatchedEvent; importorg.apache.zookeeper.Watcher; importorg.apache.zookeeper.ZooDefs; importorg.apache.zookeeper.ZooKeeper; importorg.apache.zookeeper.data.Stat; /** *CreatedbyIDEA *User:mashaohua *Date:2016-09-3016:09 *Desc: */ publicclassDistributedLockimplementsLock,Watcher{ privateZooKeeperzk; privateStringroot="/locks";//根 privateStringlockName;//竞争资源的标志 privateStringwaitNode;//等待前一个锁 privateStringmyZnode;//当前锁 privateCountDownLatchlatch;//计数器 privateintsessionTimeout=30000; privateList<Exception>exception=newArrayList<Exception>(); /** *创建分布式锁,使用前请确认config配置的zookeeper服务可用 *@paramconfig127.0.0.1:2181 *@paramlockName竞争资源标志,lockName中不能包含单词lock */ publicDistributedLock(Stringconfig,StringlockName){ this.lockName=lockName; //创建一个与服务器的连接 try{ zk=newZooKeeper(config,sessionTimeout,this); Statstat=zk.exists(root,false); if(stat==null){ //创建根节点 zk.create(root,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } }catch(IOExceptione){ exception.add(e); }catch(KeeperExceptione){ exception.add(e); }catch(InterruptedExceptione){ exception.add(e); } } /** *zookeeper节点的监视器 */ publicvoidprocess(WatchedEventevent){ if(this.latch!=null){ this.latch.countDown(); } } publicvoidlock(){ if(exception.size()>0){ thrownewLockException(exception.get(0)); } try{ if(this.tryLock()){ System.out.println("Thread"+Thread.currentThread().getId()+""+myZnode+"getlocktrue"); return; } else{ waitForLock(waitNode,sessionTimeout);//等待锁 } }catch(KeeperExceptione){ thrownewLockException(e); }catch(InterruptedExceptione){ thrownewLockException(e); } } publicbooleantryLock(){ try{ StringsplitStr="_lock_"; if(lockName.contains(splitStr)) thrownewLockException("lockNamecannotcontains\\u000B"); //创建临时子节点 myZnode=zk.create(root+"/"+lockName+splitStr,newbyte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode+"iscreated"); //取出所有子节点 List<String>subNodes=zk.getChildren(root,false); //取出所有lockName的锁 List<String>lockObjNodes=newArrayList<String>(); for(Stringnode:subNodes){ String_node=node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); System.out.println(myZnode+"=="+lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //如果是最小的节点,则表示取得锁 returntrue; } //如果不是最小的节点,找到比自己小1的节点 StringsubMyZnode=myZnode.substring(myZnode.lastIndexOf("/")+1); waitNode=lockObjNodes.get(Collections.binarySearch(lockObjNodes,subMyZnode)-1); }catch(KeeperExceptione){ thrownewLockException(e); }catch(InterruptedExceptione){ thrownewLockException(e); } returnfalse; } publicbooleantryLock(longtime,TimeUnitunit){ try{ if(this.tryLock()){ returntrue; } returnwaitForLock(waitNode,time); }catch(Exceptione){ e.printStackTrace(); } returnfalse; } privatebooleanwaitForLock(Stringlower,longwaitTime)throwsInterruptedException,KeeperException{ Statstat=zk.exists(root+"/"+lower,true); //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if(stat!=null){ System.out.println("Thread"+Thread.currentThread().getId()+"waitingfor"+root+"/"+lower); this.latch=newCountDownLatch(1); this.latch.await(waitTime,TimeUnit.MILLISECONDS); this.latch=null; } returntrue; } publicvoidunlock(){ try{ System.out.println("unlock"+myZnode); zk.delete(myZnode,-1); myZnode=null; zk.close(); }catch(InterruptedExceptione){ e.printStackTrace(); }catch(KeeperExceptione){ e.printStackTrace(); } } publicvoidlockInterruptibly()throwsInterruptedException{ this.lock(); } publicConditionnewCondition(){ returnnull; } publicclassLockExceptionextendsRuntimeException{ privatestaticfinallongserialVersionUID=1L; publicLockException(Stringe){ super(e); } publicLockException(Exceptione){ super(e); } } }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持毛票票!