golang模拟实现带超时的信号量示例代码
前言
最近在写项目,需要用到信号量等待一些资源完成,但是最多等待N毫秒。在看本文的正文之前,我们先来看下C语言里的实现方法。
在C语言里,有如下的API来实现带超时的信号量等待:
SYNOPSIS #includeint pthread_cond_timedwait(pthread_cond_t*cond,pthread_mutex_t*mutex,conststructtimespec*abstime);
然后在查看golang的document后,发现golang里并没有实现带超时的信号量,官方文档在这里。
原理
我的业务场景是这样的:我有一个缓存字典,当多个用户请求1个不存在的key时,只有1个请求会穿透到后端,而所有用户都要排队等这个请求完成,或者超时返回。
怎么实现呢?其实稍微想一想cond的原理,就能模拟一个带超时的cond出来。
在golang里,要同时实现”挂起等待”和”超时返回”,一般得用selectcase语法,一个case等待阻塞的资源,一个case等待一个timer,这一点是非常确定的。
原本阻塞的资源应该通过条件变量的机制来实现完成通知,既然这里决定用selectcase,那么自然想到用channel来代替这个完成通知。
接下来的问题就是,很多请求者并发来获取这个资源,但是资源还没有准备好,所以大家都要排队并挂起,等待资源完成,并且当资源完成后通知大家。
所以,这里很自然要为这个资源做一个队列,每个请求者创建一个chan,并将chan放到队列里,接着selectcase等待这个chan的通知。而另一端,资源完成后遍历队列,通知每个chan即可。
最后一个问题是,只有第一个请求者才能穿透请求到后端,而后续请求者不应该穿透重复的请求,这可以通过判断缓存里是否有这个key作为判定首次的条件,而标记位init来判断请求者是否应该排队。
我的场景
上面是思路,下面是我的业务场景实现。
func(cache*Cache)Get(keystring,keyTypeint)*string{
ifkeyType==KEY_TYPE_DOMAIN{
key="#"+key
}else{
key="="+key
}
cache.mutex.Lock()
item,existed:=cache.dict[key]
if!existed{
item=&cacheItem{}
item.key=&key
item.waitQueue=list.New()
cache.dict[key]=item
}
cache.mutex.Unlock()
conf:=config.GetConfig()
lastGet:=getCurMs()
item.mutex.Lock()
item.lastGet=lastGet
ifitem.init{//已存在并且初始化
deferitem.mutex.Unlock()
returnitem.value
}
//未初始化,排队等待结果
wait:=waitItem{}
wait.wait_chan=make(chan*string,1)
item.waitQueue.PushBack(&wait)
item.mutex.Unlock()
//新增key,启动goroutine获取初始值
if!existed{
gocache.initCacheItem(item,keyType)
}
timer:=time.NewTimer(time.Duration(conf.Cache_waitTime)*time.Millisecond)
varretval*string=nil
//等待初始化完成
select{
caseretval=<-wait.wait_chan:
case<-timer.C:
}
returnretval
}
简述一下整个过程:
- 首先锁字典,如果key不存在,说明我是第一个请求者,我会创建这个key对应的value,只不过init=false表示它正在初始化。最后,释放字典锁。
- 接下来,锁住这个key,判断它已经初始化完成,那么直接返回value。否则,创建一个chan放入waitQueue等待队列。最后,释放key锁。
- 接着,如果当前是第一个请求者,那么会穿透请求到后端(在一个独立的协程里去发起网络调用)。
- 现在,创建一个用于超时的定时器。
- 最后,无论当前是否是key的第一个请求者,还是初始化期间的并发请求者,它们都通过selectcase超时的等待结果完成。
在initCacheItem函数里,数据已获取成功
//一旦标记为init,后续请求将不再操作waitQueue
item.mutex.Lock()
item.value=newValue
item.init=true
item.expire=expire
item.mutex.Unlock()
//唤醒所有排队者
waitQueue:=item.waitQueue
forelem:=waitQueue.Front();elem!=nil;elem=waitQueue.Front(){
wait:=elem.Value.(*waitItem)
wait.wait_chan<-newValue
waitQueue.Remove(elem)
}
- 首先,锁住key,标记init=true,并赋值value,并释放锁。此后的请求,都可以立即返回,无需排队。
- 之后,因为init=true已被标记,此刻再也有没有请求会修改waitQueue,所以无需加锁,直接遍历队列,通知其中的每个chan。
最后
这样就实现了带超时的条件变量效果,实际上我的场景是一个broadcast的cond例子,大家可以参照思路实现自己想要的效果,活学活用。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。