go 通道基于 go 的并发调度实现, 本身并不复杂, go 并发调度请看我的这篇文章: go 并发调度原理学习
1.channel 数据结构
- type hchan struct {
- qcount uint // 缓冲区中已有元素个数
- dataqsiz uint // 循环队列容量大小
- buf unsafe.Pointer // 缓冲区指针
- elemsize uint16 // 元素大小
- closed uint32 // 关闭标记, 0 没关闭, 1 关闭
- elemtype *_type // 数据项类型
- sendx uint // 发送索引
- recvx uint // 接收索引
- recvq waitq // 等待接收排队链表
- sendq waitq // 等待发送排队链表
- lock mutex // 锁
- }
- type waitq struct {
- first *sudog
- last *sudog
- }
2. 创建 channel 实现
创建 channel 实例:
ch := make(chan int, 4)
实现函数:
func makechan(t *chantype, size int64) *hchan
大致实现:
执行上面这行代码会 new 一个 hchan 结构, 同时创建一个 dataqsiz=4 的 int 类型的循环队列, 其实就是一个容纳 4 个元素的数组, 就是按顺序往里面写数据, 写满之后又从 0 开始写, 这个顺序索引就是 hchan.sendx
3. 发送数据
发送数据实例:
ch <- 100
发送数据实现函数:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
ep 指向要发送数据的首地址
- func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
- lock(&c.lock)
- if c.closed != 0 {
- unlock(&c.lock)
- panic(plainError("send on closed channel"))
- }
- if sg := c.recvq.dequeue(); sg != nil {
- // 缓冲区就是一个固定长度的循环列表
- // 发送队列是一个双向链表, 接在缓冲区的后面, 整体是一个队列, 保证先进先出
- // 有接收者, 并不是将当前要发送的数据直接发出, 而是将缓冲区的第一个元素发送给接收者, 同时将发送队列的第一个元素加入缓冲区刚出队列的位置
- send(c, sg, ep, func() { unlock(&c.lock) }, 3)
- return true
- }
- if c.qcount <c.dataqsiz {
- // 缓冲区没有满, 直接将要发送的数据复制到缓冲区, 直接返回,
- qp := chanbuf(c, c.sendx)
- typedmemmove(c.elemtype, qp, ep)
- c.sendx++
- if c.sendx == c.dataqsiz {
- c.sendx = 0
- }
- c.qcount++
- unlock(&c.lock)
- return true
- }
- if !block {
- unlock(&c.lock)
- return false
- }
- // 以上都是同步非阻塞的, ch <- 100 直接返回
- // 以下是同步阻塞
- // 缓冲区满了, 也没有接收者, 通道将被阻塞, 其实就是不执行当前 G 了, 将状态改成等待状态
- gp := getg()
- mysg := acquireSudog()
- c.sendq.enqueue(mysg)
- goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
- // 当 G 被唤醒, 状态改成可执行状态, 从这里开始继续执行
- releaseSudog(mysg)
- return true
- }
大致实现:
1: 接收队列不为空, 从接收队列中取出第一个接收者 * sudog, 将数据复制到 sudog.elem, 复制函数为 memmove 用汇编实现, 通知接收方数据给你了, 将接收方协程由等待状态改成可运行状态, 将当前协程加入协程队列, 等待被调度.
2: 没有接收者, 有缓冲区且没有满, 直接将数据复制到缓冲中, 写入缓冲区的位置为 hchan.buf[sendx++], 如果缓冲区已满 sendx=0, 就是循环队列的实现, 往 sendx 指定的位置写数据, hchan.qcount++
3: 没有接收者, 没有缓冲区或是满了, 则从当前协程对应的 P 的 sudog 队列中取一个 struct sudog, 将数据复制到 sudog.elem, 将 sudog 加入 sendq 队列中, 通知接收方, 当前流程阻塞, 等待被唤醒, 接收方收到通知后 (被唤醒), 继续往下执行, 接收数据完成后会通知发送方, 即将发送方协程状态由等待状态改成可运行状态, 加入协程可运行队列, 等着被执行不会阻塞的情况:
1: 通道缓冲区没有满之前, 因为只是将要发送的数据复制到缓冲区就返回了
2: 有接收者的情况, 有数据复制到接收方的数据结构中 (不是最终接收数据的变量, 在执行接收函数的时候会拷贝到最终接收数据的变量), 唤醒接收协程会阻塞的情况: 自然就是缓冲区满了, 也没有接收方, 这个时候会将数据打包放到发送队列, 当前协程被设置成等待状态, 这个状态不会被调度, 当有接收方收到数据后, 才会被唤醒
4. 接收数据
接收数据实例:
val := <- ch
接收数据实现函数:
- func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
- func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
- lock(&c.lock)
- if sg := c.sendq.dequeue(); sg != nil {
- // Found a waiting sender. If buffer is size 0, receive value
- // directly from sender. Otherwise, receive from head of queue
- // and add sender's value to the tail of the queue (both map to
- // the same buffer slot because the queue is full).
- recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
- return true, true
- }
- if c.qcount> 0 {
- // Receive directly from queue
- qp := chanbuf(c, c.recvx)
- if ep != nil {
- typedmemmove(c.elemtype, ep, qp)
- }
- typedmemclr(c.elemtype, qp)
- c.recvx++
- if c.recvx == c.dataqsiz {
- c.recvx = 0
- }
- c.qcount--
- unlock(&c.lock)
- return true, true
- }
- if !block {
- unlock(&c.lock)
- return false, false
- }
- // 以上同步非阻塞
- // 以下同步阻塞
- gp := getg()
- mysg := acquireSudog()
- c.recvq.enqueue(mysg)
- // 将当前 G 状态改成等待状态, 停止调度
- goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
- // 当前 G 被唤醒从这里继续执行
- mysg.c = nil
- releaseSudog(mysg)
- return true, !closed
- }
大致实现:
1. 发送队列不为空 (说明缓冲区已满), 从发送队列中取出第一个发送者 * sudog
1.1. 没有缓冲区, 直接将发送队列中的数据 sudog.elem 复制出来, 存到接收数据的变量 val 中, 通知发送方我处理完了, 你可以继续执行
1.2. 有缓冲区, 复制出缓冲区 hchan.buf[recvx] 对应的元素到 val, 在将发送方 sudog.elem 复制到 hchan.buf[recvx], 发送方按顺序写, 接收方按顺序读, 典型的 FIFO, 为了保证是先进先出, 所以先复制出, 再将队列首元素复制到对应的缓冲区中, 其实就是发送队列连接在缓冲区后面, 缓冲区满了, 就写队列, 接收的时候先从缓冲区中拿数据, 拿掉之后空出来的位置从发送队列中取第一个填满, 并唤醒对应的 G, 只要发送队列不为空, 缓冲区肯定会被填满
2. 发送队列为空, 缓冲区不为空, 复制出缓冲区 hchan.buf[recvx] 对应的元素到 val,hchan.qcount--
3. 发送队列为空, 缓冲区也为空, 那就是没有任何待接收的数据, 接收流程就只能等了, 将接收信息打包成 sudog, 加入接收队列 recvq, 当前执行流程阻塞, 等有发送数据后会被唤醒继续
5.channel FIFO 在解释一次
5.1: 缓冲区没满, 发送数据就是进缓冲队列, 接收数据就是出缓冲队列, 比较好理解
5.2: 缓冲区已满, 发送数据就是进等待队列, 接收数据先出缓冲队列, 即为要接收的数据, 等待队列出列, 将数据存在缓冲队列刚出列的位置, 刚出列的位置相当于缓冲队列的末尾, 也就是说等待队列的列头连在缓冲队列的末尾, 将等待队列的列头加入缓存队列的列尾, 保证了缓冲队列是满的, 减少的是缓冲队列中的数据, 保证先进先出
5.3: 接收数据, 缓冲队列或等待队列有数据, 拿走第一个, 保证等待队列是接在缓冲区末尾, 即缓冲区末尾有空缺, 就让等待队列出列, 并填充至缓冲区末尾, 否则将自己打包加入接收队列, 当前 G 进入等待状态, 有数据发送自然会通知你
总结: Go channel 基于 go 的并发调度实现阻塞和非阻塞两种通讯方式
来源: https://www.cnblogs.com/hlxs/p/10275303.html