5.2 select
select
是操作系统中的系统调用,可以同时监听多个文件描述符的可读或可写的状态。
使用 select
、poll
和 epoll
等函数构建 I/O 多路复用模型可以提升程序的性能。
Golang 中的select
可同时等待多个channel
可读或者可写,在能够读取/写入之前,select
会一直阻塞当前goroutine。
select
和switch
的结构相似,但select
的case
中只能是channel
的读取/写入操作。
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
- 当
c
或quit
可用时会进入对应的分支执行 - 当两者均可用时,会随机选择一个进行处理
5.2.1 现象
select
结构有两个特点:
select
可进行非阻塞的收发操作select
在遇到多个channel
可用时,会随机选取一个分支执行
非阻塞的收发
一般情况下,select
在无channel
可用的时,会阻塞当前的goroutine。
若在select
结构中添加default
语句,则有:
- 若存在可用的
channel
,直接执行对应的分支 - 若不存在可用的
channel
,执行default
分支
func main() {
ch := make(chan int)
select {
case i := <-ch:
println(i)
default:
println("default")
}
}
$ go run main.go
default
非阻塞收发的应用场景:不希望阻塞当前的goroutine,只关心channel是否可用
errCh := make(chan error, len(tasks))
wg := sync.WaitGroup{}
wg.Add(len(tasks))
for i := range tasks {
go func() {
defer wg.Done()
if err := tasks[i].Run(); err != nil {
errCh <- err
}
}()
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
上述代码不关心多少任务失败了,只想要直到是否有任务失败,不需要阻塞当前的goroutine。
随机执行
当select
中有多个channel
可用时,会随机执行其中的一个分支。
func main() {
ch := make(chan int)
go func() {
for range time.Tick(1 * time.Second) {
ch <- 0
}
}()
for {
select {
case <-ch:
println("case1")
case <-ch:
println("case2")
}
}
}
$ go run main.go
case1
case2
case1
...
随机执行是为了避免饥饿问题的发生。
5.2.2 数据结构
select
本身并不存在结构体,但是其case
可以用runtime.scase
来表示:
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}
5.2.3 实现原理
select
语句在编译期间会被转换成OSELECT
节点,每个OSELECT
会持有一组OCASE
节点,若OCASE
节点的执行条件为空,则表示default
。
编译器使用cmd/compile/internal/gc.walkselectcases
根据case
的不同进行优化:
- 不存在任何
case
- 只有一个
case
- 存在两个
case
,其中一个是default
- 存在多个
case
直接阻塞(无case)
当select
中没有case
时,会直接阻塞当前goroutine,导致 goroutine 进入无法被唤醒的永久休眠状态。
func walkselectcases(cases *Nodes) []*Node {
n := cases.Len()
if n == 0 {
return []*Node{mkcall("block", nil, nil)}
}
...
}
上述代码直接将select{}
转换成调用runtime.block
:
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1)
}
单一通道(一个case)
当select
中只有一个case
时,编译器将会改写为if
结构:
// 改写前
select {
case v, ok <-ch: // case ch <- v
...
}
// 改写后
if ch == nil {
block()
}
v, ok := <-ch // case ch <- v
...
其中若channel
是空指针,则会直接阻塞当前goroutine并陷入永久休眠。
非阻塞收发(一个case和一个default)
当select
中仅包含一个case
和一个default
时,编译器会认为这是一个非阻塞收发,并根据操作是发送或者接收进行不同的优化。
发送
当 case
中表达式的类型是 OSEND
时,编译器会使用条件语句和 runtime.selectnbsend
函数改写代码:
// 改写前
select {
case ch <- i:
...
default:
...
}
// 改写后
if selectnbsend(ch, i) {
...
} else {
...
}
其中的runtime.selectnbsend
提供了向 Channel 非阻塞地发送数据的能力:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
向 runtime.chansend
函数传入了非阻塞,所以在不存在接收方或者缓冲区空间不足时,当前 Goroutine 都不会阻塞而是会直接返回。
接收
// 改写前
select {
case v <- ch: // case v, ok <- ch:
......
default:
......
}
// 改写后
if selectnbrecv(&v, ch) { // if selectnbrecv2(&v, &ok, ch) {
...
} else {
...
}
根据返回值的不同,会使用两个函数:
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
selected, *received = chanrecv(c, elem, false)
return
}
和发送的情况类似,调用runtime.chanrecv
也传入了表示非阻塞的参数。
一般情况(多个case)
当select
中有多个case
的情况时,流程如下:
- 将所有的
case
转换成包含 Channel 以及类型等信息的runtime.scase
结构体 - 调用运行时函数
runtime.selectgo
从多个准备就绪的 Channel 中选择一个可执行的runtime.scase
结构体 - 通过
for
循环生成一组if
语句,在语句中判断自己是不是被选中的case
以三个case
的select
为例,其转化后的语句如下:
selv := [3]scase{}
order := [6]uint16
for i, cas := range cases {
c := scase{}
c.kind = ...
c.elem = ...
c.c = ...
}
chosen, revcOK := selectgo(selv, order, 3)
if chosen == 0 {
...
break
}
if chosen == 1 {
...
break
}
if chosen == 2 {
...
break
}
其中的runtime.selectgo
执行过程主要有两步:
- 初始化操作并确定
case
的处理顺序 - 在循环中根据
case
的类型做出不同的处理
初始化
runtime.selectgo
进行初始化后会决定处理case
的顺序:
- 轮询顺序(pollOrder)
- 加锁顺序(lockOrder)
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
norder := 0
for i := range scases {
cas := &scases[i]
}
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]
// 根据 Channel 的地址排序确定加锁顺序
...
sellock(scases, lockorder)
...
}
轮询顺序:通过
runtime.fastrandn
函数引入随机性; 随机的轮询顺序可以避免 Channel 的饥饿问题,保证公平性加锁顺序:按照 Channel 的地址排序后确定加锁顺序; 根据 Channel 的地址顺序确定加锁顺序能够避免死锁的发生
runtime.sellock
按照加锁顺序锁定所有的channel
循环
当锁定了所有 Channel 之后就会进入 runtime.selectgo
函数的主循环,流程如下:
- 查找是否已经存在准备就绪的 Channel,即可以执行收发操作
- 将当前 Goroutine 加入 Channel 对应的收发队列上并等待其他 Goroutine 的唤醒
- 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理
runtime.selectgo
函数会根据不同情况通过 goto
语句跳转到函数内部的不同标签执行相应的逻辑,其中包括:
bufrecv
:可以从缓冲区读取数据;bufsend
:可以向缓冲区写入数据;recv
:可以从休眠的发送方获取数据;send
:可以向休眠的接收方发送数据;rclose
:可以从关闭的 Channel 读取 EOF;sclose
:向关闭的 Channel 发送数据;retc
:结束调用并返回
查找准备就绪的 Channel
根据 case
的四种类型分别处理:
- 当
case
不包含 Channel 时:case
会被跳过
- 当
case
会从 Channel 中接收数据时:- 若当前 Channel 的
sendq
上有等待的 Goroutine,就会跳到recv
标签并从缓冲区读取数据后将等待 Goroutine 中的数据放入到缓冲区中相同的位置 - 若当前 Channel 的缓冲区不为空,就会跳到
bufrecv
标签处从缓冲区获取数据 - 若当前 Channel 已经被关闭,就会跳到
rclose
做一些清除的收尾工作
- 若当前 Channel 的
- 当
case
会向 Channel 发送数据时:- 若当前 Channel 已经被关,闭就会直接跳到
sclose
标签,触发panic
尝试中止程序 - 若当前 Channel 的
recvq
上有等待的 Goroutine,就会跳到send
标签向 Channel 发送数据 - 若当前 Channel 的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区
- 若当前 Channel 已经被关,闭就会直接跳到
- 当
select
语句中包含default
时:- 表示前面的所有
case
都没有被执行,这里会解锁所有 Channel 并返回,意味着当前select
结构中的收发都是非阻塞的
- 表示前面的所有
加入 Channel 对应的收发队列
若没有立即找到可以处理的channel
,则将当前 Goroutine 加入到 Channel 的 sendq
或者 recvq
队列中:
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
...
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.c = c
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
...
}
除了将当前 Goroutine 对应的 runtime.sudog
结构体加入队列之外,这些结构体都会被串成链表附着在 Goroutine 上。
在入队之后会调用 runtime.gopark
挂起当前 Goroutine 等待调度器的唤醒。
唤醒Goroutine,处理可用的Channel
等到 select
中的一些 Channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 runtime.selectgo
函数的第三部分,从 runtime.sudog
中读取数据:
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
...
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
sglist = gp.waiting
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
casi = int(casei)
cas = k
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
c = cas.c
goto retc
...
}
先获取当前 Goroutine 接收到的参数 sudog
结构,我们会依次对比所有 case
对应的 sudog
结构找到被唤醒的 case
,获取该 case
对应的索引并返回。
由于当前的 select
结构找到了一个 case
执行,剩下 case
中没有被用到的 sudog
就会被忽略并且释放掉。为了不影响 Channel 的正常使用,需要将废弃的 sudog
从 Channel 中出队。
循环中发现缓冲区中有元素或者缓冲区未满时就会通过 goto
关键字跳转到 bufrecv
和 bufsend
,向 Channel 中发送数据或者从缓冲区中获取新数据:
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
两个直接收发 Channel 的情况会调用运行时函数 runtime.send
和 runtime.recv
:
recv:
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retc
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc
若向已关闭的Channel 发送数据或者接收数据:
- 从一个已关闭 Channel 中接收数据会直接清除 Channel 中的相关内容
- 向一个已关闭的 Channel 发送数据就会直接
panic
造成程序崩溃
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retc
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
5.2.4 小结
select
的编译期优化
- 空的
select
语句会被转换成调用runtime.block
直接挂起当前 Goroutine - 若
select
语句中只包含一个case
,编译器会将其转换成if ch == nil { block }; n;
表达式:- 首先判断操作的 Channel 是否为空
- 然后执行
case
结构中的内容
- 若
select
语句中只包含两个case
并且其中一个是default
,那么会使用runtime.selectnbrecv
和runtime.selectnbsend
非阻塞地执行收发操作 - 在默认情况下会通过
runtime.selectgo
获取执行case
的索引,并通过多个if
语句执行对应case
中的代码
select
的执行流程
运行时执行编译期间展开的 runtime.selectgo
函数,按照以下的流程执行:
- 随机生成一个遍历的轮询顺序
pollOrder
并根据 Channel 地址生成锁定顺序lockOrder
- 根据
pollOrder
遍历所有的case
查看是否有可以立刻处理的 Channel:- 若存在,直接获取
case
对应的索引并返回 - 若不存在,创建
runtime.sudog
结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用runtime.gopark
挂起当前 Goroutine 等待调度器的唤醒
- 若存在,直接获取
- 当调度器唤醒当前 Goroutine 时,会再次按照
lockOrder
遍历所有的case
,从中查找需要被处理的runtime.sudog
对应的索引