go语言同步教程之条件变量
Go的标准库中有一个类型叫条件变量:sync.Cond。这种类型与互斥锁和读写锁不同,它不是开箱即用的,它需要与互斥锁组合使用:
//NewCondreturnsanewCondwithLockerl. funcNewCond(lLocker)*Cond{ return&Cond{L:l} } //ALockerrepresentsanobjectthatcanbelockedandunlocked. typeLockerinterface{ Lock() Unlock() }
通过使用NewCond函数可以返回*sync.Cond类型的结果,*sync.Cond我们主要操作其三个方法,分别是:
wait():等待通知
Signal():单播通知
Broadcast():广播通知
具体的函数说明如下:
//Waitatomicallyunlocksc.Landsuspendsexecution //ofthecallinggoroutine.Afterlaterresumingexecution, //Waitlocksc.Lbeforereturning.Unlikeinothersystems, //WaitcannotreturnunlessawokenbyBroadcastorSignal. // //Becausec.LisnotlockedwhenWaitfirstresumes,thecaller //typicallycannotassumethattheconditionistruewhen //Waitreturns.Instead,thecallershouldWaitinaloop: // //c.L.Lock() //for!condition(){ //c.Wait() //} //...makeuseofcondition... //c.L.Unlock() // func(c*Cond)Wait(){ c.checker.check() t:=runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify,t) c.L.Lock() } //Signalwakesonegoroutinewaitingonc,ifthereisany. // //Itisallowedbutnotrequiredforthecallertoholdc.L //duringthecall. func(c*Cond)Signal(){ c.checker.check() runtime_notifyListNotifyOne(&c.notify) } //Broadcastwakesallgoroutineswaitingonc. // //Itisallowedbutnotrequiredforthecallertoholdc.L //duringthecall. func(c*Cond)Broadcast(){ c.checker.check() runtime_notifyListNotifyAll(&c.notify) }
条件变量sync.Cond本质上是一些正在等待某个条件的线程的同步机制。
sync.Cond主要实现一个条件变量,假如goroutineA执行前需要等待另外的goroutineB的通知,那边处于等待的goroutineA会保存在一个通知列表,也就是说需要某种变量状态的goroutineA将会等待/Wait在那里,当某个时刻状态改变时负责通知的goroutineB通过对条件变量通知的方式(Broadcast,Signal)来通知处于等待条件变量的goroutineA,这样便可首先一种“消息通知”的同步机制。
以go的http处理为例,在Go的源码中http模块server部分源码中所示,当需要处理一个新的连接的时候,若连接conn是实现自*tls.Conn的情况下,会进行相关的客户端与服务端的“握手”处理Handshake(),入口代码如下:
iftlsConn,ok:=c.rwc.(*tls.Conn);ok{ ifd:=c.server.ReadTimeout;d!=0{ c.rwc.SetReadDeadline(time.Now().Add(d)) } ifd:=c.server.WriteTimeout;d!=0{ c.rwc.SetWriteDeadline(time.Now().Add(d)) } iferr:=tlsConn.Handshake();err!=nil{ c.server.logf("http:TLShandshakeerrorfrom%s:%v",c.rwc.RemoteAddr(),err) return } c.tlsState=new(tls.ConnectionState) *c.tlsState=tlsConn.ConnectionState() ifproto:=c.tlsState.NegotiatedProtocol;validNPN(proto){ iffn:=c.server.TLSNextProto[proto];fn!=nil{ h:=initNPNRequest{tlsConn,serverHandler{c.server}} fn(c.server,tlsConn,h) } return } }
其中的Handshake函数代码通过使用条件变量的方式来处理新连接握手调用的同步问题:
func(c*Conn)Handshake()error{ c.handshakeMutex.Lock() deferc.handshakeMutex.Unlock() for{ iferr:=c.handshakeErr;err!=nil{ returnerr } ifc.handshakeComplete{ returnnil } ifc.handshakeCond==nil{ break } c.handshakeCond.Wait() } c.handshakeCond=sync.NewCond(&c.handshakeMutex) c.handshakeMutex.Unlock() c.in.Lock() deferc.in.Unlock() c.handshakeMutex.Lock() ifc.handshakeErr!=nil||c.handshakeComplete{ panic("handshakeshouldnothavebeenabletocompleteafterhandshakeCondwasset") } ifc.isClient{ c.handshakeErr=c.clientHandshake() }else{ c.handshakeErr=c.serverHandshake() } ifc.handshakeErr==nil{ c.handshakes++ }else{ c.flush() } ifc.handshakeErr==nil&&!c.handshakeComplete{ panic("handshakeshouldhavehadaresult.") } c.handshakeCond.Broadcast() c.handshakeCond=nil returnc.hand
我们也可以再通过一个例子熟悉sync.Cond的使用:
我们尝试实现一个读写同步的例子,需求是:我们有数个读取器和数个写入器,读取器必须依赖写入器对缓存区进行数据写入后,才可从缓存区中对数据进行读出。我们思考下,要实现类似的功能,除了使用channel,还能如何做?
写入器每次完成写入数据后,它都需要某种通知机制广播给处于阻塞状态的读取器,告诉它们可以对数据进行访问,这其实跟sync.Cond的广播机制是不是很像?有了这个广播机制,我们可以通过sync.Cond来实现这个例子了:
packagemain import( "bytes" "fmt" "io" "sync" "time" ) typeMyDataBucketstruct{ br*bytes.Buffer gmutex*sync.RWMutex rcond*sync.Cond//读操作需要用到的条件变量 } funcNewDataBucket()*MyDataBucket{ buf:=make([]byte,0) db:=&MyDataBucket{ br:bytes.NewBuffer(buf), gmutex:new(sync.RWMutex), } db.rcond=sync.NewCond(db.gmutex.RLocker()) returndb } func(db*MyDataBucket)Read(iint){ db.gmutex.RLock() deferdb.gmutex.RUnlock() vardata[]byte vardbyte varerrerror for{ //读取一个字节 ifd,err=db.br.ReadByte();err!=nil{ iferr==io.EOF{ ifstring(data)!=""{ fmt.Printf("reader-%d:%s\n",i,data) } db.rcond.Wait() data=data[:0] continue } } data=append(data,d) } } func(db*MyDataBucket)Put(d[]byte)(int,error){ db.gmutex.Lock() deferdb.gmutex.Unlock() //写入一个数据块 n,err:=db.br.Write(d) db.rcond.Broadcast() returnn,err } funcmain(){ db:=NewDataBucket() godb.Read(1) godb.Read(2) fori:=0;i<10;i++{ gofunc(iint){ d:=fmt.Sprintf("data-%d",i) db.Put([]byte(d)) }(i) time.Sleep(100*time.Millisecond) } }
当使用sync.Cond的时候有两点移动要注意的:
- 一定要在调用cond.Wait方法前,锁定与之关联的读写锁
- 一定不要忘记在cond.Wait后,若数据已经处理完毕,在返回前要对与之关联的读写锁进行解锁。
如下面Wait()的源码所示,Cond.Wait会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的Wait将继续往下执行并从新上锁
func(c*Cond)Wait(){ c.checker.check() t:=runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify,t) c.L.Lock() }
如果不释放锁,其它收到信号的gouroutine将阻塞无法继续执行。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。