golang timeoutHandler解析及kubernetes中的变种
本文内容纲要:
-Basic
-Advanced
-golanghttp.TimeoutHandler
-kubernetestimeoutHandler
-Other
-写在最后
Golang里的httprequesttimeout比较简单,但是稍不留心就容易出现错误,最近在kubernetes生产环境中出现了的一个问题让我有机会好好捋一捋golang中关于timeout中的所有相关的东西。
Basic
golang中timeout有关的设置,资料已经比较多,其中必须阅读的就是ThecompleteguidetoGonet/httptimeouts,里面详述了关于http中各个timeou字段及其影响,写的很详细,本文就不在重复造轮子了。所以我们在生产环境中的代码绝对不能傻傻的使用http.Get("www.baidu.com")了,很容易造成clienthang死,默认的httpclient的timeout值为0,也就是没有超时。具体的血泪教训可以参见Don’tuseGo’sdefaultHTTPclient(inproduction)。对于httppackage中default的设置最后还是仔细review一遍再使用。
Advanced
golanghttp.TimeoutHandler
了解了基本的使用方式后,笔者带领大家解析一下其中的http.TimeoutHandler,TimeoutHandler顾名思义是一个handlerwrapper,用来限制ServeHttp的最大时间,也就是除去读写socket外真正执行服务器逻辑的时间,如果ServeHttp运行时间超过了设定的时间,将返回一个"503ServiceUnavailable"和一个指定的message。(golangnet中各个结构体中各种timeout的不尽相同,但是并没有直接设置ServeHttptimeout的方法,TimeoutHandler是唯一一个方法)。
我们来一起探究一下他的实现,首先是函数定义:
//TimeoutHandlerreturnsaHandlerthatrunshwiththegiventimelimit.
//
//ThenewHandlercallsh.ServeHTTPtohandleeachrequest,butifa
//callrunsforlongerthanitstimelimit,thehandlerrespondswith
//a503ServiceUnavailableerrorandthegivenmessageinitsbody.
//(Ifmsgisempty,asuitabledefaultmessagewillbesent.)
//Aftersuchatimeout,writesbyhtoitsResponseWriterwillreturn
//ErrHandlerTimeout.
//
//TimeoutHandlerbuffersallHandlerwritestomemoryanddoesnot
//supporttheHijackerorFlusherinterfaces.
funcTimeoutHandler(hHandler,dttime.Duration,msgstring)Handler{
	return&timeoutHandler{
		handler:h,
		body:msg,
		dt:dt,
	}
}
可以看到典型的handlerwrapper的函数signature,接收一个handler并返回一个hander,返回的timeouthandler中ServeHttp方法如下:
func(h*timeoutHandler)ServeHTTP(wResponseWriter,r*Request){
	ctx:=h.testContext
	ifctx==nil{
		varcancelCtxcontext.CancelFunc
		ctx,cancelCtx=context.WithTimeout(r.Context(),h.dt)
		defercancelCtx()
	}
	r=r.WithContext(ctx)
	done:=make(chanstruct{})
	tw:=&timeoutWriter{
		w:w,
		h:make(Header),
	}
	panicChan:=make(chaninterface{},1)
	gofunc(){
		deferfunc(){
			ifp:=recover();p!=nil{
				panicChan<-p
			}
		}()
		h.handler.ServeHTTP(tw,r)
		close(done)
	}()
	select{
	casep:=<-panicChan:
		panic(p)
	case<-done:
		tw.mu.Lock()
		defertw.mu.Unlock()
		dst:=w.Header()
		fork,vv:=rangetw.h{
			dst[k]=vv
		}
		if!tw.wroteHeader{
			tw.code=StatusOK
		}
		w.WriteHeader(tw.code)
		w.Write(tw.wbuf.Bytes())
	case<-ctx.Done():
		tw.mu.Lock()
		defertw.mu.Unlock()
		w.WriteHeader(StatusServiceUnavailable)
		io.WriteString(w,h.errorBody())
		tw.timedOut=true
	}
}
整体流程为:
- 
首先初始化context的timeout 
- 
初始化一个timeoutWriter,该timeoutWriter实现了 http.ResponseWriter接口,内部结构体中有一个bytes.Buffer,所有的Write方法都是写入到该buffer中。
- 
异步goroutine调用 serveHttp方法,timeoutWriter作为serveHttp的参数,所以此时写入的数据并没有发送给用户,而是缓存到了timeoutWriter的buffer中
- 
最后select监听各个channel: - 如果子groutinepanic,则捕获该panic并在主grouinte中panic进行propagate
- 如果请求正常完成则开始写入header并将buffer中的内容写给真正的httpwriter
- 如果请求超时则返回用户503
 
