golang将多路复异步io转成阻塞io的方法详解
前言
本文主要给大家介绍了关于golang如何将多路复异步io转变成阻塞io的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍:
packagemain
import(
"net"
)
funchandleConnection(cnet.Conn){
//读写数据
buffer:=make([]byte,1024)
c.Read(buffer)
c.Write([]byte("Hellofromserver"))
}
funcmain(){
l,err:=net.Listen("tcp","host:port")
iferr!=nil{
return
}
deferl.Close()
for{
c,err:=l.Accept()
iferr!=nil{
return
}
gohandleConnection(c)
}
}
对于我们都会写上面的代码,很简单,的确golang的网络部分对于我们隐藏了太多东西,我们不用像c++一样去调用底层的socket函数,也不用去使用epoll等复杂的io多路复用相关的逻辑,但是上面的代码真的就像我们看起来的那样在调用accept和read时阻塞吗?
//MultiplegoroutinesmayinvokemethodsonaConnsimultaneously.
//官方注释:多个goroutines可能同时调用方法在一个连接上,我的理解就是所谓的惊群效应吧
//换句话说就是你多个goroutines监听同一个连接同一个事件,所有的goroutines都会触发,
//这只是我的猜测,有待验证。
typeConninterface{
Read(b[]byte)(nint,errerror)
Write(b[]byte)(nint,errerror)
Close()error
LocalAddr()Addr
RemoteAddr()Addr
SetDeadline(ttime.Time)error
SetReadDeadline(ttime.Time)error
SetWriteDeadline(ttime.Time)error
}
typeconnstruct{
fd*netFD
}
这里面又一个Conn接口,下面conn实现了这个接口,里面只有一个成员netFD.
//Networkfiledescriptor.
typenetFDstruct{
//locking/lifetimeofsysfd+serializeaccesstoReadandWritemethods
fdmufdMutex
//immutableuntilClose
sysfdint
familyint
sotypeint
isConnectedbool
netstring
laddrAddr
raddrAddr
//waitserver
pdpollDesc
}
func(fd*netFD)accept()(netfd*netFD,errerror){
//................
for{
s,rsa,err=accept(fd.sysfd)
iferr!=nil{
nerr,ok:=err.(*os.SyscallError)
if!ok{
returnnil,err
}
switchnerr.Err{
/*如果错误是EAGAIN说明Socket的缓冲区为空,未读取到任何数据
则调用fd.pd.WaitRead,*/
casesyscall.EAGAIN:
iferr=fd.pd.waitRead();err==nil{
continue
}
casesyscall.ECONNABORTED:
continue
}
returnnil,err
}
break
}
//.........
//代码过长不再列出,感兴趣看go的源码,runtime下的fd_unix.go
returnnetfd,nil
}
上面代码段是accept部分,这里我们注意当accept有错误发生的时候,会检查这个错误是否是syscall.EAGAIN,如果是,则调用WaitRead将当前读这个fd的goroutine在此等待,直到这个fd上的读事件再次发生为止。当这个socket上有新数据到来的时候,WaitRead调用返回,继续for循环的执行,这样以来就让调用netFD的Read的地方变成了同步“阻塞”。有兴趣的可以看netFD的读和写方法,都有同样的实现。
到这里所有的疑问都集中到了pollDesc上,它到底是什么呢?
const(
pdReadyuintptr=1
pdWaituintptr=2
)
//Networkpollerdescriptor.
typepollDescstruct{
link*pollDesc//inpollcache,protectedbypollcache.lock
lockmutex//protectsthefollowingfields
fduintptr
closingbool
sequintptr//protectsfromstaletimersandreadynotifications
rguintptr//pdReady,pdWait,Gwaitingforreadornil
rttimer//readdeadlinetimer(setifrt.f!=nil)
rdint64//readdeadline
wguintptr//pdReady,pdWait,Gwaitingforwriteornil
wttimer//writedeadlinetimer
wdint64//writedeadline
useruint32//usersettablecookie
}
typepollCachestruct{
lockmutex
first*pollDesc
}
pollDesc网络轮询器是Golang中针对每个socket文件描述符建立的轮询机制。此处的轮询并不是一般意义上的轮询,而是Golang的runtime在调度goroutine或者GC完成之后或者指定时间之内,调用epoll_wait获取所有产生IO事件的socket文件描述符。当然在runtime轮询之前,需要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当IO就绪后,通过epoll返回的文件描述符和其中附带的goroutine的信息,重新恢复当前goroutine的执行。这里我们可以看到pollDesc中有两个变量wg和rg,其实我们可以把它们看作信号量,这两个变量有几种不同的状态:
- pdReady:io就绪
- pdWait:当前的goroutine正在准备挂起在信号量上,但是还没有挂起。
- Gpointer:当我们把它改为指向当前goroutine的指针时,当前goroutine挂起
继续接着上面的WaitRead调用说起,go在这里到底做了什么让当前的goroutine挂起了呢。
funcnet_runtime_pollWait(pd*pollDesc,modeint)int{
err:=netpollcheckerr(pd,int32(mode))
iferr!=0{
returnerr
}
//AsfornowonlySolarisuseslevel-triggeredIO.
ifGOOS=="solaris"{
netpollarm(pd,mode)
}
for!netpollblock(pd,int32(mode),false){
err=netpollcheckerr(pd,int32(mode))
iferr!=0{
returnerr
}
//Canhappeniftimeouthasfiredandunblockedus,
//butbeforewehadachancetorun,timeouthasbeenreset.
//Pretendithasnothappenedandretry.
}
return0
}
//returnstrueifIOisready,orfalseiftimedoutorclosed
//waitio-waitonlyforcompletedIO,ignoreerrors
funcnetpollblock(pd*pollDesc,modeint32,waitiobool)bool{
//根据读写模式获取相应的pollDesc中的读写信号量
gpp:=&pd.rg
ifmode=='w'{
gpp=&pd.wg
}
for{
old:=*gpp
//已经准备好直接返回true
ifold==pdReady{
*gpp=0
returntrue
}
ifold!=0{
throw("netpollblock:doublewait")
}
//设置gpppdWait
ifatomic.Casuintptr(gpp,0,pdWait){
break
}
}
ifwaitio||netpollcheckerr(pd,mode)==0{
gopark(netpollblockcommit,unsafe.Pointer(gpp),"IOwait",traceEvGoBlockNet,5)
}
old:=atomic.Xchguintptr(gpp,0)
ifold>pdWait{
throw("netpollblock:corruptedstate")
}
returnold==pdReady
}
当调用WaitRead时经过一段汇编最重调用了上面的net_runtime_pollWait函数,该函数循环调用了netpollblock函数,返回true表示io已准备好,返回false表示错误或者超时,在netpollblock中调用了gopark函数,gopark函数调用了mcall的函数,该函数用汇编来实现,具体功能就是把当前的goroutine挂起,然后去执行其他可执行的goroutine。到这里整个goroutine挂起的过程已经结束,那当goroutine可读的时候是如何通知该goroutine呢,这就是epoll的功劳了。
funcnetpoll(blockbool)*g{
ifepfd==-1{
returnnil
}
waitms:=int32(-1)
if!block{
waitms=0
}
varevents[128]epollevent
retry:
//每次最多监听128个事件
n:=epollwait(epfd,&events[0],int32(len(events)),waitms)
ifn<0{
ifn!=-_EINTR{
println("runtime:epollwaitonfd",epfd,"failedwith",-n)
throw("epollwaitfailed")
}
gotoretry
}
vargpguintptr
fori:=int32(0);i
这里就是熟悉的代码了,epoll的使用,看起来亲民多了。pd:=*(**pollDesc)(unsafe.Pointer(&ev.data))这是最关键的一句,我们在这里拿到当前可读时间的pollDesc,上面我们已经说了,当pollDesc的读写信号量保存为Gpointer时当前goroutine就会挂起。而在这里我们调用了netpollready函数,函数中把相应的读写信号量G指针擦出,置为pdReady,G-pointer状态被抹去,当前goroutine的G指针就放到可运行队列中,这样goroutine就被唤醒了。
可以看到虽然我们在写tcpserver看似一个阻塞的网络模型,在其底层实际上是基于异步多路复用的机制来实现的,只是把它封装成了跟阻塞io相似的开发模式,这样是使得我们不用去关注异步io,多路复用等这些复杂的概念以及混乱的回调函数。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。