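Concurrency in Go was published in August 2017 and contains some genuinely fresh ideas. I'll skip the well-worn material and focus on the points from the book that I had previously overlooked, plus a few clever tricks.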
Overly coarse lock granularity can starve other goroutines
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var wg sync.WaitGroup
        var sharedLock sync.Mutex
        const runtime = 1 * time.Second

        greedyWorker := func() {
            defer wg.Done()
            var count int
            for begin := time.Now(); time.Since(begin) <= runtime; {
                sharedLock.Lock()
                time.Sleep(3 * time.Nanosecond)
                sharedLock.Unlock()
                count++
            }
            fmt.Printf("Greedy worker was able to execute %v work loops\n", count)
        }

        politeWorker := func() {
            defer wg.Done()
            var count int
            for begin := time.Now(); time.Since(begin) <= runtime; {
                sharedLock.Lock()
                time.Sleep(1 * time.Nanosecond)
                sharedLock.Unlock()
                sharedLock.Lock()
                time.Sleep(1 * time.Nanosecond)
                sharedLock.Unlock()
                sharedLock.Lock()
                time.Sleep(1 * time.Nanosecond)
                sharedLock.Unlock()
                count++
            }
            fmt.Printf("Polite worker was able to execute %v work loops.\n", count)
        }

        wg.Add(2)
        go greedyWorker()
        go politeWorker()
        wg.Wait()
    }
Result on my Mac:
Greedy worker was able to execute 507672 work loops
Polite worker was able to execute 297669 work loops.
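The worker that takes the lock at a coarser granularity wins more of the execution time, which leaves the other goroutine with fewer chances to run.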
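What close does depends on the state of the channel: closing a nil channel panics, closing an already-closed channel panics, and closing an open channel succeeds.
The receive-only case went against my intuition: close on a receive-only channel never reaches a runtime panic, because the compiler rejects it outright.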
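The cases above can be checked with a small program. This is only a sketch: tryClose is a hypothetical helper introduced here (it is not from the book), and it uses recover to report whether close panicked. The receive-only case is left commented out because it would stop the program from compiling.

```go
package main

import "fmt"

// tryClose closes ch and reports whether close panicked.
// Hypothetical helper, introduced only for this sketch.
func tryClose(ch chan int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	close(ch)
	return false
}

func main() {
	var nilCh chan int
	fmt.Println("close(nil channel) panics:", tryClose(nilCh))

	open := make(chan int, 1)
	open <- 42
	fmt.Println("close(open channel) panics:", tryClose(open))

	// A value buffered before the close can still be received.
	v, ok := <-open
	fmt.Println(v, ok)

	// Once drained, a closed channel yields the zero value and ok == false.
	v, ok = <-open
	fmt.Println(v, ok)

	fmt.Println("close(closed channel) panics:", tryClose(open))

	// Receive-only channel: uncommenting the next two lines is a compile error.
	// var recvOnly <-chan int = open
	// close(recvOnly)
}
```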
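Using scope to constrain channel logic
A channel usually has a sending side and a receiving side, so it is typically declared and defined like this:
    var a = make(chan int, 10)
If the consumer and producer code follow directly after that line, we might end up with something like this: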
    var a = make(chan int, 10)
    go func() {
        for {
            select {
            case <-a:
                // do something
            }
        }
    }()
    // producer...
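Writing it this way is not necessarily a problem. But as the project evolves and the code gets changed beyond recognition, there is no guarantee that someone won't send data into the outer channel from inside the consumer and produce wrong results. There is a better way to write it that avoids the problem: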
    consumer := func(ch <-chan int) {
        for {
            select {
            case <-ch:
                // do something
            }
        }
    }
    var a = make(chan int, 10)
    go consumer(a)
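Because the channel is defined after the consumer, a is simply not in scope inside the consumer, which rules out operating on a directly from there. On top of that, the parameter type <-chan int makes any send on ch a compile error.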
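The same idea can be pushed one step further by letting the producer own the channel and hand out only a receive-only view. The sketch below is an illustration under assumptions: the names produce and consume are hypothetical and do not come from the original code.

```go
package main

import "fmt"

// consume only receives. Because ch is a <-chan int, a send (ch <- 1)
// or close(ch) inside this function would not compile.
func consume(ch <-chan int) int {
	sum := 0
	for v := range ch {
		sum += v
	}
	return sum
}

// produce owns the channel: it creates it, writes to it, closes it,
// and exposes it to callers as receive-only.
func produce(n int) <-chan int {
	ch := make(chan int, n)
	go func() {
		defer close(ch)
		for i := 1; i <= n; i++ {
			ch <- i
		}
	}()
	return ch
}

func main() {
	fmt.Println(consume(produce(10))) // prints 55
}
```

With this layout the bidirectional channel value never leaves produce, so neither scope nor type allows a consumer to write to it.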
Avoiding goroutine leaks with a done channel
    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        doWork := func(
            done <-chan interface{},
            strings <-chan string,
        ) <-chan interface{} {
            terminated := make(chan interface{})
            go func() {
                defer fmt.Println("doWork exited.")
                defer close(terminated)
                for {
                    select {
                    case s := <-strings:
                        // Do something interesting
                        fmt.Println(s)
                    case <-done: // the caller closed done: stop
                        return
                    }
                }
            }()
            return terminated
        }

        done := make(chan interface{})
        terminated := doWork(done, nil)

        go func() {
            // Cancel the operation after 1 second.
            time.Sleep(1 * time.Second)
            fmt.Println("Canceling doWork goroutine...")
            close(done)
        }()

        <-terminated // wait for the doWork goroutine to exit
        fmt.Println("Done.")
    }
This pattern is already in common everyday use.
The or-channel: return as soon as any one of several channels yields a result
    package main

    // if one of the channel is ready
    // then the or channel will be ready

    import (
        "fmt"
        "time"
    )

    func main() {
        var or func(chs ...chan interface{}) chan interface{}
        or = func(chs ...chan interface{}) chan interface{} {
            if len(chs) == 0 {
                return nil
            }
            if len(chs) == 1 {
                return chs[0]
            }

            orDone := make(chan interface{})
            go func() {
                defer close(orDone)
                switch len(chs) {
                case 2:
                    select {
                    case <-chs[0]:
                    case <-chs[1]:
                    }
                default: // len(chs) > 2
                    select {
                    case <-chs[0]:
                    case <-chs[1]:
                    case <-chs[2]:
                    case <-or(append(chs[3:], orDone)...):
                    }
                }
            }()
            return orDone
        }

        start := time.Now()
        sig := func(after time.Duration) chan interface{} {
            ch := make(chan interface{})
            go func() {
                defer close(ch)
                time.Sleep(after)
            }()
            return ch
        }

        <-or(
            sig(time.Second),
            sig(time.Second*2),
            sig(time.Minute),
        )
        fmt.Println(time.Since(start))
    }
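The author says this pattern works well when there are several data sources and the result can go back to the user as soon as any one of them responds. I don't think I have run into that scenario yet, though.
Using fan-out and fan-in to speed up a pipeline
There is not much to say here: in a pipeline where one channel feeds the next, whichever stage is slow is the one to parallelize.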
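As a rough illustration of parallelizing the slow stage, here is a minimal sketch. All of the names (generate, slowSquare, fanIn, parallelSum) are hypothetical, the sleep merely simulates expensive work, and the done channel is left out because the consumer always drains the pipeline to the end.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// generate feeds nums into the pipeline and closes its output when done.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// slowSquare stands in for the slow stage; the sleep simulates real work.
func slowSquare(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			time.Sleep(time.Millisecond)
			out <- n * n
		}
	}()
	return out
}

// fanIn merges any number of channels into one and closes the result
// once every input has been drained.
func fanIn(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// parallelSum fans the slow stage out over several goroutines that all
// read from the same input, then fans their outputs back in.
func parallelSum(workers int, nums ...int) int {
	in := generate(nums...)
	stages := make([]<-chan int, workers)
	for i := range stages {
		stages[i] = slowSquare(in)
	}
	sum := 0
	for v := range fanIn(stages...) {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(parallelSum(4, 1, 2, 3, 4, 5, 6, 7, 8)) // prints 204
}
```

Note that fan-in does not preserve the input order, so this only fits stages whose results can be consumed in any order, such as the sum here.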
Using a bridge channel to simplify consuming a channel of channels
    package main

    import (
        "fmt"
    )

    func main() {
        orDone := func(done, c <-chan interface{}) <-chan interface{} {
            valStream := make(chan interface{})
            go func() {
                defer close(valStream)
                for {
                    select {
                    case <-done:
                        return
                    case v, ok := <-c:
                        if !ok {
                            return
                        }
                        select {
                        case valStream <- v:
                        case <-done:
                        }
                    }
                }
            }()
            return valStream
        }

        bridge := func(
            done <-chan interface{},
            chanStream <-chan <-chan interface{},
        ) <-chan interface{} {
            valStream := make(chan interface{}) // the single stream bridge returns
            go func() {
                defer close(valStream)
                for { // pull the next channel off chanStream
                    var stream <-chan interface{}
                    select {
                    case maybeStream, ok := <-chanStream:
                        if !ok {
                            return
                        }
                        stream = maybeStream
                    case <-done:
                        return
                    }
                    // forward every value from the current channel
                    for val := range orDone(done, stream) {
                        select {
                        case valStream <- val:
                        case <-done:
                        }
                    }
                }
            }()
            return valStream
        }

        genVals := func() <-chan <-chan interface{} {
            chanStream := make(chan (<-chan interface{}))
            go func() {
                defer close(chanStream)
                for i := 0; i < 10; i++ {
                    stream := make(chan interface{}, 1)
                    stream <- i
                    close(stream)
                    chanStream <- stream
                }
            }()
            return chanStream
        }

        for v := range bridge(nil, genVals()) {
            fmt.Printf("%v", v)
        }
    }
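Again, the author says that some scenarios need channels nested inside a channel to guarantee a certain "send order", but forgive me, I have not seen such a scenario either...
Using context
Roughly: a context is immutable, and WithCancel and WithTimeout return a cancel function that can be called to cancel the entire call graph. In Go, though, a child goroutine that wants to support cancellation always has to write: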
    select {
    case <-ctx.Done():
    case ...
    }
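In principle the goroutine should also return as quickly as possible once it is cancelled, so this is very invasive for downstream code. I don't know how other languages solve this; I'll look into it when I have time.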
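To make the invasiveness concrete, here is a minimal sketch using the standard context package. The function names slowStep and runWithTimeout are hypothetical. Every layer has to accept the ctx, check ctx.Done(), and pass the error back up so the caller can stop early.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slowStep simulates one unit of downstream work that takes d.
// It returns early with ctx.Err() if ctx is cancelled first.
func slowStep(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWithTimeout runs up to `steps` steps under a deadline and reports
// how many of them finished before the context was cancelled.
func runWithTimeout(timeout, each time.Duration, steps int) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // always release the context's resources

	for i := 0; i < steps; i++ {
		// The ctx has to be threaded through every call that can block.
		if err := slowStep(ctx, each); err != nil {
			return i, err
		}
	}
	return steps, nil
}

func main() {
	n, err := runWithTimeout(50*time.Millisecond, 20*time.Millisecond, 10)
	fmt.Printf("finished %d steps, err = %v\n", n, err)
}
```

With these numbers the deadline expires before all ten steps finish, so err is context.DeadlineExceeded; the exact number of completed steps depends on timing.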
Source: http://www.tuicool.com/articles/eAfAbuq