为什么需要先写入buffer,然后在写给真正的writer呐?因为我们无法严格意义上的cancel掉一个请求。如果我们已经往一个httpwriter中写了部分数据(例如已经写了hedaer),而此时因为某些逻辑处理较慢,并且发现已经过了timeout阈值,想要cancel该请求。此时已经没有办法真正意义上取消了,可能对端已经读取了部分数据了。一个典型的场景是HTTP/1.1中的分块传输,我们先写入header,然后依次写入各个chunk,如果后面的chunk还没写已经超时了,那此时就陷入了两难的情况。
此时就需要使用golang内置的TimeoutHandler了,它提供了两个优势:
- 首先是提供了一个buffer,等到所有的数据写入完成,如果此时没有超时再统一发送给对端。并且timeoutWriter在每次Write的时候都会判断此时是否超时,如果超时就马上返回错误。
- 给用户返回一个友好的503提示
实现上述两点的代价就是需要维护一个buffer来缓存所有的数据。有些情况下是这个buffer会导致一定的问题,设想一下对于一个高吞吐的server,每个请求都维护一个buffer势必是不可接受的,以kubernete为例,每次listpods时可能有好几M的数据,如果每个请求都写缓存势必会占用过多内存,那kubernetes是如何实现timeout的呐?
kubernetestimeoutHandler
kubernetes为了防止某个请求hang死之后一直占用连接,所以会对每个请求进行timeout的处理,这部分逻辑是在一个handlerchain中WithTimeoutForNonLongRunningRequestshandler实现。其中返回的WithTimeout的实现如下:
//WithTimeoutreturnsanhttp.Handlerthatrunshwithatimeout
//determinedbytimeoutFunc.Thenewhttp.Handlercallsh.ServeHTTPtohandle
//eachrequest,butifacallrunsforlongerthanitstimelimit,the
//handlerrespondswitha504GatewayTimeouterrorandthemessage
//provided.(Ifmsgisempty,asuitabledefaultmessagewillbesent.)After
//thehandlertimesout,writesbyhtoitshttp.ResponseWriterwillreturn
//http.ErrHandlerTimeout.IftimeoutFuncreturnsaniltimeoutchannel,no
//timeoutwillbeenforced.recordFnisafunctionthatwillbeinvokedwhenever
//atimeouthappens.
funcWithTimeout(hhttp.Handler,timeoutFuncfunc(*http.Request)(timeout<-chantime.Time,recordFnfunc(),err*apierrors.StatusError))http.Handler{
	return&timeoutHandler{h,timeoutFunc}
}
其中主要是timeoutHandler,实现如下:
typetimeoutHandlerstruct{
	handlerhttp.Handler
	timeoutfunc(*http.Request)(<-chantime.Time,func(),*apierrors.StatusError)
}
func(t*timeoutHandler)ServeHTTP(whttp.ResponseWriter,r*http.Request){
	after,recordFn,err:=t.timeout(r)
	ifafter==nil{
		t.handler.ServeHTTP(w,r)
		return
	}
	result:=make(chaninterface{})
	tw:=newTimeoutWriter(w)
	gofunc(){
		deferfunc(){
			result<-recover()
		}()
		t.handler.ServeHTTP(tw,r)
	}()
	select{
	caseerr:=<-result:
		iferr!=nil{
			panic(err)
		}
		return
	case<-after:
		recordFn()
		tw.timeout(err)
	}
}
如上,在ServeHTTP中主要做了几件事情:
- 调用timeoutHandler.timeout设置一个timer,如果timeout时间到到达会通过after这个channel传递过来,后面会监听该channel
- 创建timeoutWriter对象,该timeoutWriter中有一个timeout方法,该方法会在超时之后会被调用
- 异步调用ServeHTTP并将timeoutWriter传递进去,如果该groutinepanic则进行捕获并通过channel传递到调用方groutine,因为我们不能因为一个groutinepanic导致整个进程退出,而且调用方groutine对这些panic信息比较感兴趣,需要传递过去。
- 监听定时器channel
如果定时器channel超时会调用timeoutWrite.timeout方法,该方法如下:
func(tw*baseTimeoutWriter)timeout(err*apierrors.StatusError){
	tw.mu.Lock()
	defertw.mu.Unlock()
	tw.timedOut=true
	//Thetimeoutwriterhasnotbeenusedbytheinnerhandler.
	//WecansafelytimeouttheHTTPrequestbysendingbyatimeout
	//handler
	if!tw.wroteHeader&&!tw.hijacked{
		tw.w.WriteHeader(http.StatusGatewayTimeout)
		enc:=json.NewEncoder(tw.w)
		enc.Encode(&err.ErrStatus)
	}else{
		//Thetimeoutwriterhasbeenusedbytheinnerhandler.Thereis
		//nowaytotimeouttheHTTPrequestatthepoint.Wehavetoshutdown
		//theconnectionforHTTP1orresetstreamforHTTP2.
		//
		//Notefrom:BradFitzpatrick
		//iftheServeHTTPgoroutinepanics,thatwilldothebestpossiblethingforboth
		//HTTP/1andHTTP/2.InHTTP/1,assumingyou'rereplyingwithatleastHTTP/1.1and
		//you'vealreadyflushedtheheaderssoit'susingHTTPchunking,it'llkilltheTCP
		//connectionimmediatelywithoutaproper0-byteEOFchunk,sothepeerwillrecognize
		//theresponseasbogus.InHTTP/2theserverwilljustRST_STREAMthestream,leaving
		//theTCPconnectionopen,butresettingthestreamtothepeersoit'llhaveanerror,
		//liketheHTTP/1case.
		panic(errConnKilled)
	}
}
可以看到,如果此时还没有写入任何数据,则直接返回504状态码,否则直接panic。上面有一大段注释说明为什么panic,这段注释的出处在kubernetesissue:
APIserverpanicswhenwritingresponse#29001。引用的是golanghttp包作者BradFitzpatrick的话,意思是:如果我们已经往一个writer中写入了部分数据,我们是没有办法timeout,此时goroutinepanic或许是最好的选择,无论是对于HTTP/1.1还是HTTP/2.0,如果是HTTP/1.1,他不会发送任何数据,直接断开tcp连接,此时对端就能够识别出来server异常,如果是HTTP/2.0此时srever会RST_STREAM该stream,并且不会影响connnection,对端也能够很好的处理。这部分代码还是很有意思的,很难想象kubernetes会以panic掉groutine的方式来处理一个request的超时。
panic掉一个groutine,如果你上层没有任何recover机制的话,整个程序都会退出,对于kubenernetesapiserver肯定是不能接受的,kubernetes在每个request的handlerchain中会有一个genericfilters.WithPanicRecovery进行捕获这样的panic,避免整个进程崩溃。
Other
谈完TimeoutHandler,再回到golangtimeout,有时虽然我们正常timeout返回,但并不意味整个groutine就正常返回了。此时调用返回也只是上层返回了,异步调用的底层逻辑没有办法撤回的。因为我们没办法cancel掉另一个grouine,只能是groutine主动退出,主动退出的实现思路大部分是通过传递一个context或者closechannel给该groutine,该groutine监听到退出信号就终止,但是目前很多调用是不支持接收一个context或closechannle作为参数的。
例如下面这段代码:因为在主逻辑中sleep了4s是没有办法中断的,即时此时request已经返回,但是server端该groutine还是没有被释放,所以golangtimeout这块还是非常容易leakgrouine的,使用的时候需要小心。
packagemain
import(
	"fmt"
	"net/http"
	"runtime"
	"time"
)
funcmain(){
	gofunc(){
		for{
			time.Sleep(time.Second)
			fmt.Printf("groutinenum:%d\n",runtime.NumGoroutine())
		}
	}()
	handleFunc:=func(whttp.ResponseWriter,r*http.Request){
		fmt.Printf("request%v\n",r.URL)
		time.Sleep(4*time.Second)
		_,err:=fmt.Fprintln(w,"ok")
		iferr!=nil{
			fmt.Printf("writeerr:%v\n",err)
		}
	}
	err:=http.ListenAndServe("localhost:9999",http.TimeoutHandler(http.HandlerFunc(handleFunc),2*time.Second,"err:timeout"))
	iferr!=nil{
		fmt.Printf("%v",err)
	}
}
写在最后
golangtimeout简单但是比较繁琐,只有明白其原理才能真正防患于未然
2020/4/13更新:上述代码存在资源泄露的问题,已经被社区修复,参加http://likakuli.com/post/2019/12/06/apiserver_goroutine_leak/
本文内容总结:Basic,Advanced,golanghttp.TimeoutHandler,kubernetestimeoutHandler,Other,写在最后,
原文链接:https://www.cnblogs.com/gaorong/p/11336834.html