【GoLang】golang 的精髓--流水线,对现实世界的完美模拟
本文内容纲要:
-简介
-阅读建议
-什么是"流水线"(pipeline)?
-流水线入门:求平方数
-流水线进阶:扇入和扇出
-停下来思考一下
-显式取消(Explicitcancellation)
-结论
-相关链接
直接上代码:
packagemain
import(
"fmt"
"runtime"
"strconv"
"sync"
)
funcsay(strstring){
fori:=0;i<5;i++{
runtime.Gosched()
fmt.Println(str)
}
}
funcsayStat(strstring,chchanint64){
fori:=0;i<5000;i++{
runtime.Gosched()
fmt.Println(str)
ch<-int64(i)
}
close(ch)
}
funcsayStat_2_Worker(strstring,chchanint64){
sum:=0
fori:=0;i<5000;i++{
runtime.Gosched()
fmt.Println(str)
sum+=i
}
ch<-int64(sum)
//close(ch)
}
funcgen(done<-chanstruct{},nums...int)<-chanint{
out:=make(chanint)
gofunc(){
deferclose(out)
for_,i:=rangenums{
select{
caseout<-i:
case<-done:
return
}
}
}()
returnout
}
funcsquare(done<-chanstruct{},in<-chanint)<-chanint{
out:=make(chanint)
gofunc(){
deferclose(out)
forc:=rangein{
select{
caseout<-c*c:
case<-done:
return
}
}
}()
returnout
}
funcmerge(done<-chanstruct{},ins...<-chanint)<-chanint{
varwgsync.WaitGroup
wg.Add(len(ins))
out:=make(chanint)
//ERROR:http://studygolang.com/articles/7994
//REF:"for"声明中的迭代变量和闭包
//for_,in:=rangeins{
//gofunc(){
//forc:=rangein{
//out<-c
//}
//wg.Done()
//}()
//}
//Solution1:NewfuncOutline
//ff:=func(in<-chanint){
//forc:=rangein{
//out<-c
//}
//wg.Done()
//}
//for_,in:=rangeins{
//goff(in)
//}
//Solution2:Inlinefuncwithparameter
//for_,in:=rangeins{
//gofunc(in<-chanint){
//forc:=rangein{
//out<-c
//}
//wg.Done()
//}(in)
//}
//Solution3:Inlinefuncwithparametercopybak
for_,in:=rangeins{
in_copy:=in
gofunc(){
deferwg.Done()
forc:=rangein_copy{
select{
caseout<-c:
case<-done:
return
}
}
}()
}
gofunc(){
wg.Wait()
close(out)
}()
returnout
}
funcgenNew(nums...int)<-chanint{
out:=make(chanint,len(nums))
for_,n:=rangenums{
out<-n
}
close(out)
returnout
}
funcmain(){
//DEFAULTVALUE:NUMBEROFCPUCORE
fmt.Println(runtime.GOMAXPROCS(-1))
runtime.Gosched()
fmt.Println(runtime.GOMAXPROCS(-1))
fmt.Println(runtime.NumCPU())
//gosay("hello")
//say("world")
ch:=make(chanint64)
gosayStat("hello",ch)
//gosayStat("hello",ch)
//sayStat("world",ch)
varstatint64=0
forc:=rangech{
fmt.Println(c)
stat+=c
}
fmt.Println(stat)//12497500
////DEADLOCK!
//cc:=make(chanint)
////NOGOROUTINERECEIVETHEUNBUFFEREDCHANNELDATA!
//cc<-888
//fmt.Println(<-cc)
stat=0
cc:=make(chanint64)
worker_num:=2
fori:=0;i<worker_num;i++{
gosayStat_2_Worker("TEST-"+strconv.Itoa(i),cc)
}
fori:=0;i<worker_num;i++{
stat+=<-cc
}
close(cc)
fmt.Println(stat)//12497500*2=24995000
//out:=square(gen(1,2,3,4,5))
//forc:=rangeout{
//fmt.Println(c)
//}
done:=make(chanstruct{})
//deferclose(done)
out_new:=gen(done,1,2,3,4,5)
c1:=square(done,out_new)
c2:=square(done,out_new)
//forr1:=rangec1{
//fmt.Println(r1)
//}
//forr2:=rangec2{
//fmt.Println(r2)
//}
//forr:=rangemerge(c1,c2){
//fmt.Println(r)
//}
mg:=merge(done,c1,c2)
fmt.Println(<-mg)
fmt.Println(<-mg)
fmt.Println(<-mg)
close(done)
//fmt.Println(<-mg)
//fmt.Println(<-mg)
//fmt.Println(<-mg)
//fmt.Println(<-mg)
for{
ifmsg,closed:=<-mg;!closed{
fmt.Println("<-mghasclosed!")
return
}else{
fmt.Println(msg)
}
}
//gen_new:=genNew(1,2,3,4,5)
////close(gen_new)
//forgn:=rangegen_new{
//fmt.Println(gn)
//}
}
简介
Go语言的并发原语允许开发者以类似于UnixPipe的方式构建数据流水线(datapipelines),数据流水线能够高效地利用I/O和多核CPU的优势。
本文要讲的就是一些使用流水线的一些例子,流水线的错误处理也是本文的重点。
阅读建议
数据流水线充分利用了多核特性,代码层面是基于channel类型和go关键字。
channel和go贯穿本文的始终。如果你对这两个概念不太了解,建议先阅读之前公众号发布的两篇文章:Go语言内存模型(上/下)。
如果你对操作系统中"生产者"和"消费者"模型比较了解的话,也将有助于对本文中流水线的理解。
本文中绝大多数讲解都是基于代码进行的。换句话说,如果你看不太懂某些代码片段,建议补全以后,在机器或play.golang.org上运行一下。对于某些不明白的细节,可以手动添加一些语句以助于理解。
由于Go语言并发模型的英文原文GoConcurrencyPatterns:Pipelinesandcancellation篇幅比较长,本文只包含理论推导和简单的例子。
下一篇文章我们会对"并行MD5"这个现实生活的例子进行详细地讲解。
什么是"流水线"(pipeline)?
对于"流水线"这个概念,Go语言中并没有正式的定义,它只是很多种并发方式的一种。这里我给出一个非官方的定义:一条流水线是是由多个阶段组成的,相邻的两个阶段由channel进行连接;每个阶段是由一组在同一个函数中启动的goroutine组成。在每个阶段,这些goroutine会执行下面三个操作:
- 通过inboundchannels从上游接收数据
- 对接收到的数据执行一些操作,通常会生成新的数据
- 将新生成的数据通过outboundchannels发送给下游
除了第一个和最后一个阶段,每个阶段都可以有任意个inbound和outboundchannel。
显然,第一个阶段只有outboundchannel,而最后一个阶段只有inboundchannel。
我们通常称第一个阶段为"生产者"
或"源头"
,称最后一个阶段为"消费者"
或"接收者"
。
首先,我们通过一个简单的例子来演示这个概念和其中的技巧。后面我们会更出一个真实世界的例子。
流水线入门:求平方数
假设我们有一个流水线,它由三个阶段组成。
第一阶段是gen函数,它能够将一组整数转换为channel,channel可以将数字发送出去。
gen函数首先启动一个goroutine,该goroutine发送数字到channel,当数字发送完时关闭channel。
代码如下:
funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}
第二阶段是sq函数,它从channel接收一个整数,然后返回一个channel,返回的channel可以发送接收到整数的平方。当它的inboundchannel关闭,并且把所有数字均发送到下游时,会关闭outboundchannel。代码如下:
funcsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}
main函数用于设置流水线并运行最后一个阶段。最后一个阶段会从第二阶段接收数字,并逐个打印出来,直到来自于上游的inboundchannel关闭。代码如下:
funcmain(){
//设置流水线
c:=gen(2,3)out:=sq(c)//消费输出结果fmt.Println(<-out)//4fmt.Println(<-out)//9}
由于sq函数的inboundchannel和outboundchannel类型一样,所以组合任意个sq函数。比如像下面这样使用:
funcmain(){
//设置流水线并消费输出结果
forn:=rangesq(sq(gen(2,3))){
fmt.Println(n)//16then81}}
如果我们稍微修改一下gen函数,便可以模拟haskell的惰性求值。有兴趣的读者可以自己折腾一下。
流水线进阶:扇入和扇出
扇出:同一个channel可以被多个函数读取数据,直到channel关闭。
这种机制允许将工作负载分发到一组worker,以便更好地并行使用CPU和I/O。
扇入:多个channel的数据可以被同一个函数读取和处理,然后合并到一个channel,直到所有channel都关闭。
下面这张图对扇入有一个直观的描述:
我们修改一下上个例子中的流水线,这里我们运行两个sq实例,它们从同一个channel读取数据。这里我们引入一个新函数merge对结果进行"扇入"操作:
funcmain(){
in:=gen(2,3)
//启动两个sq实例,即两个goroutines处理channel"in"的数据
c1:=sq(in)c2:=sq(in)//merge函数将channelc1和c2合并到一起,这段代码会消费merge的结果forn:=rangemerge(c1,c2){fmt.Println(n)//打印49,或94}}
merge函数将多个channel转换为一个channel,它为每一个inboundchannel启动一个goroutine,用于将数据
拷贝到outboundchannel。
merge函数的实现见下面代码(注意wg变量):
funcmerge(cs...<-chanint)<-chanint{
varwgsync.WaitGroupout:=make(chanint)//为每一个输入channelcs创建一个goroutineoutput//output将数据从c拷贝到out,直到c关闭,然后调用wg.Doneoutput:=func(c<-chanint){forn:=rangec{out<-n}wg.Done()}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}//启动一个goroutine,用于所有outputgoroutine结束时,关闭out//该goroutine必须在wg.Add之后启动gofunc(){wg.Wait()close(out)}()returnout}
在上面的代码中,每个inboundchannel对应一个output
函数。所有output
goroutine被创建以后,merge启动一个额外的goroutine,
这个goroutine会等待所有inboundchannel上的发送操作结束以后,关闭outboundchannel。
对已经关闭的channel执行发送操作(ch<-)会导致异常,所以我们必须保证所有的发送操作都在关闭channel之前结束。
sync.WaitGroup提供了一种组织同步的方式。
它保证merge中所有inboundchannel(cs...<-chanint)均被正常关闭,outputgoroutine正常结束后,关闭outchannel。
停下来思考一下
在使用流水线函数时,有一个固定的模式:
- 在一个阶段,当所有发送操作(ch<-)结束以后,关闭outboundchannel
- 在一个阶段,goroutine会持续从inbountchannel接收数据,直到所有inboundchannel全部关闭
在这种模式下,每一个接收阶段都可以写成range
循环的方式,
从而保证所有数据都被成功发送到下游后,goroutine能够立即退出。
在现实中,阶段并不总是接收所有的inbound数据。有时候是设计如此:接收者可能只需要数据的一个子集就可以继续执行。
更常见的情况是:由于前一个阶段返回一个错误,导致该阶段提前退出。
这两种情况下,接收者都不应该继续等待后面的值被传送过来。
我们期望的结果是:当后一个阶段不需要数据时,上游阶段能够停止生产。
在我们的例子中,如果一个阶段不能消费所有的inbound数据,试图发送这些数据的goroutine会永久阻塞。看下面这段代码片段:
//只消费out的第一个数据
out:=merge(c1,c2)
fmt.Println(<-out)//4or9
return//由于我们不再接收out的第二个数据//其中一个goroutineoutput将会在发送时被阻塞}
显然这里存在资源泄漏。一方面goroutine消耗内存和运行时资源,另一方面goroutine栈中的堆引用会阻止gc执行回收操作。既然goroutine不能被回收,那么他们必须自己退出。
我们重新整理一下流水线中的不同阶段,保证在下游阶段接收数据失败时,上游阶段也能够正常退出。
一个方式是使用带有缓冲的管道作为outboundchannel。缓存可以存储固定个数的数据。
如果缓存没有用完,那么发送操作会立即返回。看下面这段代码示例:
c:=make(chanint,2)//缓冲大小为2
c<-1//立即返回c<-2//立即返回c<-3//该操作会被阻塞,直到有一个goroutine执行<-c,并接收到数字1
如果在创建channel时就知道要发送的值的个数,使用buffer就能够简化代码。
仍然使用求平方数的例子,我们对gen函数进行重写。我们将这组整型数拷贝到一个
缓冲channel中,从而避免创建一个新的goroutine:
funcgen(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}
回到流水线中被阻塞的goroutine,我们考虑让merge函数返回一个缓冲管道:
funcmerge(cs...<-chanint)<-chanint{varwgsync.WaitGroupout:=make(chanint,1)//在本例中存储未读的数据足够了//...其他部分代码不变...
尽管这种方法解决了这个程序中阻塞goroutine的问题,但是从长远来看,它并不是好办法。缓存大小选择为1是建立在两个前提之上:
- 我们已经知道merge函数有两个inboundchannel
- 我们已经知道下游阶段会消耗多少个值
这段代码很脆弱。如果我们在传入一个值给gen函数,或者下游阶段读取的值变少,goroutine会再次被阻塞。
为了从根本上解决这个问题,我们需要提供一种机制,让下游阶段能够告知上游发送者停止接收的消息。下面我们看下这种机制。
显式取消(Explicitcancellation)
当main函数决定退出,并停止接收out发送的任何数据时,它必须告诉上游阶段的goroutine让它们放弃
正在发送的数据。main函数通过发送数据到一个名为done的channel实现这样的机制。由于有两个潜在的
发送者被阻塞,它发送两个值。如下代码所示:
funcmain(){
in:=gen(2,3)//启动两个运行sq的goroutine//两个goroutine的数据均来自于inc1:=sq(in)c2:=sq(in)//消耗output生产的第一个值done:=make(chanstruct{},2)out:=merge(done,c1,c2)fmt.Println(<-out)//4or9//告诉其他发送者,我们将要离开//不再接收它们的数据done<-struct{}{}done<-struct{}{}}
发送数据的goroutine使用一个select表达式代替原来的操作,select表达式只有在接收到out或done
发送的数据后,才会继续进行下去。done的值类型为struct{},因为它发送什么值不重要,重要的是它发送没发送:
接收事件发生意味着channelout的发送操作被丢弃。goroutineoutput基于inboundchannelc继续执行
循环,所以上游阶段不会被阻塞。(后面我们会讨论如何让循环提前退出)。使用donechannel方式实现的merge函数如下:
funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwgsync.WaitGroupout:=make(chanint)//为cs的的每一个输入channel//创建一个goroutine。output函数将//数据从c拷贝到out,直到c关闭,//或者接收到done信号;//然后调用wg.Done()output:=func(c<-chanint){forn:=rangec{select{caseout<-n:case<-done:}}wg.Done()}//...therestisunchanged...
这种方法有一个问题:每一个下游的接收者需要知道潜在被阻上游发送者的个数,然后向这些发送者发送信号让它们提前退出。时刻追踪这些数目是一项繁琐且易出错的工作。
我们需要一种方式能够让未知数目、且个数不受限制的goroutine停止向下游发送数据。在Go语言中,我们可以通过关闭一个
channel实现,因为在一个已关闭channel上执行接收操作(<-ch)总是能够立即返回,返回值是对应类型的零值
。关于这点的细节,点击这里查看。
换句话说,我们只要关闭donechannel,就能够让解开对所有发送者的阻塞。对一个管道的关闭操作事实上是对所有接收者的广播信号。
我们把donechannel作为一个参数传递给每一个流水线上的函数,通过defer表达式声明对donechannel的关闭操作。因此,所有从main函数作为源头被调用的函数均能够收到done的信号,每个阶段都能够正常退出。使用done对main函数重构以后,代码如下:
funcmain(){
//设置一个全局共享的donechannel,
//当流水线退出时,关闭donechannel//所有goroutine接收到done的信号后,//都会正常退出。done:=make(chanstruct{})deferclose(done)in:=gen(done,2,3)//将sq的工作分发给两个goroutine//这两个goroutine均从in读取数据c1:=sq(done,in)c2:=sq(done,in)//消费outtput生产的第一个值out:=merge(done,c1,c2)fmt.Println(<-out)//4or9//defer调用时,donechannel会被关闭。}
现在,流水线中的每个阶段都能够在donechannel被关闭时返回。merge函数中的output代码也能够顺利返回,因为它知道donechannel关闭时,上游发送者sq会停止发送数据。在defer表达式执行结束时,所有调用链上的output都能保证wg.Done()被调用:
funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwgsync.WaitGroupout:=make(chanint)//为cs的每一个channel创建一个goroutine//这个goroutine运行output,它将数据从c//拷贝到out,直到c关闭,或者接收到done//的关闭信号。人啊后调用wg.Done()output:=func(c<-chanint){deferwg.Done()forn:=rangec{select{caseout<-n:case<-done:return}}}//...therestisunchanged...
同样的原理,donechannel被关闭时,sq也能够立即返回。在defer表达式执行结束时,所有调用链上的sq都能保证outchannel被关闭。代码如下:
funcsq(done<-chanstruct{},in<-chanint)<-chanint{out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caseout<-n*n:case<-done:return}}}()returnout}
这里,我们给出几条构建流水线的指导:
- 当所有发送操作结束时,每个阶段都关闭自己的outboundchannels
- 每个阶段都会一直从inboundchannels接收数据,直到这些channels被关闭,或发送者解除阻塞状态。
流水线通过两种方式解除发送者的阻塞:
- 提供足够大的缓冲保存发送者发送的数据
- 接收者放弃channel时,显式地通知发送者。
结论
本文介绍了Go语言中构建数据流水线的一些技巧。流水线的错误处理比较复杂,流水线的每个阶段都可能阻塞向下游发送数据,
下游阶段也可能不再关注上游发送的数据。上面我们介绍了通过关闭一个channel,向流水线中的所有goroutine发送一个"done"信号;也定义了
构建流水线的正确方法。
下一篇文章,我们将通过一个并行md5的例子来说明本文所讲的一些理念和技巧。
原作者SameerAjmani,翻译Oscar
下期预告:Go语言并发模型:以并行md5计算为例。英文原文链接
相关链接
-
原文链接:https://blog.golang.org/pipel...
Go并发模型:http://talks.golang.org/2012/...
Go高级并发模型:http://blog.golang.org/advanc...
本文内容总结:简介,阅读建议,什么是"流水线"(pipeline)?,流水线入门:求平方数,流水线进阶:扇入和扇出,停下来思考一下,显式取消(Explicitcancellation),结论,相关链接,
原文链接:https://www.cnblogs.com/junneyang/p/6215785.html