使用Golang的singleflight防止缓存击穿的方法
在使用缓存时,容易发生缓存击穿。
缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去读dB,这些请求都会击穿到DB,造成瞬时DB请求量大、压力骤增。
singleflight
介绍
import"golang.org/x/sync/singleflight"
singleflight类的使用方法就新建一个singleflight.Group,使用其方法Do或者DoChan来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个协程执行结束后,拿到同样的结果。
Group结构体
代表一类工作,同一个group中,同样的key同时只能被执行一次。
Do方法
func(g*Group)Do(keystring,fnfunc()(interface{},error))(vinterface{},errerror,sharedbool)
key:同一个key,同时只有一个协程执行。
fn:被包装的函数。
v:返回值,即执行的结果。其他等待的协程都会拿到。
shared:表示是否有其他协程得到了这个结果v。
DoChan方法
func(g*Group)DoChan(keystring,fnfunc()(interface{},error))<-chanResult
与Do方法一样,只是返回的是一个channel,执行结果会发送到channel中,其他等待的协程都可以从channel中拿到结果。
ref:https://godoc.org/golang.org/x/sync/singleflight
示例
使用Do方法来模拟,解决缓存击穿的问题
funcmain(){ varsingleSetCachesingleflight.Group getAndSetCache:=func(requestIDint,cacheKeystring)(string,error){ log.Printf("request%vstarttogetandsetcache...",requestID) value,_,_:=singleSetCache.Do(cacheKey,func()(retinterface{},errerror){//do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB log.Printf("request%vissettingcache...",requestID) time.Sleep(3*time._Second_) log.Printf("request%vsetcachesuccess!",requestID) return"VALUE",nil }) returnvalue.(string),nil } cacheKey:="cacheKey" fori:=1;i<10;i++{//模拟多个协程同时请求 gofunc(requestIDint){ value,_:=getAndSetCache(requestID,cacheKey) log.Printf("request%vgetvalue:%v",requestID,value) }(i) } time.Sleep(20*time._Second_) }
输出:
2020/04/1218:18:40request4start to get and setcache...
2020/04/1218:18:40request4issettingcache...
2020/04/1218:18:40request2start to get and setcache...
2020/04/1218:18:40request7start to get and setcache...
2020/04/1218:18:40request5start to get and setcache...
2020/04/1218:18:40request1start to get and setcache...
2020/04/1218:18:40request6start to get and setcache...
2020/04/1218:18:40request3start to get and setcache...
2020/04/1218:18:40request8start to get and setcache...
2020/04/1218:18:40request9start to get and setcache...
2020/04/1218:18:43request4set cache success!
2020/04/1218:18:43request4getvalue:VALUE
2020/04/1218:18:43request9getvalue:VALUE
2020/04/1218:18:43request6getvalue:VALUE
2020/04/1218:18:43request3getvalue:VALUE
2020/04/1218:18:43request8getvalue:VALUE
2020/04/1218:18:43request1getvalue:VALUE
2020/04/1218:18:43request5getvalue:VALUE
2020/04/1218:18:43request2getvalue:VALUE
2020/04/1218:18:43request7getvalue:VALUE`
可以看到确实只有一个协程执行了被包装的函数,并且其他协程都拿到了结果。
源码分析
看一下这个Do方法是怎么实现的。
首先看一下Group的结构:
typeGroupstruct{ musync.Mutex mmap[string]*call//保存key对应的函数执行过程和结果的变量。 }
Group的结构非常简单,一个锁来保证并发安全,另一个map用来保存key对应的函数执行过程和结果的变量。
看下call的结构:
typecallstruct{ wgsync.WaitGroup//用WaitGroup实现只有一个协程执行函数 valinterface{}//函数执行结果 errerror forgottenbool dupsint//含义是duplications,即同时执行同一个key的协程数量 chans[]chan<-Result }
看下Do方法
func(g*Group)Do(keystring,fnfunc()(interface{},error))(vinterface{},errerror,sharedbool){ g.mu.Lock()//写Group的m字段时,加锁保证写安全。 ifg.m==nil{ g.m=make(map[string]*call) } ifc,ok:=g.m[key];ok{//如果key已经存在,说明已经有协程在执行,则dups++,并等待其执行完毕后,返回其执行结果,执行结果保存在对应的call的val字段里 c.dups++ g.mu.Unlock() c.wg.Wait() returnc.val,c.err,true } //如果key不存在,则新建一个call,并使用WaitGroup来阻塞其他协程,同时在m字段里写入key和对应的call c:=new(call) c.wg.Add(1) g.m[key]=c g.mu.Unlock() g.doCall(c,key,fn)//第一个进来的协程来执行这个函数 returnc.val,c.err,c.dups>0 }
继续看下g.doCall里具体干了什么
func(g*Group)doCall(c*call,keystring,fnfunc()(interface{},error)){ c.val,c.err=fn()//执行被包装的函数 c.wg.Done()//执行完毕后,就可以通知其他协程可以拿结果了 g.mu.Lock() if!c.forgotten{//其实这里是为了保证执行完毕之后,对应的key被删除,Group有一个方法Forget(keystring),可以用来主动删除key,这里是判断那个方法是否被调用过,被调用过则字段forgotten会置为true,如果没有被调用过,则在这里把key删除。 delete(g.m,key) } for_,ch:=rangec.chans{//将执行结果发送到channel里,这里是给DoChan方法使用的 ch<-Result{c.val,c.err,c.dups>0} } g.mu.Unlock() }
由此看来,其实现是非常简单的。不得不赞叹一百来行代码就实现了功能。
其他
顺便附上DoChan方法的使用示例:
funcmain(){ varsingleSetCachesingleflight.Group getAndSetCache:=func(requestIDint,cacheKeystring)(string,error){ log.Printf("request%vstarttogetandsetcache...",requestID) retChan:=singleSetCache.DoChan(cacheKey,func()(retinterface{},errerror){ log.Printf("request%vissettingcache...",requestID) time.Sleep(3*time._Second_) log.Printf("request%vsetcachesuccess!",requestID) return"VALUE",nil }) varretsingleflight.Result timeout:=time.After(5*time._Second_) select{//加入了超时机制 case<-timeout: log.Printf("timeout!") return"",errors.New("timeout") caseret=<-retChan://从chan中取出结果 returnret.Val.(string),ret.Err } return"",nil } cacheKey:="cacheKey" fori:=1;i<10;i++{ gofunc(requestIDint){ value,_:=getAndSetCache(requestID,cacheKey) log.Printf("request%vgetvalue:%v",requestID,value) }(i) } time.Sleep(20*time._Second_) }
看下DoChan的源码
func(g*Group)DoChan(keystring,fnfunc()(interface{},error))<-chanResult{ ch:=make(chanResult,1) g.mu.Lock() ifg.m==nil{ g.m=make(map[string]*call) } ifc,ok:=g.m[key];ok{ c.dups++ c.chans=append(c.chans,ch)//可以看到,每个等待的协程,都有一个结果channel。从之前的g.doCall里也可以看到,每个channel都给塞了结果。为什么不所有协程共用一个channel?因为那样就得在channel里塞至少与协程数量一样的结果数量,但是你却无法保证用户一个协程只读取一次。 g.mu.Unlock() returnch } c:=&call{chans:[]chan<-Result{ch}} c.wg.Add(1) g.m[key]=c g.mu.Unlock() gog.doCall(c,key,fn) returnch }
到此这篇关于使用Golang的singleflight防止缓存击穿的方法的文章就介绍到这了,更多相关Golangsingleflight防止缓存击穿内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!