详解golang net之transport
本文内容纲要:
关于golanghttptransport的讲解,网上有很多文章进行了解读,但都比较粗,很多代码实现并没有讲清楚。故给出更加详细的实现说明。整体看下来细节实现层面还是比较难懂的。
本次使用golang版本1.12.9
transport实现了RoundTripper接口,该接口只有一个方法RoundTrip(),故transport的入口函数就是RoundTrip()。transport的主要功能其实就是缓存了长连接,用于大量http请求场景下的连接复用,减少发送请求时TCP(TLS)连接建立的时间损耗,同时transport还能对连接做一些限制,如连接超时时间,每个host的最大连接数等。transport对长连接的缓存和控制仅限于TCP+(TLS)+HTTP1,不对HTTP2做缓存和限制。
tranport包含如下几个主要概念:
- 连接池:在idleConn中保存了不同类型(connectMethodKey)的请求连接(persistConn)。当发生请求时,首先会尝试从连接池中取一条符合其请求类型的连接使用
- readLoop/writeLoop:连接之上的功能,循环处理该类型的请求(发送request,返回response)
- roundTrip:请求的真正入口,接收到一个请求后会交给writeLoop和readLoop处理。
一对readLoop/writeLoop只能处理一条连接,如果这条连接上没有更多的请求,则关闭连接,退出循环,释放系统资源
下述代码都来自golang源码的src/net/httptransport.go文件
typeRoundTripperinterface{
//RoundTripexecutesasingleHTTPtransaction,returning
//aResponsefortheprovidedRequest.
//
//RoundTripshouldnotattempttointerprettheresponse.In
//particular,RoundTripmustreturnerr==nilifitobtained
//aresponse,regardlessoftheresponse'sHTTPstatuscode.
//Anon-nilerrshouldbereservedforfailuretoobtaina
//response.Similarly,RoundTripshouldnotattemptto
//handlehigher-levelprotocoldetailssuchasredirects,
//authentication,orcookies.
//
//RoundTripshouldnotmodifytherequest,exceptfor
//consumingandclosingtheRequest'sBody.RoundTripmay
//readfieldsoftherequestinaseparategoroutine.Callers
//shouldnotmutateorreusetherequestuntiltheResponse's
//Bodyhasbeenclosed.
//
//RoundTripmustalwaysclosethebody,includingonerrors,
//butdependingontheimplementationmaydosoinaseparate
//goroutineevenafterRoundTripreturns.Thismeansthat
//callerswantingtoreusethebodyforsubsequentrequests
//mustarrangetowaitfortheClosecallbeforedoingso.
//
//TheRequest'sURLandHeaderfieldsmustbeinitialized.
RoundTrip(*Request)(*Response,error)
}
Transport结构体中的主要成员如下(没有列出所有成员):
wantIdle要求关闭所有idle的persistConn
reqCancelermap[*Request]func(error)用于取消request
idleConnmap[connectMethodKey][]*persistConnidle状态的persistConn连接池,最大值受maxIdleConnsPerHost限制
idleConnChmap[connectMethodKey]chan*persistConn用于给调用者传递persistConn
connPerHostCountmap[connectMethodKey]int表示一类连接上的host数目,最大值受MaxConnsPerHost限制
connPerHostAvailablemap[connectMethodKey]chanstruct{}与connPerHostCount配合使用,判断该类型的连接数目是否已经达到上限
idleLRUconnLRU长度受MaxIdleConns限制,队列方式保存所有idle的pconn
altProtoatomic.Valuenilormap[string]RoundTripper,key为URIscheme,表示处理该scheme的RoundTripper实现。注意与TLSNextProto的不同,前者表示URI的scheme,后者表示tls之上的协议。如前者不会体现http2,后者会体现http2
Proxyfunc(*Request)(*url.URL,error)为request返回一个代理的url
DisableKeepAlivesbool是否取消长连接
DisableCompressionbool是否取消HTTP压缩
MaxIdleConnsint所有host的idle状态的最大连接数目,即idleConn中所有连接数
MaxIdleConnsPerHostint每个host的idle状态的最大连接数目,即idleConn中的key对应的连接数
MaxConnsPerHost每个host上的最大连接数目,含dialing/active/idle状态的connections。http2时,每个host只允许有一条idle的conneciton
DialContextfunc(ctxcontext.Context,network,addrstring)(net.Conn,error)创建未加密的tcp连接,比Dial函数增加了context控制
Dialfunc(network,addrstring)(net.Conn,error)创建未加密的tcp连接,废弃,使用DialContext
DialTLSfunc(network,addrstring)(net.Conn,error)为非代理模式的https创建连接的函数,如果该函数非空,则不会使用Dial函数,且忽略TLSClientConfig和TLSHandshakeTimeout;反之使用Dila和TLSClientConfig。即有限使用DialTLS进行tls协商
TLSClientConfig*tls.Configtlsclient用于tls协商的配置
IdleConnTimeout连接保持idle状态的最大时间,超时关闭pconn
TLSHandshakeTimeouttime.Durationtls协商的超时时间
ResponseHeaderTimeouttime.Duration发送完request后等待serveresponse的时间
TLSNextProtomap[string]func(authoritystring,c*tls.Conn)RoundTripper在tls协商带NPN/ALPN的扩展后,transport如何切换到其他协议。指tls之上的协议(next指的就是tls之上的意思)
ProxyConnectHeaderHeader在CONNECT请求时,配置request的首部信息,可选
MaxResponseHeaderBytes指定server响应首部的最大字节数
Transport.roundTrip是主入口,它通过传入一个request参数,由此选择一个合适的长连接来发送该request并返回response。整个流程主要分为两步:
使用getConn函数来获得底层TCP(TLS)连接;调用roundTrip函数进行上层协议(HTTP)处理。
func(t*Transport)roundTrip(req*Request)(*Response,error){
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx:=req.Context()
trace:=httptrace.ContextClientTrace(ctx)
ifreq.URL==nil{
req.closeBody()
returnnil,errors.New("http:nilRequest.URL")
}
ifreq.Header==nil{
req.closeBody()
returnnil,errors.New("http:nilRequest.Header")
}
scheme:=req.URL.Scheme
isHTTP:=scheme=="http"||scheme=="https"
//下面判断request首部的有效性
ifisHTTP{
fork,vv:=rangereq.Header{
if!httpguts.ValidHeaderFieldName(k){
returnnil,fmt.Errorf("net/http:invalidheaderfieldname%q",k)
}
for_,v:=rangevv{
if!httpguts.ValidHeaderFieldValue(v){
returnnil,fmt.Errorf("net/http:invalidheaderfieldvalue%qforkey%v",v,k)
}
}
}
}
//判断是否使用注册的RoundTrip来处理对应的scheme。对于使用tcp+tls+http1(wss协议升级)的场景
//不能使用注册的roundTrip。后续代码对tcp+tls+http1或tcp+http1进行了roundTrip处理
ift.useRegisteredProtocol(req){
altProto,_:=t.altProto.Load().(map[string]RoundTripper)
ifaltRT:=altProto[scheme];altRT!=nil{
ifresp,err:=altRT.RoundTrip(req);err!=ErrSkipAltProtocol{
returnresp,err
}
}
}
//后续仅处理URLscheme为http或https的连接
if!isHTTP{
req.closeBody()
returnnil,&badStringError{"unsupportedprotocolscheme",scheme}
}
ifreq.Method!=""&&!validMethod(req.Method){
returnnil,fmt.Errorf("net/http:invalidmethod%q",req.Method)
}
ifreq.URL.Host==""{
req.closeBody()
returnnil,errors.New("http:noHostinrequestURL")
}
//下面for循环用于在request出现错误的时候进行请求重试。但不是所有的请求失败都会被尝试,如请求被取消(errRequestCanceled)
//的情况是不会进行重试的。具体参见shouldRetryRequest函数
for{
select{
case<-ctx.Done():
req.closeBody()
returnnil,ctx.Err()
default:
}
//treqgetsmodifiedbyroundTrip,soweneedtorecreateforeachretry.
treq:=&transportRequest{Request:req,trace:trace}
//connectMethodForRequest函数通过输入一个request返回一个connectMethod(简称cm),该类型通过
//{proxyURL,targetScheme,tartgetAddr,onlyH1},即{代理URL,server端的scheme,server的地址,是否HTTP1}
//来表示一个请求。一个符合connectMethod描述的request将会在Transport.idleConn中匹配到一类长连接。
cm,err:=t.connectMethodForRequest(treq)
iferr!=nil{
req.closeBody()
returnnil,err
}
//获取一条长连接,如果连接池中有现成的连接则直接返回,否则返回一条新建的连接。该连接可能是HTTP2格式的,存放在persistCnn.alt中,
//使用其自注册的RoundTrip处理。该函数描述参见下面内容。
//从getConn的实现中可以看到,一个请求只能在idle的连接上执行,反之一条连接只能同时处理一个请求。
pconn,err:=t.getConn(treq,cm)//如果获取底层连接失败,无法继续上层协议的请求,直接返回错误iferr!=nil{//每个request都会在getConn中设置reqCanceler,获取连接失败,清空设置t.setReqCanceler(req,nil)
req.closeBody()
returnnil,err
}
varresp*Response//pconn.alt就是从Transport.TLSNextProto中获取的,它表示TLS之上的协议,如HTTP2。从persistConn.alt的注释中可以看出//目前alt仅支持HTTP2协议,后续可能会支持更多协议。ifpconn.alt!=nil{
//HTTP2处理,使用HTTP2时,由于不缓存HTTP2连接,不对其做限制t.decHostConnCount(cm.key())
//清除getConn中设置的标记。具体参见getConn
t.setReqCanceler(req,nil)
resp,err=pconn.alt.RoundTrip(req)
}else{//pconn.roundTrip中做了比较复杂的处理,该函数用于发送request并返回response。//通过writeLoop发送request,通过readLoop返回responseresp,err=pconn.roundTrip(treq)
}//如果成功返回response,则整个处理结束.iferr==nil{
returnresp,nil
}//判断该request是否满足重试条件,大部分场景是不支持重试的,仅有少部分情况支持,如errServerClosedIdle//err非nil时实际并没有在原来的连接上重试,且pconn没有关闭,提了issueif!pconn.shouldRetryRequest(req,err){
//Issue16465:returnunderlyingnet.Conn.Readerrorfrompeek,//aswe'vehistoricallydone.ife,ok:=err.(transportReadFromServerError);ok{
err=e.err
}
returnnil,err
}
testHookRoundTripRetried()
//Rewindthebodyifwe'reableto.//用于重定向场景ifreq.GetBody!=nil{
newReq:=*req
varerrerror
newReq.Body,err=req.GetBody()
iferr!=nil{
returnnil,err
}
req=&newReq
}}
}
getConn用于返回一条长连接。长连接的来源有2种路径:连接池中获取;当连接池中无法获取到时会新建一条连接
func(t*Transport)getConn(treq*transportRequest,cmconnectMethod)(*persistConn,error){
req:=treq.Request
trace:=treq.trace
ctx:=req.Context()
iftrace!=nil&&trace.GetConn!=nil{
trace.GetConn(cm.addr())
}
//从连接池中找一条合适的连接,如果找到则返回该连接,否则新建连接
ifpc,idleSince:=t.getIdleConn(cm);pc!=nil{
iftrace!=nil&&trace.GotConn!=nil{
trace.GotConn(pc.gotIdleConnTrace(idleSince))
}
//此处设置transport.reqCanceler比较难理解,主要功能是做一个标记,用于判断当前到执行pconn.roundTrip
//期间,request有没有被(如Request.Cancel,Request.Context().Done())取消,被取消的request将无需继续roundTrip处理
t.setReqCanceler(req,func(error){})
returnpc,nil
}
typedialResstruct{
pc*persistConn
errerror
}
//该chan中用于存放通过dialConn函数新创建的长连接persistConn(后续简称pconn),表示一条TCP(TLS)的底层连接.
dialc:=make(chandialRes)
//cmKey实际就是把connectMethod中的元素全部字符串化。cmKey作为一类连接的标识,如Transport.idleConn[cmKey]就表示一类特定的连接
cmKey:=cm.key()
//Copythesehookssowedon'traceonthepostPendingDialin
//thegoroutinewelaunch.Issue11136.
testHookPrePendingDial:=testHookPrePendingDial
testHookPostPendingDial:=testHookPostPendingDial
//在尝试获取连接的时候,如果此时正在创建一条连接,但最后没有选择这条新建的连接(有其它调用者释放了一条连接),
//此时,handlePendingDial负责将这条新创建的连接放到Transport.idleConn连接池中
handlePendingDial:=func(){
testHookPrePendingDial()
gofunc(){
ifv:=<-dialc;v.err==nil{
//将一条连接放入连接池中,描述见下文--tryPutIdleConn
t.putOrCloseIdleConn(v.pc)
}else{
t.decHostConnCount(cmKey)
}
testHookPostPendingDial()
}()
}
cancelc:=make(chanerror,1)
//为request设置ReqCanceler。transport代码中不会主动调用该ReqCanceler函数(会在
//roundTrip中调用replaceReqCanceler将其覆盖),可能的原因是transport提供了一个对外APICancelRequest,
//用户可以调用该函数取消连接,此时会调用该ReqCanceler。需要注意的是从CancelRequest的注释中可以看出,该API
//已经被废弃,这段代码后面可能会被删除(如果有不同看法,请指出)
t.setReqCanceler(req,func(errerror){cancelc<-err})
//如果对host上建立的连接有限制
ift.MaxConnsPerHost>0{
select{
//incHostConnCount会根据主机已经建立的连接是否达到t.MaxConnsPerHost来返回一个未关闭
//的chan(连接数达到MaxConnsPerHost)或关闭的chan(连接数未达到MaxConnsPerHost),
//返回未关闭的chan时会阻塞等待其他请求释放连接,不能新创建pconn;反之可以使用新创建的pconn
case<-t.incHostConnCount(cmKey):
//等待获取某一类连接对应的chan。tryPutIdleConn函数中会尝试将新建或释放的连接放入到该chan中
casepc:=<-t.getIdleConnCh(cm):
iftrace!=nil&&trace.GotConn!=nil{
trace.GotConn(httptrace.GotConnInfo{Conn:pc.conn,Reused:pc.isReused()})
}
returnpc,nil
//下面2个case都表示request被取消,其中Cancel被废弃,建议使用Context来取消request
case<-req.Cancel:
returnnil,errRequestCanceledConn
case<-req.Context().Done():
returnnil,req.Context().Err()
caseerr:=<-cancelc:
iferr==errRequestCanceled{
err=errRequestCanceledConn
}
returnnil,err
}
}
gofunc(){
//新建连接,创建好后将其放入dialcchan中
pc,err:=t.dialConn(ctx,cm)
dialc<-dialRes{pc,err}
}()
//下面会通过两种途径来获得连接:从dialc中获得通过dialConn新建的连接;通过idleConnCh获得其他request释放的连接
//如果首先获取到的是dialConn新建的连接,直接返回该连接即可;如果首先获取到的是其他request释放的连接,在返回该连接前
//需要调用handlePendingDial来处理dialConn新建的连接。
idleConnCh:=t.getIdleConnCh(cm)
select{
//获取dialConn新建的连接
casev:=<-dialc:
//Ourdialfinished.
ifv.pc!=nil{
iftrace!=nil&&trace.GotConn!=nil&&v.pc.alt==nil{
trace.GotConn(httptrace.GotConnInfo{Conn:v.pc.conn})
}
returnv.pc,nil
}
//仅针对MaxConnsPerHost>0有效,对应上面的incHostConnCount()
t.decHostConnCount(cmKey)
//下面用于返回更易读的错误信息
select{
case<-req.Cancel:
//Itwasanerrorduetocancelation,soprioritizethat
//errorvalue.(Issue16049)
returnnil,errRequestCanceledConn
case<-req.Context().Done():
returnnil,req.Context().Err()
caseerr:=<-cancelc:
iferr==errRequestCanceled{
err=errRequestCanceledConn
}
returnnil,err
default:
//Itwasn'tanerrorduetocancelation,so
//returntheoriginalerrormessage:
returnnil,v.err
}
//获取其他request释放的连接
casepc:=<-idleConnCh:
//Anotherrequestfinishedfirstanditsnet.Conn
//becameavailablebeforeourdial.Orsomebody
//else'sdialthattheydidn'tuse.
//Butourdialisstillgoing,sogiveitaway
//whenitfinishes:
handlePendingDial()
iftrace!=nil&&trace.GotConn!=nil{
trace.GotConn(httptrace.GotConnInfo{Conn:pc.conn,Reused:pc.isReused()})
}
returnpc,nil
//如果request取消,也需要调用handlePendingDial处理新建的连接
case<-req.Cancel:
handlePendingDial()
returnnil,errRequestCanceledConn
case<-req.Context().Done():
handlePendingDial()
returnnil,req.Context().Err()
caseerr:=<-cancelc:
handlePendingDial()
iferr==errRequestCanceled{
err=errRequestCanceledConn
}
returnnil,err
}
}
tryPutIdleConn函数用来将一条新创建或回收的连接放回连接池中,以便后续使用。与getIdleConnCh配合使用,后者用于获取一类连接对应的chan。在如下场景会将一个连接放回idleConn中
-
- 在readLoop成功之后(当然还有其他判断,如底层链路没有返回EOF错误);
- 创建一个新连接且新连接没有被使用时;
- roundTrip一开始发现request被取消时
func(t*Transport)tryPutIdleConn(pconn*persistConn)error{
//当不使用长连接或该主机上的连接数小于0(即不允许缓存任何连接)时,返回错误并关闭创建的连接(此处没有做关闭处理,
//但存在不适用的连接时必须关闭,如使用putOrCloseIdleConn)。
//可以看出当不使用长连接时,Transport不能缓存连接
ift.DisableKeepAlives||t.MaxIdleConnsPerHost<0{
returnerrKeepAlivesDisabled
}
ifpconn.isBroken(){
returnerrConnBroken
}
//如果是HTTP2连接,则直接返回,不缓存该连接
ifpconn.alt!=nil{
returnerrNotCachingH2Conn
}
//为新连接标记可重用状态,新创建的连接肯定是可以重用的,用于在Transport.roundTrip
//中的shouldRetryRequest函数中判断连接是否可以重用
pconn.markReused()
//该key对应Transport.idleConn中的key,标识特定的连接
key:=pconn.cacheKey
t.idleMu.Lock()
defert.idleMu.Unlock()
//idleConnCh中的chan元素用于存放可用的连接pconn,每类连接都有一个chan
waitingDialer:=t.idleConnCh[key]
select{
//如果此时有调用者等待一个连接,则直接将该连接传递出去,不进行保存,这种做法有利于提高效率
casewaitingDialer<-pconn:
//We'redonewiththispconnandsomebodyelseis
//currentlywaitingforaconnofthistype(they're
//activelydialing,butthisconnisready
//first).Chromecallsthissocketlatebinding.See
//https://insouciant.org/tech/connection-management-in-chromium/
returnnil
default:
//如果没有调用者等待连接,则清除该chan。删除map中的chan直接会关闭该chan
ifwaitingDialer!=nil{
//Theyhadpopulatedthis,buttheirdialwon
//first,sowecancleanupthismapentry.
delete(t.idleConnCh,key)
}
}
//与DisableKeepAlives有点像,当用户需要关闭所有idle的连接时,不会再缓存连接
ift.wantIdle{
returnerrWantIdle
}
ift.idleConn==nil{
t.idleConn=make(map[connectMethodKey][]*persistConn)
}
idles:=t.idleConn[key]
//当主机上该类连接数超过Transport.MaxIdleConnsPerHost时,不能再保存新的连接,返回错误并关闭连接
iflen(idles)>=t.maxIdleConnsPerHost(){
returnerrTooManyIdleHost
}
//需要缓存的连接与连接池中已有的重复,系统退出(这种情况下系统已经发生了混乱,直接退出)
for_,exist:=rangeidles{
ifexist==pconn{
log.Fatalf("dupidlepconn%pinfreelist",pconn)
}
}
//添加待缓存的连接
t.idleConn[key]=append(idles,pconn)
t.idleLRU.add(pconn)
//受MaxIdleConns的限制,添加策略变为:添加新的连接,删除最老的连接。
//MaxIdleConns限制了所有类型的idle状态的最大连接数目,而MaxIdleConnsPerHost限制了host上单一类型的最大连接数目
//idleLRU中保存了所有的连接,此处的作用为,找出最老的连接并移除
ift.MaxIdleConns!=0&&t.idleLRU.len()>t.MaxIdleConns{
oldest:=t.idleLRU.removeOldest()
oldest.close(errTooManyIdle)
t.removeIdleConnLocked(oldest)
}
//为新添加的连接设置超时时间
ift.IdleConnTimeout>0{
ifpconn.idleTimer!=nil{
//如果该连接是被释放的,则重置超时时间
pconn.idleTimer.Reset(t.IdleConnTimeout)
}else{
//如果该连接时新建的,则设置超时时间并设置超时动作pconn.closeConnIfStillIdle
//closeConnIfStillIdle用于释放连接,从Transport.idleLRU和Transport.idleConn中移除并关闭该连接
pconn.idleTimer=time.AfterFunc(t.IdleConnTimeout,pconn.closeConnIfStillIdle)
}
}
pconn.idleAt=time.Now()
returnnil
}
dialConn用于新创建一条连接,并为该连接启动readLoop和writeLoop
func(t*Transport)dialConn(ctxcontext.Context,cmconnectMethod)(*persistConn,error){
pconn:=&persistConn{
t:t,
cacheKey:cm.key(),
reqch:make(chanrequestAndChan,1),
writech:make(chanwriteRequest,1),
closech:make(chanstruct{}),
writeErrCh:make(chanerror,1),
writeLoopDone:make(chanstruct{}),
}
trace:=httptrace.ContextClientTrace(ctx)
wrapErr:=func(errerror)error{
ifcm.proxyURL!=nil{
//Returnatypederror,perIssue16997
return&net.OpError{Op:"proxyconnect",Net:"tcp",Err:err}
}
returnerr
}
//调用注册的DialTLS处理tls。使用自注册的TLS处理函数时,transport的TLSClientConfig和TLSHandshakeTimeout
//参数会被忽略
ifcm.scheme()=="https"&&t.DialTLS!=nil{
varerrerror
//调用注册的连接函数创建一条连接,注意cm.addr()的实现,如果该连接存在proxy,则此处是与proxy建立TLS连接;否则直接连server。
//存在proxy时,与server建立连接分为2步:与proxy建立TLP(TLS)连接;与server建立HTTP(HTTPS)连接
//func(cm*connectMethod)addr()string{
//ifcm.proxyURL!=nil{
//returncanonicalAddr(cm.proxyURL)
//}
//returncm.targetAddr
//}
pconn.conn,err=t.DialTLS("tcp",cm.addr())
iferr!=nil{
returnnil,wrapErr(err)
}
ifpconn.conn==nil{
returnnil,wrapErr(errors.New("net/http:Transport.DialTLSreturned(nil,nil)"))
}
//如果连接类型是TLS的,则需要处理TLS协商
iftc,ok:=pconn.conn.(*tls.Conn);ok{
//Handshakehere,incaseDialTLSdidn't.TLSNextProtobelow
//dependsonitforknowingtheconnectionstate.
iftrace!=nil&&trace.TLSHandshakeStart!=nil{
trace.TLSHandshakeStart()
}
//启动TLS协商,如果协商失败需要关闭连接
iferr:=tc.Handshake();err!=nil{
gopconn.conn.Close()
iftrace!=nil&&trace.TLSHandshakeDone!=nil{
trace.TLSHandshakeDone(tls.ConnectionState{},err)
}
returnnil,err
}
cs:=tc.ConnectionState()
iftrace!=nil&&trace.TLSHandshakeDone!=nil{
trace.TLSHandshakeDone(cs,nil)
}
//保存TLS协商结果
pconn.tlsState=&cs
}
}else{
//使用默认方式创建连接,此时会用到transport的TLSClientConfig和TLSHandshakeTimeout参数。同样注意cm.addr()
conn,err:=t.dial(ctx,"tcp",cm.addr())
iferr!=nil{
returnnil,wrapErr(err)
}
pconn.conn=conn
//如果scheme是需要TLS协商的,则处理TLS协商,否则为普通的HTTP连接
ifcm.scheme()=="https"{
varfirstTLSHoststring
iffirstTLSHost,_,err=net.SplitHostPort(cm.addr());err!=nil{
returnnil,wrapErr(err)
}
//进行TLS协商,具体参见下文addTLS
iferr=pconn.addTLS(firstTLSHost,trace);err!=nil{
returnnil,wrapErr(err)
}
}
}
//处理proxy的情况
switch{
//不存在proxy直接跳过
casecm.proxyURL==nil:
casecm.proxyURL.Scheme=="socks5":
conn:=pconn.conn
d:=socksNewDialer("tcp",conn.RemoteAddr().String())
ifu:=cm.proxyURL.User;u!=nil{
auth:=&socksUsernamePassword{
Username:u.Username(),
}
auth.Password,_=u.Password()
d.AuthMethods=[]socksAuthMethod{
socksAuthMethodNotRequired,
socksAuthMethodUsernamePassword,
}
d.Authenticate=auth.Authenticate
}
if_,err:=d.DialWithConn(ctx,conn,"tcp",cm.targetAddr);err!=nil{
conn.Close()
returnnil,err
}
//如果存在proxy,且server的scheme为"http",如果需要代理认证,则设置认证信息
casecm.targetScheme=="http":
pconn.isProxy=true
ifpa:=cm.proxyAuth();pa!=""{
pconn.mutateHeaderFunc=func(hHeader){
h.Set("Proxy-Authorization",pa)
}
}
//如果存在proxy,且server的scheme为"https"。与"http"不同,在与server进行tls协商前,会给proxy
//发送一个method为"CONNECT"的HTTP请求,如果请求通过(返回200),则可以继续与server进行TLS协商
casecm.targetScheme=="https":
//该conn表示与proxy建立的连接
conn:=pconn.conn
hdr:=t.ProxyConnectHeader
ifhdr==nil{
hdr=make(Header)
}
connectReq:=&Request{
Method:"CONNECT",
URL:&url.URL{Opaque:cm.targetAddr},
Host:cm.targetAddr,
Header:hdr,
}
ifpa:=cm.proxyAuth();pa!=""{
connectReq.Header.Set("Proxy-Authorization",pa)
}
//发送"CONNECT"http请求
connectReq.Write(conn)
//Readresponse.
//Okaytouseanddiscardbufferedreaderhere,because
//TLSserverwillnotspeakuntilspokento.
br:=bufio.NewReader(conn)
resp,err:=ReadResponse(br,connectReq)
iferr!=nil{
conn.Close()
returnnil,err
}
//proxy返回非200,表示无法建立连接,可能情况如proxy认证失败
ifresp.StatusCode!=200{
f:=strings.SplitN(resp.Status,"",2)
conn.Close()
iflen(f)<2{
returnnil,errors.New("unknownstatuscode")
}
returnnil,errors.New(f[1])
}
}
//与proxy建立连接后,再与server进行TLS协商
ifcm.proxyURL!=nil&&cm.targetScheme=="https"{
iferr:=pconn.addTLS(cm.tlsHost(),trace);err!=nil{
returnnil,err
}
}
//后续进行TLS之上的协议处理,如果TLS之上的协议为注册协议,则使用注册的roundTrip进行处理
//TLS之上的协议为TLS协商过程中使用NPN/ALPN扩展协商出的协议,如HTTP2(参见golang.org/x/net/http2)
ifs:=pconn.tlsState;s!=nil&&s.NegotiatedProtocolIsMutual&&s.NegotiatedProtocol!=""{
ifnext,ok:=t.TLSNextProto[s.NegotiatedProtocol];ok{
return&persistConn{alt:next(cm.targetAddr,pconn.conn.(*tls.Conn))},nil
}
}
ift.MaxConnsPerHost>0{
pconn.conn=&connCloseListener{Conn:pconn.conn,t:t,cmKey:pconn.cacheKey}
}
//创建读写通道,writeLoop用于发送request,readLoop用于接收响应。roundTrip函数中会通过chan给writeLoop发送
//request,通过chan从readLoop接口response。每个连接都有一个readLoop和writeLoop,连接关闭后,这2个Loop也会退出。
//pconn.br给readLoop使用,pconn.bw给writeLoop使用,注意此时已经建立了tcp连接。
pconn.br=bufio.NewReader(pconn)
pconn.bw=bufio.NewWriter(persistConnWriter{pconn})
gopconn.readLoop()
gopconn.writeLoop()
returnpconn,nil
}
addTLS用于进行非注册协议下的TLS协商
func(pconn*persistConn)addTLS(namestring,trace*httptrace.ClientTrace)error{
//InitiateTLSandcheckremotehostnameagainstcertificate.
cfg:=cloneTLSConfig(pconn.t.TLSClientConfig)
ifcfg.ServerName==""{
cfg.ServerName=name
}
ifpconn.cacheKey.onlyH1{
cfg.NextProtos=nil
}
plainConn:=pconn.conn
//配置TLSclient,包含一个TCP连接和TLC配置
tlsConn:=tls.Client(plainConn,cfg)
errc:=make(chanerror,2)
vartimer*time.Timer
//设置TLS超时时间,并在超时后往errc中写入一个tlsHandshakeTimeoutError{}
ifd:=pconn.t.TLSHandshakeTimeout;d!=0{
timer=time.AfterFunc(d,func(){
errc<-tlsHandshakeTimeoutError{}
})
}
gofunc(){
iftrace!=nil&&trace.TLSHandshakeStart!=nil{
trace.TLSHandshakeStart()
}
//执行TLS协商,如果协商没有超时,则将协商结果err放入errc中
err:=tlsConn.Handshake()
iftimer!=nil{
timer.Stop()
}
errc<-err
}()
//阻塞等待TLS协商结果,如果协商失败或协商超时,关闭底层连接
iferr:=<-errc;err!=nil{
plainConn.Close()
iftrace!=nil&&trace.TLSHandshakeDone!=nil{
trace.TLSHandshakeDone(tls.ConnectionState{},err)
}
returnerr
}
//获取协商结果并设置到pconn.tlsState
cs:=tlsConn.ConnectionState()
iftrace!=nil&&trace.TLSHandshakeDone!=nil{
trace.TLSHandshakeDone(cs,nil)
}
pconn.tlsState=&cs
pconn.conn=tlsConn
returnnil
}
在获取到底层TCP(TLS)连接后在roundTrip中处理上层协议:即发送HTTPrequest,返回HTTPresponse。roundTrip给writeLoop提供request,从readLoop获取response。
一个roundTrip用于处理一类request。
func(pc*persistConn)roundTrip(req*transportRequest)(resp*Response,errerror){
testHookEnterRoundTrip()
//此处与getConn中的"t.setReqCanceler(req,func(error){})"相对应,用于判断request是否被取消
//返回false表示request被取消,不必继续后续请求,关闭连接并返回错误
if!pc.t.replaceReqCanceler(req.Request,pc.cancelRequest){
pc.t.putOrCloseIdleConn(pc)
returnnil,errRequestCanceled
}
pc.mu.Lock()
//与readLoop配合使用,表示期望的响应的个数
pc.numExpectedResponses++
//dialConn中定义的函数,设置了proxy的认证信息
headerFn:=pc.mutateHeaderFunc
pc.mu.Unlock()
ifheaderFn!=nil{
headerFn(req.extraHeaders())
}
//Askforacompressedversionifthecallerdidn'tsettheir
//ownvalueforAccept-Encoding.Weonlyattemptto
//uncompressthegzipstreamifwewerethelayerthat
//requestedit.
requestedGzip:=false
//如果需要在request中设置可接受的解码方法,则在request中添加对应的首部。仅支持gzip方式且
//仅在调用者没有设置这些首部时设置
if!pc.t.DisableCompression&&
req.Header.Get("Accept-Encoding")==""&&
req.Header.Get("Range")==""&&
req.Method!="HEAD"{
//Requestgziponly,notdeflate.Deflateisambiguousand
//notasuniversallysupportedanyway.
//See:https://zlib.net/zlib_faq.html#faq39
//
//Notethatwedon'trequestthisforHEADrequests,
//duetoabuginnginx:
//https://trac.nginx.org/nginx/ticket/358
//https://golang.org/issue/5522
//
//Wedon'trequestgzipiftherequestisforarange,since
//auto-decodingaportionofagzippeddocumentwilljustfail
//anyway.Seehttps://golang.org/issue/8923
requestedGzip=true
req.extraHeaders().Set("Accept-Encoding","gzip")
}
//用于处理首部含"Expect:100-continue"的request,客户端使用该首部探测服务器是否能够
//处理request首部中的规格要求(如长度过大的request)。
varcontinueChchanstruct{}
ifreq.ProtoAtLeast(1,1)&&req.Body!=nil&&req.expectsContinue(){
continueCh=make(chanstruct{},1)
}
//HTTP1.1默认使用长连接,当transport设置DisableKeepAlives时会导致处理每个request时都会
//新建一个连接。此处的处理逻辑是:如果transport设置了DisableKeepAlives,而request没有设置
//"Connection:close",则为request设置该首部。将底层表现与上层协议保持一致。
ifpc.t.DisableKeepAlives&&!req.wantsClose(){
req.extraHeaders().Set("Connection","close")
}
//用于在异常场景(如request取消)下通知readLoop,roundTrip是否已经退出,防止ReadLoop发送response阻塞
gone:=make(chanstruct{})
deferclose(gone)
deferfunc(){
iferr!=nil{
pc.t.setReqCanceler(req.Request,nil)
}
}()
constdebugRoundTrip=false
//Writetherequestconcurrentlywithwaitingforaresponse,
//incasetheserverdecidestoreplybeforereadingourfull
//requestbody.
//表示发送了多少个字节的request,debug使用
startBytesWritten:=pc.nwrite
//给writeLoop封装并发送信息,注意此处的先后顺序。首先给writeLoop发送数据,阻塞等待writeLoop
//接收,待writeLoop接收后才能发送数据给readLoop,因此发送request总会优先接收response
writeErrCh:=make(chanerror,1)
pc.writech<-writeRequest{req,writeErrCh,continueCh}
//给readLoop封装并发送信息
resc:=make(chanresponseAndError)
pc.reqch<-requestAndChan{
req:req.Request,
ch:resc,
addedGzip:requestedGzip,
continueCh:continueCh,
callerGone:gone,
}
varrespHeaderTimer<-chantime.Time
cancelChan:=req.Request.Cancel
ctxDoneChan:=req.Context().Done()
//该循环主要用于处理获取response超时和request取消时的条件跳转。正常情况下收到reponse
//退出roundtrip函数
for{
testHookWaitResLoop()
select{
//writeLoop返回发送request后的结果
caseerr:=<-writeErrCh:
ifdebugRoundTrip{
req.logf("writeErrChresv:%T/%#v",err,err)
}
iferr!=nil{
pc.close(fmt.Errorf("writeerror:%v",err))
returnnil,pc.mapRoundTripError(req,startBytesWritten,err)
}
//设置一个接收response的定时器,如果在这段时间内没有接收到response(即没有进入下面代码
//的"casere:=<-resc:"分支),超时后进入""case<-respHeaderTimer:分支,关闭连接,
//防止readLoop一直等待读取response,导致处理阻塞;没有超时则关闭定时器
ifd:=pc.t.ResponseHeaderTimeout;d>0{
ifdebugRoundTrip{
req.logf("startingtimerfor%v",d)
}
timer:=time.NewTimer(d)
defertimer.Stop()//preventleaks
respHeaderTimer=timer.C
}
//处理底层连接关闭。"case<-cancelChan:"和”case<-ctxDoneChan:“为request关闭,request
//关闭也会导致底层连接关闭,但必须处理非上层协议导致底层连接关闭的情况。
case<-pc.closech:
ifdebugRoundTrip{
req.logf("closechrecv:%T%#v",pc.closed,pc.closed)
}
returnnil,pc.mapRoundTripError(req,startBytesWritten,pc.closed)
//等待获取response超时,关闭连接
case<-respHeaderTimer:
ifdebugRoundTrip{
req.logf("timeoutwaitingforresponseheaders.")
}
pc.close(errTimeout)
returnnil,errTimeout
//接收到readLoop返回的response结果
casere:=<-resc:
//极异常情况,直接程序panic
if(re.res==nil)==(re.err==nil){
panic(fmt.Sprintf("internalerror:exactlyoneofresorerrshouldbeset;nil=%v",re.res==nil))
}
ifdebugRoundTrip{
req.logf("rescrecv:%p,%T/%#v",re.res,re.err,re.err)
}
ifre.err!=nil{
returnnil,pc.mapRoundTripError(req,startBytesWritten,re.err)
}
returnre.res,nil
//request取消
case<-cancelChan:
pc.t.CancelRequest(req.Request)
//将关闭之后的chan置为nil,用来防止select一直进入该case(close的chan不会阻塞读,读取的数据为0)
cancelChan=nil
case<-ctxDoneChan:
pc.t.cancelRequest(req.Request,req.Context().Err())
cancelChan=nil
ctxDoneChan=nil
}
}
}
writeLoop用于发送request请求
func(pc*persistConn)writeLoop(){
deferclose(pc.writeLoopDone)
for{
//writeLoop会阻塞等待两个IOcase:
//循环等待并处理roundTrip发来的writeRequest数据,此时需要发送request;
//如果底层连接关闭,则退出writeLoop
select{
casewr:=<-pc.writech:
startBytesWritten:=pc.nwrite
//构造request并发送request请求。waitForContinue用于处理首部含"Expect:100-continue"的request
err:=wr.req.Request.write(pc.bw,pc.isProxy,wr.req.extra,pc.waitForContinue(wr.continueCh))
ifbre,ok:=err.(requestBodyReadError);ok{
err=bre.error
//Errorsreadingfromtheuser's
//Request.Bodyarehighpriority.
//Setitherebeforesendingonthe
//channelsbeloworcalling
//pc.close()whichtearstown
//connectionsandcausesother
//errors.
wr.req.setError(err)
}
iferr==nil{
err=pc.bw.Flush()
}
//请求失败时,需要关闭request和底层连接
iferr!=nil{
wr.req.Request.closeBody()
ifpc.nwrite==startBytesWritten{
err=nothingWrittenError{err}
}
}
//将结果发送给readLoop的pc.wroteRequest()函数处理
pc.writeErrCh<-err//tothebodyreader,whichmightrecycleus
//将结果返回给roundTrip处理,防止响应超时
wr.ch<-err
//如果发送request失败,需要关闭连接。writeLoop退出时会关闭pc.conn和pc.closech,
//同时会导致readLoop退出
iferr!=nil{
pc.close(err)
return
}
case<-pc.closech:
return
}
}
}
readLoop循环接收response响应,成功获得response后会将连接返回连接池,便于后续复用。当readLoop正常处理完一个response之后,会将连接重新放入到连接池中;
当readloop退出后,该连接会被关闭移除。
func(pc*persistConn)readLoop(){
closeErr:=errReadLoopExiting//defaultvalue,ifnotchangedbelow
//当writeLoop或readLoop(异常)跳出循环后,都需要关闭底层连接。即一条连接包含writeLoop和readLoop两个
//处理,任何一个loop退出(协议升级除外)则该连接不可用
//readLoo跳出循环的正常原因是连接上没有待处理的请求,此时关闭连接,释放资源
deferfunc(){
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
//尝试将连接放回连接池
tryPutIdleConn:=func(trace*httptrace.ClientTrace)bool{
iferr:=pc.t.tryPutIdleConn(pc);err!=nil{
closeErr=err
iftrace!=nil&&trace.PutIdleConn!=nil&&err!=errKeepAlivesDisabled{
trace.PutIdleConn(err)
}
returnfalse
}
iftrace!=nil&&trace.PutIdleConn!=nil{
trace.PutIdleConn(nil)
}
returntrue
}
//eofcisusedtoblockcallergoroutinesreadingfromResponse.Body
//atEOFuntilthisgoroutineshas(potentially)addedtheconnection
//backtotheidlepool.
//从上面注释可以看出该变量主要用于阻塞调用者协程读取EOF的resp.body,
//直到该连接重新放入连接池中。处理逻辑与上面先尝试放入连接池,然后返回response一样,
//便于连接快速重用
eofc:=make(chanstruct{})
//出现错误时也需要释放读取resp.Body的协程,防止调用者协程挂死
deferclose(eofc)//unblockreaderonerrors
//Readthisonce,beforeloopstarts.(toavoidracesintests)
testHookMu.Lock()
testHookReadLoopBeforeNextRead:=testHookReadLoopBeforeNextRead
testHookMu.Unlock()
alive:=true
foralive{
//获取允许的response首部的最大字节数
pc.readLimit=pc.maxHeaderResponseSize()
//从接收buffer中peek一个字节来判断底层是否接收到response。roundTrip保证了request先于response发送。
//此处peek会阻塞等待response(这也是roundtrip中设置response超时定时器的原因)。goroutine中的read/write
//操作都是阻塞模式。
_,err:=pc.br.Peek(1)
pc.mu.Lock()
//如果期望的response为0,则直接退出readLoop并关闭连接,此时连接上没有需要处理的数据,
//关闭连接,释放系统资源。
ifpc.numExpectedResponses==0{
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()
//阻塞等待roundTrip发来的数据
rc:=<-pc.reqch
trace:=httptrace.ContextClientTrace(rc.req.Context())
varresp*Response
//如果有response数据,则读取并解析为Response格式
iferr==nil{
resp,err=pc.readResponse(rc,trace)
}else{
//可能的错误如server端关闭,发送EOF
err=transportReadFromServerError{err}
closeErr=err
}
//底层没有接收到server的任何数据,断开该连接,可能原因是在client发出request的同时,server关闭
//了连接。参见transportReadFromServerError的注释。
iferr!=nil{
ifpc.readLimit<=0{
err=fmt.Errorf("net/http:serverresponseheadersexceeded%dbytes;aborted",pc.maxHeaderResponseSize())
}
//传递错误信息给roundTrip并退出loop
select{
caserc.ch<-responseAndError{err:err}:
case<-rc.callerGone:
return
}
return
}
pc.readLimit=maxInt64//effictivelynolimitforresponsebodies
pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()
//判断response是否可写,在使用101SwitchingProtocol进行协议升级时需要返回一个可写的resp.body
//如果使用了101SwitchingProtocol,升级完成后就与transport没有关系了(后续使用http2或websocket等)
bodyWritable:=resp.bodyIsWritable()
//判断response的body是否为空,如果body为空,则不必读取body内容(HEAD的resp.body没有数据)
hasBody:=rc.req.Method!="HEAD"&&resp.ContentLength!=0
//如果server关闭连接或client关闭连接或非预期的响应码或使用了协议升级,这几种情况下不能在该连接上继续
//接收响应,退出readLoop
ifresp.Close||rc.req.Close||resp.StatusCode<=199||bodyWritable{
//Don'tdokeep-aliveonerrorifeitherpartyrequestedaclose
//orwegetanunexpectedinformational(1xx)response.
//StatusCode100isalreadyhandledabove.
alive=false
}
//此处用于处理body为空或协议升级场景,会尝试将连接放回连接池,对于后者,连接由调用者管理,退出readLoop
if!hasBody||bodyWritable{
pc.t.setReqCanceler(rc.req,nil)
//在返回response前将连接放回连接池,快速回收利用。回收连接需要按顺序满足:
//1.alive为true
//2.接收到EOF错误,此时底层连接关闭,该连接不可用
//3.成功发送request;
//此处的执行顺序很重要,将连接返回连接池的操作放到最后,即在协议升级的场景,服务端不再
//发送数据的场景,以及request发送失败的场景下都不会将连接放回连接池,这些情况会导致
//alive为false,readLoop退出并关闭该连接(协议升级后的连接不能关闭)
alive=alive&&
!pc.sawEOF&&
pc.wroteRequest()&&
tryPutIdleConn(trace)
ifbodyWritable{
//协议升级之后还是会使用同一条连接,设置closeErr为errCallerOwnsConn,这样在readLoop
//return后不会被pc.close(closeErr)关闭连接
closeErr=errCallerOwnsConn
}
select{
//1:将response成功返回后继续等待下一个response;
//2:如果roundTrip退出,(此时无法返回给roundTripresponse)则退出readLoop。
//即roundTrip接收完response后退出不会影响readLoop继续运行
caserc.ch<-responseAndError{res:resp}:
case<-rc.callerGone:
return
}
//Nowthatthey'vereadfromtheunbufferedchannel,they'resafely
//outoftheselectthatalsowaitsonthisgoroutinetodie,so
//we'reallowedtoexitnowifneeded(ifaliveisfalse)
testHookReadLoopBeforeNextRead()
continue
}
//下面处理responsebody存在数据的场景,逻辑与body不存在数据的场景类似
waitForBodyRead:=make(chanbool,2)
//初始化body的处理函数,读取完response会返回EOF,这类连接是可重用的
body:=&bodyEOFSignal{
body:resp.Body,
earlyCloseFn:func()error{
waitForBodyRead<-false
<-eofc//willbeclosedbydeferredcallattheendofthefunction
returnnil
},
fn:func(errerror)error{
isEOF:=err==io.EOF
waitForBodyRead<-isEOF
ifisEOF{
<-eofc//seecommentaboveeofcdeclaration
}elseiferr!=nil{
ifcerr:=pc.canceled();cerr!=nil{
returncerr
}
}
returnerr
},
}
//返回的resp.Body类型变为了bodyEOFSignal,如果调用者在读取resp.Body后没有关闭,会导致
//readLoop阻塞在下面"casebodyEOF:=<-waitForBodyRead:"中
resp.Body=body
ifrc.addedGzip&&strings.EqualFold(resp.Header.Get("Content-Encoding"),"gzip"){
resp.Body=&gzipReader{body:body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength=-1
resp.Uncompressed=true
}
//此处与处理不带resp.body的场景相同
select{
caserc.ch<-responseAndError{res:resp}:
case<-rc.callerGone:
return
}
//Beforeloopingbacktothetopofthisfunctionandpeekingon
//thebufio.Reader,waitforthecallergoroutinetofinish
//readingtheresponsebody.(orforcancelationordeath)
select{
casebodyEOF:=<-waitForBodyRead:
pc.t.setReqCanceler(rc.req,nil)//beforepcmightreturntoidlepool
alive=alive&&
//如果读取完response的数据,则该连接可以被重用,否则直接释放。释放一个未读取完数据的连接会导致数据丢失。
//注意区分bodyEOF和pc.sawEOF的区别,一个是上层通道(httpresponse.Body)关闭,一个是底层通道(TCP)关闭。
bodyEOF&&
!pc.sawEOF&&
pc.wroteRequest()&&
tryPutIdleConn(trace)
//释放阻塞的读操作
ifbodyEOF{
eofc<-struct{}{}
}
case<-rc.req.Cancel:
alive=false
pc.t.CancelRequest(rc.req)
case<-rc.req.Context().Done():
alive=false
pc.t.cancelRequest(rc.req,rc.req.Context().Err())
case<-pc.closech:
alive=false
}
testHookReadLoopBeforeNextRead()
}
}
本文内容总结:
原文链接:https://www.cnblogs.com/charlieroro/p/11409153.html