6.4 Channel
不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。
6.4.1 设计原理
常见的多线程通信是通过共享内存来进行的:
Golang 除了支持共享内存的方式之外,还提供了基于 CSP(Communicating sequential processes)的并发模型。
先入先出 FIFO
Channel 的数据遵循 FIFO原则:
- 先从 channel 中读取数据的 goroutine 先接收到数据
- 先向 channel 中发送数据的 goroutine 可以优先发送数据
6.4.2 数据结构
运行时的 channel 使用runtime.hchan
表示:
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
qcount
:channel 中元素的个数dataqsiz
:channel 中循环队列的长度buf
:channel 缓冲区数据指针sendx
:channel 的发送操作当前所在位置(索引)recvx
:channel 的接收操作当前所在位置(索引)elemsize
:channel 中元素的大小elemtype
:channel 中元素的类型sendq
:发送操作阻塞的 goroutine 队列recvq
:接收操作阻塞的 goroutine 队列
goroutine 队列由双向链表runtime.waitq
表示:
type waitq struct {
first *sudog
last *sudog
}
6.4.3 创建 channel
创建 channel 使用 make 关键字。编译期将make
转换成OMAKE
节点并在类型检查阶段将OMAKE
转换成OMAKECHAN
:
func typecheck1(n *Node, top int) (res *Node) {
switch n.Op {
case OMAKE:
...
switch t.Etype {
case TCHAN:
l = nil
if i < len(args) { // 带缓冲区的异步 Channel
...
n.Left = l
} else { // 不带缓冲区的同步 Channel
n.Left = nodintconst(0)
}
n.Op = OMAKECHAN
}
}
}
OMAKECHAN
会在 SSA 中间代码生成阶段转换成调用 runtime.makechan
或者 runtime.makechan64
(用于处理缓冲区大于)的函数。
其中,runtime.makechan
:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
根据 channel 中收发的元素类型和缓冲区大小初始化:
- 若 channel 无缓冲区,只为
runtime.hchan
分配内存 - 若 channel 有缓冲区且中元素类型不是指针类型,则将 channel 和底层数组分配在一块连续的内存空间中
- 默认情况,则为
runtime.hchan
和缓冲区单独分配内存
6.4.4 发送数据
发送数据时使用 ch <- v
的形式,编译器会将其解析成 OSEND
节点并转换成runtime.chansend1
:
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OSEND:
n1 := n.Right
n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
n1 = walkexpr(n1, init)
n1 = nod(OADDR, n1, nil)
n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)
}
}
chansend1
实际会调用函数runtime.chansend
, 首先:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
- 进行加锁
- 若当前 channel 已关闭,触发 panic
之后进入发送阶段,可以分为三个部分:
- 直接发送:当存在等待的接收者时,通过
runtime.send
直接发送数据 - 发送到缓冲区:当缓冲区存在且还有剩余空间时,将数据发送至缓冲区
- 阻塞:当无缓冲区或缓冲区已满时,等待 goroutine 接收数据
直接发送
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
从 goroutine 等待队列中取出最先开始等待的 gortoutine,直接发送数据。
发送数据时使用runtime.send
,函数流程如下:
- 调用
runtime.sendDirect
将发送的数据直接拷贝到x := <-c
的目标变量x
的内存地址中 - 调用
runtime.goready
将等待接收数据的 goroutine 标记为可运行状态,并将该 goroutine 放到发送方所在的处理器 P 的runnext
上等待执行,该处理器在下一次调度时会立刻唤醒接收方 goroutine(注意:发送数据时,不会立即执行接收方的 goroutine)
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
发送到缓冲区
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
流程如下:
- 使用
runtime.chanbuf
计算下一个可以存储数据的位置 - 通过
runtime.typedmemmove
将发送的数据拷贝到缓冲区中 - 更新
sendx
索引 - 更新
qcount
计数器
channel 的缓冲区是循环(或者说滚动)数组,当sendx
到达数组尾部之后,会回到数据首部。
阻塞发送
当缓冲区已满 或 当前没有接收者(无缓冲)时,若当前操作是阻塞的(block 为 true)发送数据操作会被阻塞:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
gp.waiting = nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
使用
runtime.getg
获取发送数据的 goroutine执行
runtime.acquireSudog
获取runtime.sudog
结构并设置阻塞相关的信息,如:发送的 channel,是否在select
结构中,待发送的数据的地址等将已初始化的
mysg
放入发送等待队列,并将当前的 goroutine 的waiting
字段设置为mysg
表示当前 goroutine 正在等待mysg
准备就绪使用
runtime.goparkunlock
使得当前的 goroutine 进入睡眠并等待唤醒当前的 goroutine 唤醒之后,将相关属性置零并释放
mysg
函数返回 true 则表示发送成功。
小结
发送数据时:
- 若当前 channel 上的等待接收队列
recvq
存在被阻塞的 goroutine,则将数据发送给接收方,并将接收方 goroutine 设置成下一个可运行的 goroutine。 - 当缓冲区可用时,直接发送到缓冲区中
- 缓冲区不可用(已满或无缓冲区),且无等待接收的 goroutine 时,创建一个
runtime.sudog
加入到 channel 的发送队列sendq
中,当前 goroutine 陷入睡眠并等待接收方接收数据
发送数据过程中涉及的 goroutine 调度:
- 直接发送数据时,将接收方 goroutine 设置为当前 goroutine 的处理器 P 的
runnext
(表示为当前P的下一个执行的 goroutine),不会立刻执行接收方 goroutine - 发送阻塞时,将当前 goroutine 加入 channel 的发送队列
sendq
,并进入睡眠让出处理器的使用权
6.4.5 接收数据
接收数据有两种方式:
v := <-ch
v, ok := <-ch
两种不同的方式会被转换成ORECV
节点,并在类型检查阶段转换成OAS2RECV
:
函数 runtime.chanrecv1
和 runtime.chanrecv2
最终会调用函数runtime.chanrecv
:
首先,若从nil
channel 中读取则直接进入阻塞态。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
- 为 channel 加锁
- 若当前 channel 已关闭 并且 缓冲区中无数据,则清除
ep
指针中的数据并返回
后序的流程分为三种情况:
- 当存在等待中的发送者,则调用
runtime.recv
从阻塞的发送者 或 缓冲区获取数据 - 当缓冲区中有数据时,从缓冲区获取数据
- 缓冲区无数据时,等待发送者
直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
当 channel 的等待发送队列 sendq
中有等待中的发送者时,调用 runtime.recv
接收数据:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
函数会根据缓冲区的大小分为两种情况:
- 若不存在缓冲区,调用
runtime.recvDirect
将发送者发送的数据elem
拷贝到目标内存地址上(ep指针指向的位置) - 若存在缓冲区:
- 将队列中的数据拷贝到接收方的内存地址
- 将发送队列首部的数据放入缓冲区,释放一个阻塞的发送方
之后调用 runtime.goready
将当前处理器的runnext
设置为 发送方的 goroutine,在下一次调度时将发送方唤醒。
从缓冲区接收
当缓冲区中有数据时,从缓冲区中获取数据:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
return true, true
}
...
}
- 将缓冲区的数据拷贝至目标地址
- 从缓冲区中清除已接收的数据
- 更新缓冲区的接收索引
recvx++
- 更新缓冲区计数器
qcount--
阻塞接收
当发送队列中无等待中的发送者 并且 缓冲区无数据时,若当前操作是阻塞的(block 为true)则接收会被阻塞:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
gp.waiting = nil
closed := gp.param == nil
gp.param = nil
releaseSudog(mysg)
return true, !closed
}
- 使用
runtime.sudog
封装当前 goroutine,加入接收等待队列recvq
中。 - 调用
runtime.goparkunlock
让出处理器使用权,等待调度
小结
从 channel 中接收数据时,会出现5中情况:
- 若 channel 为 nil,则调用
runtime.gopark
挂起当前 goroutine - 若 channel 已关闭 并且 缓冲区无数据,则直接返回
- 若 channel 的发送等待队列
sendq
中存在等待中的 goroutine,则将recvx
对应的数据拷贝至目标内存上,并将sendq
中的数据存入缓冲区 - 若 channel 的缓冲区存在数据,则直接从缓冲区中读取
- 缓冲区无数据并且无等待中的发送者,则将当前 goroutine(使用
runtime.sudog
封装)加入到等待接收队列中,并陷入睡眠等待唤醒
接收数据中会有两个时机涉及 goroutine 的调度:
- 当 channel 为 nil 时,直接挂起当前 goroutine
- 当 缓冲区无数据 且 无等待中的发送者时,当前 goroutine 进入睡眠等待唤醒
6.4.6 关闭 channel
关闭channel 使用close(ch)
,编译器将close
转换成 OCLOSE
节点并调用runtime.closechan
:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
若当前 channel 为 nil 或 已经关闭时,触发 panic。
c.closed = 1
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
for {
sg := c.sendq.dequeue()
...
}
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
- 将
recvq
和sendq
中的数据加入到 goroutine 列表gList
中 - 清除所有
runtime.sudog
上未被处理的元素 - 使用
runtime.goready
触发所有等待中的 goroutine 的调度