Golang Pipeline
本文介绍 Golang 中的一种并发编程模式,Pipeline 流水线模式。
什么是流水线模式?
流水线顾名思义,即一系列处理单元组成的一套处理逻辑。
为什么需要流水线模式?
编程当然可以写成串行的方式,但是这样带来的缺点是,如果某一个阶段耗时比较久,将会导致CPU空闲,这一点就不必多说了,因此引入了并发编程。
流水线模型是并发编程模式的一种,并发编程可以使得计算机可以同时处理多个数据。而在 Golang 中,由于 Goroutine 的存在使得并发编程异常简单,Goroutine 之间需要数据传递,由于 channel 的存在也简化了许多。
怎么组装一条流水线?使用 Channel 连接
在流水线中,每个处理单元称为一个 Stage,每个 Stage 都需要接受上游传来的数据,经过当前 Stage 处理后,再对下游传递。Channel 可以很好的承担传递数据的角色,也就是每个 Stage 都必须有两个 channel,一个负责接受数据的 channel 和一个负责对外发送数据的 channel。
Golang 提供的 channel 通信机制,可以帮助我们便捷的组装各个 stage,下面将通过实例说明。
一个简单的例子:求平方数
我们在这个例子中,将会输入 int 类型的参数,输出各个数的平方值,尽管这个逻辑可以用十分简单的代码实现,但我们还是将他分成3个 stage 来实现,以验证 golang pipeline 编程模型所带来的好处。
Stage1 :
首先要产生数字,将这个 stage 取一个直观的名字 gen,输入参数是 int 列表,返回值是一个 channel。
func gen(nums …int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
Satge2 :
按照我们对 Pipeline 的定义,输入是一个 channel,输出还是一个 channel,处理逻辑即对收到的每个数进行平方
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
Stage3:
最后一个 stage,也是 main 函数,他将会输出这些值。
func main() {
in := gen(2, 3)
c1 := sq(in)
for n := range c1 {
fmt.Println(n)
}
}
扇入扇出 Fan-Out、Fan-In:多个 Routine 同时处理
我不认为扇出扇出是一个好的翻译,这两个单词单独去看并不能让我很直观的了解具体代表的逻辑。Fan-Out 和 Fan-In 是来自电子电路领域的一个概念,如果一个模块输出连接到了多个模块称为Fan-Out.
移植这个概念到 软件的领域,自然指的是一个输出 Channel 连接多个 Staging。利用这个方式,我们可以将工作分发给更多的处理任务,提高并发度。当然,接收端也可以从多个 channel 接受数据。
func main() {
in := gen(2, 3, 3,3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
out := merge( c1, c2)
fmt.Println(<-out)
}
分发给两个 sq,这是典型的 Fan-Out 操作,还引入了一个 merge 函数,这个函数是将两个 channel 输入,输出给一个 output channel,显然这是 Fan-In 操作。
但是 merge 函数有一个重要的点需要注意,输出的 output channel 必须在所有数据发送完成后调用 close方法去关闭。如果过早的关闭,还有未发送的数据,如果发送到已经关闭的 channel上,会引起 panic。如果过晚的关闭或者不关闭,会造成资源泄露或者 main 函数一直在等待数据发送结束。
这是一个很简单的同步模型,Golang 中提供了 WaitGroup 方式来实现这种逻辑。
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
提前结束
首先明确两个要点:
output channel必须在数据全部发送完之前,一直保持 open
只要 channel 还在 open 状态,就必须从里面接受数据
接下来我们将处理两种特殊情况。
之所以说是特殊情况,是因为在正常的逻辑中,我们会等到上游所有的数据发送完,下游所有的数据接受完以后关闭channel ,从而使得 goroutine 能够顺利被回收。但这种情况并不一定是我们每次都需要的,比如我们只需要结果的一个子集,又或者上游的 Stage 产生了异常,这种情况我们都没有必要接受完全部的数据再处理,这种情况我们称为提前结束。
提前结束不是仅仅不再接受数据即可,但是这里还涉及到合理的关闭各个 channel,如果下游已经不想接受数据,上游的发送还没完成的话,上游将会永远阻塞,这会导致 goroutine 泄露,如果在常驻进程中,这可能是一笔极大的损耗。
因此必须有一个机制来通知上游 stage:我们不再接受数据。首先想到的是增加一个 channel,用来在下游和上游中做消息通知。
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving.
done <- struct{}{}
done <- struct{}{}
}
我们模拟了下游出现异常的情况,只在 out 中接受了一个数据就不再接受了,此时如果不关闭上游 channel,将会造成资源泄露。
我们在 main 函数中增加了一个 done channel,并且把该 channel 放入到 merge 方法中,用来接受信号。
这里连续对两个 done 发送消息的原因是因为,在下面的 merge 函数中,select 在 done 接受到消息后,会顺序执行下面的所有指令,即 wg.Done(),而在示例代码中有两个 channel 被放入到output 中处理,因此两个 channel 都要被关闭。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...
这种方式有明显的缺点,你必须知道上游有多少个 channel Fan-In,才能在下游发送多少个 done 消息去关闭每一个 channel,下面我们改进一下。
自动化一点:利用 Close Channel 通知
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.
}
在 merge 函数中增加对 done channel 的处理,因为我们不关心 done channel 的接收到的值是什么,当done channel 被 close 时,所有 stage 都可以在 done channel 中接受到 0 值。
当 done 中接受到值时,直接return,return 时defer 函数将会自动执行 wg.Done()。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... the rest is unchanged ...
这里的 return 和 defer 是相互搭配的,也可以不使用 return,但必须手动增加 defer 对应的逻辑。
对应的,上游 stage 也应该执行相同的 close 操作。
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
两个 Guideline:
stages close their outbound channels when all the send operations are done.
要么所有数据发送完,主动关闭。
stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.
要么持续从上游接受数据,直到上游关闭或者发动端全部发完。
引用连接: