Redis上实现分布式锁以提高性能的方案研究
背景:
在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等。大部分是解决方案基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对Redis的连接并不存在竞争关系。
项目实践
任务队列用到分布式锁的情况比较多,在将业务逻辑中可以异步处理的操作放入队列,在其他线程中处理后出队,此时队列中使用了分布式锁,保证入队和出队的一致性。关于redis队列这块的逻辑分析,我将在下一次对其进行总结,此处先略过。
接下来对redis实现的分布式锁的逻辑代码进行详细的分析和理解:
1、为避免特殊原因导致锁无法释放,在加锁成功后,锁会被赋予一个生存时间(通过lock方法的参数设置或者使用默认值),超出生存时间锁将被自动释放.
2、锁的生存时间默认比较短(秒级,具体见lock方法),因此若需要长时间加锁,可以通过expire方法延长锁的生存时间为适当的时间.比如在循环内调用expire
3、系统级的锁当进程无论因为任何原因出现crash,操作系统会自己回收锁,所以不会出现资源丢失。
4、但分布式锁不同。若一次性设置很长的时间,一旦由于各种原因进程crash或其他异常导致unlock未被调用,则该锁在剩下的时间就变成了垃圾锁,导致其他进程或进程重启后无法进入加锁区域。
<?php
require_once'RedisFactory.php';
/**
*在Redis上实现的分布式锁
*/
classRedisLock{
//单例模式
privatestatic$_instance=null;
publicstaticfunctioninstance(){
if(self::$_instance==null){
self::$_instance=newRedisLock();
}
returnself::$_instance;
}
//redis对象变量
private$redis;
//存放被锁的标志名的数组
private$lockedNames=array();
publicfunction__construct(){
//获取一个RedisString实例
$this->redis=RedisFactory::instance()->getString();
}
/**
*加锁
*
*@paramstring锁的标识名
*@paramint获取锁失败时的等待超时时间(秒),在此时间之内会一直尝试获取锁直到超时.为0表示失败后直接返回不等待
*@paramint当前锁的最大生存时间(秒),必须大于0.如果超过生存时间后锁仍未被释放,则系统会自动将其强制释放
*@paramint获取锁失败后挂起再试的时间间隔(微秒)
*/
publicfunctionlock($name,$timeout=0,$expire=15,$waitIntervalUs=100000){
if(empty($name))returnfalse;
$timeout=(int)$timeout;
$expire=max((int)$expire,5);
$now=microtime(true);
$timeoutAt=$now+$timeout;
$expireAt=$now+$expire;
$redisKey="Lock:$name";
while(true){
$result=$this->redis->setnx($redisKey,(string)$expireAt);
if($result!==false){
//对$redisKey设置生存时间
$this->redis->expire($redisKey,$expire);
//将最大生存时刻记录在一个数组里面
$this->lockedNames[$name]=$expireAt;
returntrue;
}
//以秒为单位,返回$redisKey的剩余生存时间
$ttl=$this->redis->ttl($redisKey);
//TTL小于0表示key上没有设置生存时间(key不会不存在,因为前面setnx会自动创建)
//如果出现这种情况,那就是进程在某个实例setnx成功后crash导致紧跟着的expire没有被调用.这时可以直接设置expire并把锁纳为己用
if($ttl<0){
$this->redis->set($redisKey,(string)$expireAt,$expire);
$this->lockedNames[$name]=$expireAt;
returntrue;
}
//设置了不等待或者已超时
if($timeout<=0||microtime(true)>$timeoutAt)break;
//挂起一段时间再试
usleep($waitIntervalUs);
}
returnfalse;
}
/**
*给当前锁增加指定的生存时间(秒),必须大于0
*
*@paramstring锁的标识名
*@paramint生存时间(秒),必须大于0
*/
publicfunctionexpire($name,$expire){
if($this->isLocking($name)){
if($this->redis->expire("Lock:$name",max($expire,1))){
returntrue;
}
}
returnfalse;
}
/**
*判断当前是否拥有指定名称的锁
*
*@parammixed$name
*/
publicfunctionisLocking($name){
if(isset($this->lockedNames[$name])){
return(string)$this->lockedNames[$name]==(string)$this->redis->get("Lock:$name");
}
returnfalse;
}
/**
*释放锁
*
*@paramstring锁的标识名
*/
publicfunctionunlock($name){
if($this->isLocking($name)){
if($this->redis->deleteKey("Lock:$name")){
unset($this->lockedNames[$name]);
returntrue;
}
}
returnfalse;
}
/**释放当前已经获取到的所有锁*/
publicfunctionunlockAll(){
$allSuccess=true;
foreach($this->lockedNamesas$name=>$item){
if(false===$this->unlock($name)){
$allSuccess=false;
}
}
return$allSuccess;
}
}
此类很多代码都写上了注释,只要认真理解下,就很容易懂得如何在redis实现分布式锁了。