Redis实现分布式锁和等待序列的方法示例
在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized、 cas、 ReentrankLock这些锁的作用范围都是 JVM,说白了在集群下没啥用。这时我们就需要能在多台 JVM之间决定执行顺序的锁了,现在分布式锁主要有 redis、 Zookeeper实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。
背景
最近在做一个消费 Kafka消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis的实现方式(因为网上例子多)
分析
redis实现的分布式锁,实现原理是 set方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式
- 丢弃
- 等待重试由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用 redis的 List类型实现等待序列的作用
代码
直接上代码其实直接redis的工具类就可以解决了
packagecom.test
importredis.clients.jedis.Jedis;
importjava.util.Collections;
importjava.util.List;
/**
*@descredis队列实现方式
*@anthor
*@date
**/
publicclassRedisUcUitl{
privatestaticfinalStringLOCK_SUCCESS="OK";
privatestaticfinalStringSET_IF_NOT_EXIST="NX";
privatestaticfinalStringSET_WITH_EXPIRE_TIME="PX";
privatestaticfinalLongRELEASE_SUCCESS=1L;
privateRedisUcUitl(){
}
/**
*logger
**/
/**
*存储redis队列顺序存储在队列首部存入
*
*@paramkey字节类型
*@paramvalue字节类型
*/
publicstaticLonglpush(Jedisjedis,finalbyte[]key,finalbyte[]value){
returnjedis.lpush(key,value);
}
/**
*移除列表中最后一个元素并将改元素添加入另一个列表中,当列表为空时将阻塞连接直到等待超时
*
*@paramsrckey
*@paramdstkey
*@paramtimeout0表示永不超时
*@return
*/
publicstaticbyte[]brpoplpush(Jedisjedis,finalbyte[]srckey,finalbyte[]dstkey,finalinttimeout){
returnjedis.brpoplpush(srckey,dstkey,timeout);
}
/**
*返回制定的key,起始位置的redis数据
*@paramredisKey
*@paramstart
*@paramend-1表示到最后
*@return
*/
publicstaticListlrange(Jedisjedis,finalbyte[]redisKey,finallongstart,finallongend){
returnjedis.lrange(redisKey,start,end);
}
/**
*删除key
*@paramredisKey
*/
publicstaticvoiddelete(Jedisjedis,finalbyte[]redisKey){
returnjedis.del(redisKey);
}
/**
*尝试加锁
*@paramlockKeykey名称
*@paramrequestId身份标识
*@paramexpireTime过期时间
*@return
*/
publicstaticbooleantryGetDistributedLock(Jedisjedis,finalStringlockKey,finalStringrequestId,finalintexpireTime){
Stringresult=jedis.set(lockKey,requestId,SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME,expireTime);
returnLOCK_SUCCESS.equals(result);
}
/**
*释放锁
*@paramlockKeykey名称
*@paramrequestId身份标识
*@return
*/
publicstaticbooleanreleaseDistributedLock(Jedisjedis,finalStringlockKey,finalStringrequestId){
finalStringscript="ifredis.call('get',KEYS[1])==ARGV[1]thenreturnredis.call('del',KEYS[1])elsereturn0end";
jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));
returnRELEASE_SUCCESS.equals(result);
}
}
业务逻辑主要代码如下
1.先消耗队列中的
while(true){
//消费队列
try{
//被放入redis队列的数据序列化后的
byte[]bytes=RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8),dstKeyStr.getBytes(UTF_8),1);
if(bytes==null||bytes.isEmpty()){
//队列中没数据时退出
break;
}
//反序列化对象
MapsingleMap=(Map)ObjectSerialUtil.bytesToObject(bytes);
//塞入唯一的值防止被其他线程误解锁
StringrequestId=UUID.randomUUID().toString();
booleanlockGetFlag=RedisUcUitl.tryGetDistributedLock(keyStr,requestId,100);
if(lockGetFlag){
//成功获取锁进行业务处理
//TODO
//处理完毕释放锁
booleanfreeLock=RedisUcUitl.releaseDistributedLock(keyStr,requestId);
}else{
//未能获得锁放入等待队列
RedisUcUitl.lpush(keyStr.getBytes(UTF_8),ObjectSerialUtil.objectToBytes(param));
}
}catch(Exceptione){
break;
}
}
2.处理最新接到的数据
同样是走尝试获取锁,获取不到放入队列的流程
一般序列化用 fastJson之列的就可以了,这里用的是 JDK自带的,工具类如下
publicclassObjectSerialUtil{
privateObjectSerialUtil(){
//工具类
}
/**
*将Object对象序列化为byte[]
*
*@paramobj对象
*@returnbyte数组
*@throwsException
*/
publicstaticbyte[]objectToBytes(Objectobj)throwsIOException{
ByteArrayOutputStreambos=newByteArrayOutputStream();
ObjectOutputStreamoos=newObjectOutputStream(bos);
oos.writeObject(obj);
byte[]bytes=bos.toByteArray();
bos.close();
oos.close();
returnbytes;
}
/**
*将bytes数组还原为对象
*
*@parambytes
*@return
*@throwsException
*/
publicstaticObjectbytesToObject(byte[]bytes){
try{
ByteArrayInputStreambin=newByteArrayInputStream(bytes);
ObjectInputStreamois=newObjectInputStream(bin);
returnois.readObject();
}catch(Exceptione){
thrownewBaseException("反序列化出错!",e);
}
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。