协程
Go 语言里创建一个协程很简单, 使用 go 关键字就可以让一个普通方法协程化:
- package main
- import (
- "fmt"
- "time"
- )
- func main(){
- fmt.Println("run in main coroutine.")
- for i:=0; i<10; i++ {
- go func(i int) {
- fmt.Printf("run in child coroutine %d.\n", i)
- }(i)
- }
- // 防止子协程还没有结束主协程就退出了
- time.Sleep(time.Second * 1)
- }
下面这些概念可能不太好理解, 需要慢慢理解. 可以先跳过, 回头再来看.
概念:
协程可以理解为纯用户态的线程, 其通过协作而不是抢占来进行切换. 相对于进程或者线程, 协程所有的操作都可以在用户态完成, 创建和切换的消耗更低.
一个进程内部可以运行多个线程, 而每个线程又可以运行很多协程. 线程要负责对协程进行调度, 保证每个协程都有机会得到执行. 当一个协程睡眠时, 它要将线程的运行权让给其它的协程来运行, 而不能持续霸占这个线程. 同一个线程内部最多只会有一个协程正在运行.
协程可以简化为三个状态: 运行态, 就绪态和休眠态. 同一个线程中最多只会存在一个处于运行态的协程. 就绪态协程是指那些具备了运行能力但是还没有得到运行机会的协程, 它们随时会被调度到运行态; 休眠态的协程还不具备运行能力, 它们是在等待某些条件的发生, 比如 IO 操作的完成, 睡眠时间的结束等.
子协程的异常退出会将异常传播到主协程, 直接会导致主协程也跟着挂掉.
协程一般用 TCP/HTTP/RPC 服务, 消息推送系统, 聊天系统等. 使用协程, 我们可以很方便的搭建一个支持高并发的 TCP 或 HTTP 服务端.
通道
通道的英文是 Channels, 简称 chan. 什么时候要用到通道呢? 可以先简单的理解为: 协程在需要协作通信的时候就需要用通道.
在 GO 里, 不同的并行协程之间交流的方式有两种, 一种是通过共享变量, 另一种是通过通道. Go 语言鼓励使用通道的形式来交流.
举个简单的例子, 我们使用协程实现并发调用远程接口, 最终我们需要把每个协程请求回来的数据进行汇总一起返回, 这个时候就用到通道了.
创建通道
创建通道 (channel) 只能使用 make 函数:
c := make(chan int)
通道是区分类型的, 如这里的 int.
Go 语言为通道的读写设计了特殊的箭头语法糖 <-, 让我们使用通道时非常方便. 把箭头写在通道变量的右边就是写通道, 把箭头写在通道的左边就是读通道. 一次只能读写一个元素.
- c := make(chan bool)
- c <- true // 写入
- <- c // 读取
缓冲通道
上面我们介绍了默认的非缓存类型的 channel, 不过 Go 也允许指定 channel 的缓冲大小, 很简单, 就是 channel 可以存储多少元素:
c := make(chan int, value)
当 value = 0 时, 通道是无缓冲阻塞读写的, 等价于 make(chan int); 当 value> 0 时, 通道有缓冲, 是非阻塞的, 直到写满 value 个元素才阻塞写入. 具体说明下:
非缓冲通道
无论是发送操作还是接收操作, 一开始执行就会被阻塞, 直到配对的操作也开始执行才会继续传递. 由此可见, 非缓冲通道是在用同步的方式传递数据. 也就是说, 只有收发双方对接上了, 数据才会被传递. 数据是直接从发送方复制到接收方的, 中间并不会用非缓冲通道做中转.
缓冲通道
缓冲通道可以理解为消息队列, 在有容量的时候, 发送和接收是不会互相依赖的. 用异步的方式传递数据.
下面我们用一个例子来理解一下:
- package main
- import "fmt"
- func main() {
- var c = make(chan int, 0)
- var a string
- go func() {
- a = "hello world"
- <-c
- }()
- c <- 0
- fmt.Println(a)
- }
这个例子输出的一定是 hello world. 但是如果你把通道的容量由 0 改为大于 0 的数字, 输出结果就不一定是 hello world 了, 很可能是空. 为什么?
当通道是无缓冲通道时, 执行到 c <- 0, 通道满了, 写操作会被阻塞住, 直到执行<-c 解除阻塞, 后面的语句接着执行.
要是改成非阻塞通道, 执行到 c <- 0, 发现还能写入, 主协程就不会阻塞了, 但这时候输出的是空字符串还是 hello world, 取决于是子协程和主协程哪个运行的速度快.
通道作为容器, 它可以像切片一样, 使用 cap() 和 len() 全局函数获得通道的容量和当前内部的元素个数.
模拟消息队列
上一节 "协程" 的例子里, 我们在主协程里加了个 time.Sleep(), 目的是防止子协程还没有结束主协程就退出了. 但是对于实际生活的大多数场景来说, 1 秒是不够的, 并且大部分时候我们都无法预知 for 循环内代码运行时间的长短. 这时候就不能使用 time.Sleep() 来完成等待操作了. 下面我们用通道来改写:
- package main
- import (
- "fmt"
- )
- func main() {
- fmt.Println("run in main coroutine.")
- count := 10
- c := make(chan bool, count)
- for i := 0; i < count; i++ {
- go func(i int) {
- fmt.Printf("run in child coroutine %d.\n", i)
- c <- true
- }(i)
- }
- for i := 0; i < count; i++ {
- <-c
- }
- }
单向通道
默认的通道是支持读写的, 我们可以定义单向通道:
- // 只读
- var readOnlyChannel = make(<-chan int)
- // 只写
- var writeOnlyChannel = make(chan<- int)
下面是一个示例, 我们模拟消息队列的消费者, 生产者:
- package main
- import (
- "fmt"
- "time"
- )
- func Producer(c chan<- int) {
- for i := 0; i < 10; i++ {
- c <- i
- }
- }
- func Consumer1(c <-chan int) {
- for m := range c {
- fmt.Printf("oh, I get luckly num: %v\n", m)
- }
- }
- func Consumer2(c <-chan int) {
- for m := range c {
- fmt.Printf("oh, I get luckly num too: %v\n", m)
- }
- }
- func main() {
- c := make(chan int, 2)
- go Consumer1(c)
- go Consumer2(c)
- Producer(c)
- time.Sleep(time.Second)
- }
对于生产者, 我们希望通道是只写属性, 而对于消费者则是只读属性, 这样避免对通道进行错误的操作. 当然, 如果你将本例里消费者, 生产者的通道单向属性去掉也是可以的, 没什么问题:
- func Producer(c chan int) {
- }
- func Consumer1(c chan int) {
- }
- func Consumer2(c chan int) {
- }
事实上 channel 只读或只写都没有意义, 所谓的单向 channel 其实只是方法里声明时用, 如果后续代码里, 向本来用于读 channel 里写入了数据, 编译器会提示错误.
关闭通道
读取一个已经关闭的通道会立即返回通道类型的零值, 而写一个已经关闭的通道会抛异常. 如果通道里的元素是整型的, 读操作是不能通过返回值来确定通道是否关闭的.
1, 如何安全的读通道, 确保不是读取的已关闭通道的零值?
答案是使用 for...range 语法. 当通道为空时, 循环会阻塞; 当通道关闭, 循环会停止. 通过循环停止, 我们可以认为通道已经关闭. 示例:
- package main
- import "fmt"
- func main() {
- var c = make(chan int, 3)
- // 子协程写
- go func() {
- c <- 1
- close(c)
- }()
- // 直接读取通道, 存在不知道子协程是否已关闭的情况
- //fmt.Println(<-c)
- //fmt.Println(<-c)
- // 主协程读取: 使用 for...range 安全的读取
- for value := range c {
- fmt.Println(value)
- }
- }
输出:
1
2, 如何安全的写通道, 确保不会写入已关闭的通道?
Go 语言并不存在一个内置函数可以判断出通道是否已经被关闭. 确保通道写安全的最好方式是由负责写通道的协程自己来关闭通道, 读通道的协程不要去关闭通道.
但是这个方法只能解决单写多读的场景. 如果遇到多写单读的情况就有问题了: 无法知道其它写协程什么时候写完, 那么也就不能确定什么时候关闭通道. 这个时候就得额外使用一个通道专门做这个事情.
我们可以使用内置的 sync.WaitGroup, 它使用计数来等待指定事件完成:
- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- func main() {
- var ch = make(chan int, 8)
- // 写协程
- var wg = new(sync.WaitGroup)
- for i := 1; i <= 4; i++ {
- wg.Add(1)
- go func(num int, ch chan int, wg *sync.WaitGroup) {
- defer wg.Done()
- ch <- num
- ch <- num * 10
- }(i, ch, wg)
- }
- // 读
- go func(ch chan int) {
- for num := range ch {
- fmt.Println(num)
- }
- }(ch)
- //Wait 阻塞等待所有的写通道协程结束, 待计数值变成零, Wait 才会返回
- wg.Wait()
- // 安全的关闭通道
- close(ch)
- // 防止读取通道的协程还没有完毕
- time.Sleep(time.Second)
- fmt.Println("finish")
- }
输出:
- 3
- 30
- 2
- 20
- 1
- 10
- 4
- 40
- finish
多路通道
有时候还会遇到多个生产者, 只要有一个生产者就绪, 消费者就可以进行消费的情况. 这个时候可以使用 go 语言提供的 select 语句, 它可以同时管理多个通道读写, 如果所有通道都不能读写, 它就整体阻塞, 只要有一个通道可以读写, 它就会继续. 示例:
- package main
- import (
- "fmt"
- "time"
- )
- func main() {
- var ch1 = make(chan int)
- var ch2 = make(chan int)
- fmt.Println(time.Now().Format("15:04:05"))
- go func(ch chan int) {
- time.Sleep(time.Second)
- ch <- 1
- }(ch1)
- go func(ch chan int) {
- time.Sleep(time.Second * 2)
- ch <- 2
- }(ch2)
- for {
- select {
- case v := <-ch1:
- fmt.Println(time.Now().Format("15:04:05") + ": 来自 ch1:", v)
- case v := <-ch2:
- fmt.Println(time.Now().Format("15:04:05") + ": 来自 ch2:", v)
- //default:
- //fmt.Println("channel is empty !")
- }
- }
- }
输出:
13:39:56
- package main
- import "fmt"
- func write(d map[string]string) {
- d["name"] = "yujc"
- }
- func read(d map[string]string) {
- fmt.Println(d["name"])
- }
- func main() {
- d := map[string]string{}
- go read(d)
- write(d)
- }
- $ go run -race main.go
- ==================
- WARNING: DATA RACE
- Read at 0x00c0000a8180 by goroutine 6:
- ...
- yujc
- Found 2 data race(s)
- exit status 66
- package main
- import (
- "fmt"
- "sync"
- )
- type SafeDict struct {
- data map[string]string
- mux *sync.Mutex
- }
- func NewSafeDict(data map[string]string) *SafeDict {
- return &SafeDict{
- data: data,
- mux: &sync.Mutex{},
- }
- }
- func (d *SafeDict) Get(key string) string {
- d.mux.Lock()
- defer d.mux.Unlock()
- return d.data[key]
- }
- func (d *SafeDict) Set(key string, value string) {
- d.mux.Lock()
- defer d.mux.Unlock()
- d.data[key] = value
- }
- func main(){
- dict := NewSafeDict(map[string]string{})
- go func(dict *SafeDict) {
- fmt.Println(dict.Get("name"))
- }(dict)
- dict.Set("name", "yujc")
- }
- $ go run -race main.go
- yujc
- type SafeDict struct {
- data map[string]string
- *sync.Mutex
- }
- func NewSafeDict(data map[string]string) *SafeDict {
- return &SafeDict{data, &sync.Mutex{}}
- }
- func (d *SafeDict) Get(key string) string {
- d.Lock()
- defer d.Unlock()
- return d.data[key]
- }
- package main
- import (
- "fmt"
- "sync"
- )
- type SafeDict struct {
- data map[string]string
- *sync.RWMutex
- }
- func NewSafeDict(data map[string]string) *SafeDict {
- return &SafeDict{data, &sync.RWMutex{}}
- }
- func (d *SafeDict) Get(key string) string {
- d.RLock()
- defer d.RUnlock()
- return d.data[key]
- }
- func (d *SafeDict) Set(key string, value string) {
- d.Lock()
- defer d.Unlock()
- d.data[key] = value
- }
- func main(){
- dict := NewSafeDict(map[string]string{})
- go func(dict *SafeDict) {
- fmt.Println(dict.Get("name"))
- }(dict)
- dict.Set("name", "yujc")
- }
- https://www.jianshu.com/p/f12e1766c19f
- 2,channel
- https://www.jianshu.com/p/4d97dc032730
来源: https://www.cnblogs.com/52fhy/p/11369028.html