前一篇文章《Golang 并发模型: 轻松入门流水线模型》, 介绍了流水线模型的概念, 这篇文章是流水线模型进阶, 介绍 FAN-IN 和 FAN-OUT,FAN 模式可以让我们的流水线模型更好的利用 Golang 并发, 提高软件性能. 但 FAN 模式不一定是万能, 不见得能提高程序的性能, 甚至还不如普通的流水线. 我们先介绍下 FAN 模式, 再看看它怎么提升性能的, 它是不是万能的.
这篇文章内容略多, 本来打算分几次写的, 但不如一次读完爽, 所以干脆还是放一篇文章了, 要是时间不充足, 利用好碎片时间, 可以每次看 1 个标题的内容.
FAN-IN 和 FAN-OUT 模式
Golang 的并发模式灵感来自现实世界, 这些模式是通用的, 毫无例外, FAN 模式也是对当前世界的模仿. 以汽车组装为例, 汽车生产线上有个阶段是给小汽车装 4 个轮子, 可以把这个阶段任务交给 4 个人同时去做, 这 4 个人把轮子都装完后, 再把汽车移动到生产线下一个阶段. 这个过程中, 就有任务的分发, 和任务结果的收集. 其中任务分发是 FAN-OUT, 任务收集是 FAN-IN.
FAN-OUT 模式: 多个 goroutine 从同一个通道读取数据, 直到该通道关闭. OUT 是一种张开的模式, 所以又被称为扇出, 可以用来分发任务.
FAN-IN 模式: 1 个 goroutine 从多个通道读取数据, 直到这些通道关闭. IN 是一种收敛的模式, 所以又被称为扇入, 用来收集处理的结果.
FAN-IN 和 FAN-OUT 实践
我们这次试用 FAN-OUT 和 FAN-IN, 解决《Golang 并发模型: 轻松入门流水线模型》中提到的问题: 计算一个整数切片中元素的平方值并把它打印出来.
producer() 保持不变, 负责生产数据.
squre() 也不变, 负责计算平方值.
修改 main(), 启动 3 个 square, 这 3 个 squre 从 producer 生成的通道读数据, 这是 FAN-OUT.
增加 merge(), 入参是 3 个 square 各自写数据的通道, 给这 3 个通道分别启动 1 个协程, 把数据写入到自己创建的通道, 并返回该通道, 这是 FAN-IN.
FAN 模式流水线示例:
- package main
- import (
- "fmt"
- "sync"
- )
- func producer(nums ...int) <-chan int {
- out := make(chan int)
- go func() {
- defer close(out)
- for _, n := range nums {
- out <- i
- }
- }()
- return out
- }
- func square(inCh <-chan int) <-chan int {
- out := make(chan int)
- go func() {
- defer close(out)
- for n := range inCh {
- out <- n * n
- }
- }()
- return out
- }
- func merge(cs ...<-chan int) <-chan int {
- out := make(chan int)
- var wg sync.WaitGroup
- collect := func(in <-chan int) {
- defer wg.Done()
- for n := range in {
- out <- n
- }
- }
- wg.Add(len(cs))
- // FAN-IN
- for _, c := range cs {
- go collect(c)
- }
- // 错误方式: 直接等待是 bug, 死锁, 因为 merge 写了 out,main 却没有读
- // wg.Wait()
- // close(out)
- // 正确方式
- go func() {
- wg.Wait()
- close(out)
- }()
- return out
- }
- func main() {
- in := producer(1, 2, 3, 4)
- // FAN-OUT
- c1 := square(in)
- c2 := square(in)
- c3 := square(in)
- // consumer
- for ret := range merge(c1, c2, c3) {
- fmt.Printf("=", ret)
- }
- fmt.Println()
- }
3 个 squre 协程并发运行, 结果顺序是无法确定的, 所以你得到的结果, 不一定与下面的相同.
- awesome Git:(master) go run hi.go
- 1 4 16 9
FAN 模式真能提升性能吗?
相信你心里已经有了答案, 可以的. 我们还是使用老问题, 对比一下简单的流水线和 FAN 模式的流水线, 修改下代码, 增加程序的执行时间:
produer() 使用参数生成指定数量的数据.
square() 增加阻塞操作, 睡眠 1s, 模拟阶段的运行时间.
main() 关闭对结果数据的打印, 降低结果处理时的 IO 对 FAN 模式的对比.
普通流水线:
- // hi_simple.go
- package main
- import (
- "fmt"
- )
- func producer(n int) <-chan int {
- out := make(chan int)
- go func() {
- defer close(out)
- for i := 0; i < n; i++ {
- out <- i
- }
- }()
- return out
- }
- func square(inCh <-chan int) <-chan int {
- out := make(chan int)
- go func() {
- defer close(out)
- for n := range inCh {
- out <- n * n
- // simulate
- time.Sleep(time.Second)
- }
- }()
- return out
- }
- func main() {
- in := producer(10)
- ch := square(in)
- // consumer
- for _ = range ch {
- }
- }
使用 FAN 模式的流水线:
- // hi_fan.go
- package main
- import (
- "sync"
- "time"
- )
- func producer(n int) <-chan int {
- out := make(chan int)
- go func() {
- defer close(out)
- for i := 0; i < n; i++ {
- out <- i
- }
- }()
- return out
- }
- func square(inCh <-chan int) <-chan int {
- out := make(chan int)
- go func() {
- defer close(out)
- for n := range inCh {
- out <- n * n
- // simulate
- time.Sleep(time.Second)
- }
- }()
- return out
- }
- func merge(cs ...<-chan int) <-chan int {
- out := make(chan int)
- var wg sync.WaitGroup
- collect := func(in <-chan int) {
- defer wg.Done()
- for n := range in {
- out <- n
- }
- }
- wg.Add(len(cs))
- // FAN-IN
- for _, c := range cs {
- go collect(c)
- }
- // 错误方式: 直接等待是 bug, 死锁, 因为 merge 写了 out,main 却没有读
- // wg.Wait()
- // close(out)
- // 正确方式
- go func() {
- wg.Wait()
- close(out)
- }()
- return out
- }
- func main() {
- in := producer(10)
- // FAN-OUT
- c1 := square(in)
- c2 := square(in)
- c3 := square(in)
- // consumer
- for _ = range merge(c1, c2, c3) {
- }
- }
多次测试, 每次结果近似, 结果如下:
FAN 模式利用了 7% 的 CPU, 而普通流水线 CPU 只使用了 3%,FAN 模式能够更好的利用 CPU, 提供更好的并发, 提高 Golang 程序的并发性能.
FAN 模式耗时 10s, 普通流水线耗时 4s. 在协程比较费时时, FAN 模式可以减少程序运行时间, 同样的时间, 可以处理更多的数据.
- awesome Git:(master) time go run hi_simple.go
- go run hi_simple.go 0.17s user 0.18s system 3% CPU 10.389 total
- awesome Git:(master)
- awesome Git:(master) time go run hi_fan.go
- go run hi_fan.go 0.17s user 0.16s system 7% CPU 4.288 total
也可以使用 Benchmark 进行测试, 看 2 个类型的执行时间, 结论相同. 为了节约篇幅, 这里不再介绍, 方法和结果贴在 Gist 了, 想看的朋友瞄一眼, 或自己动手搞搞.
FAN 模式一定能提升性能吗?
FAN 模式可以提高并发的性能, 那我们是不是可以都使用 FAN 模式?
不行的, 因为 FAN 模式不一定能提升性能.
依然使用之前的问题, 再次修改下代码, 其他不变:
squre() 去掉耗时.
main() 增加 producer() 的入参, 让 producer 生产 10,000,000 个数据.
简单版流水线修改代码:
- // hi_simple.go
- func square(inCh <-chan int) <-chan int {
- out := make(chan int)
- go func() {
- defer close(out)
- for n := range inCh {
- out <- n * n
- }
- }()
- return out
- }
- func main() {
- in := producer(10000000)
- ch := square(in)
- // consumer
- for _ = range ch {
- }
- }
FAN 模式流水线修改代码:
- // hi_fan.go
- package main
- import (
- "sync"
- )
- func square(inCh <-chan int) <-chan int {
- out := make(chan int)
- go func() {
- defer close(out)
- for n := range inCh {
- out <- n * n
- }
- }()
- return out
- }
- func main() {
- in := producer(10000000)
- // FAN-OUT
- c1 := square(in)
- c2 := square(in)
- c3 := square(in)
- // consumer
- for _ = range merge(c1, c2, c3) {
- }
- }
结果, 可以跑多次, 结果近似:
- awesome Git:(master) time go run hi_simple.go
- go run hi_simple.go 9.96s user 5.93s system 168% CPU 9.424 total
- awesome Git:(master) time go run hi_fan.go
- go run hi_fan.go 23.35s user 11.51s system 297% CPU 11.737 total
从这个结果, 我们能看到 2 点.
FAN 模式可以提高 CPU 利用率.
FAN 模式不一定能提升效率, 降低程序运行时间.
优化 FAN 模式
既然 FAN 模式不一定能提高性能, 如何优化?
不同的场景优化不同, 要依具体的情况, 解决程序的瓶颈.
我们当前程序的瓶颈在 FAN-IN,squre 函数很快就完成, merge 函数它把 3 个数据写入到 1 个通道的时候出现了瓶颈, 适当使用带缓冲通道可以提高程序性能, 再修改下代码
merge() 中的 out 修改为:
out := make(chan int, 100)
结果:
- awesome Git:(master) time go run hi_fan_buffered.go
- go run hi_fan_buffered.go 19.85s user 8.19s system 323% CPU 8.658 total
使用带缓存通道后, 程序的性能有了较大提升, CPU 利用率提高到 323%, 提升了 8%, 运行时间从 11.7 降低到 8.6, 降低了 26%.
FAN 模式的特点很简单, 相信你已经掌握了, 如果记不清了看这里, 本文所有代码在该 GitHub 仓库.
FAN 模式很有意思, 并且能提高 Golang 并发的性能, 如果想以后运用自如, 用到自己的项目中去, 还是要写写自己的 Demo, 快去实践一把.
来源: https://segmentfault.com/a/1190000017182416