5.2 select

Kesa...大约 10 分钟golang

select 是操作系统中的系统调用,可以同时监听多个文件描述符的可读或可写的状态。

使用 selectpollepoll 等函数构建 I/O 多路复用模型可以提升程序的性能。

Golang 中的select可同时等待多个channel可读或者可写,在能够读取/写入之前,select会一直阻塞当前goroutine。

Golang-Select-Channels
Golang-Select-Channels

selectswitch的结构相似,但selectcase中只能是channel的读取/写入操作。

func fibonacci(c, quit chan int) {
	x, y := 0, 1
	for {
		select {
		case c <- x:
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}
  • cquit可用时会进入对应的分支执行
  • 当两者均可用时,会随机选择一个进行处理

5.2.1 现象

select结构有两个特点:

  1. select可进行非阻塞的收发操作
  2. select在遇到多个channel可用时,会随机选取一个分支执行

非阻塞的收发

一般情况下,select在无channel可用的时,会阻塞当前的goroutine。

若在select结构中添加default语句,则有:

  • 若存在可用的channel,直接执行对应的分支
  • 若不存在可用的channel,执行default分支
func main() {
	ch := make(chan int)
	select {
	case i := <-ch:
		println(i)

	default:
		println("default")
	}
}

$ go run main.go
default

非阻塞收发的应用场景:不希望阻塞当前的goroutine,只关心channel是否可用

errCh := make(chan error, len(tasks))
wg := sync.WaitGroup{}
wg.Add(len(tasks))
for i := range tasks {
    go func() {
        defer wg.Done()
        if err := tasks[i].Run(); err != nil {
            errCh <- err
        }
    }()
}
wg.Wait()

select {
case err := <-errCh:
    return err
default:
    return nil
}

上述代码不关心多少任务失败了,只想要直到是否有任务失败,不需要阻塞当前的goroutine。

随机执行

select中有多个channel可用时,会随机执行其中的一个分支。

func main() {
	ch := make(chan int)
	go func() {
		for range time.Tick(1 * time.Second) {
			ch <- 0
		}
	}()

	for {
		select {
		case <-ch:
			println("case1")
		case <-ch:
			println("case2")
		}
	}
}

$ go run main.go
case1
case2
case1
...

随机执行是为了避免饥饿问题的发生。

5.2.2 数据结构

select本身并不存在结构体,但是其case可以用runtime.scaseopen in new window来表示:

type scase struct {
	c    *hchan         // chan
	elem unsafe.Pointer // data element
}

5.2.3 实现原理

select语句在编译期间会被转换成OSELECT节点,每个OSELECT会持有一组OCASE节点,若OCASE节点的执行条件为空,则表示default

golang-oselect-and-ocases
golang-oselect-and-ocases

编译器使用cmd/compile/internal/gc.walkselectcasesopen in new window根据case的不同进行优化:

  1. 不存在任何case
  2. 只有一个case
  3. 存在两个case,其中一个是default
  4. 存在多个case

直接阻塞(无case)

select中没有case时,会直接阻塞当前goroutine,导致 goroutine 进入无法被唤醒的永久休眠状态。

func walkselectcases(cases *Nodes) []*Node {
	n := cases.Len()

	if n == 0 {
		return []*Node{mkcall("block", nil, nil)}
	}
	...
}

上述代码直接将select{}转换成调用runtime.blockopen in new window

func block() {
	gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1)
}

单一通道(一个case)

select中只有一个case时,编译器将会改写为if结构:

// 改写前
select {
case v, ok <-ch: // case ch <- v
    ...    
}

// 改写后
if ch == nil {
    block()
}
v, ok := <-ch // case ch <- v
...

其中若channel是空指针,则会直接阻塞当前goroutine并陷入永久休眠。

非阻塞收发(一个case和一个default)

select中仅包含一个case和一个default时,编译器会认为这是一个非阻塞收发,并根据操作是发送或者接收进行不同的优化。

发送

case 中表达式的类型是 OSEND 时,编译器会使用条件语句和 runtime.selectnbsendopen in new window 函数改写代码:

// 改写前
select {
case ch <- i:
    ...
default:
    ...
}
// 改写后
if selectnbsend(ch, i) {
    ...
} else {
    ...
}

其中的runtime.selectnbsendopen in new window提供了向 Channel 非阻塞地发送数据的能力:

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

runtime.chansendopen in new window 函数传入了非阻塞,所以在不存在接收方或者缓冲区空间不足时,当前 Goroutine 都不会阻塞而是会直接返回

接收

// 改写前
select {
case v <- ch: // case v, ok <- ch:
    ......
default:
    ......
}

// 改写后
if selectnbrecv(&v, ch) { // if selectnbrecv2(&v, &ok, ch) {
    ...
} else {
    ...
}

根据返回值的不同,会使用两个函数:

  1. runtime.selectnbrecvopen in new window
  2. runtime.selectnbrecv2open in new window
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	selected, _ = chanrecv(c, elem, false)
	return
}

func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	selected, *received = chanrecv(c, elem, false)
	return
}

和发送的情况类似,调用runtime.chanrecvopen in new window也传入了表示非阻塞的参数。

一般情况(多个case)

select中有多个case的情况时,流程如下:

  1. 将所有的 case 转换成包含 Channel 以及类型等信息的 runtime.scaseopen in new window 结构体
  2. 调用运行时函数 runtime.selectgoopen in new window 从多个准备就绪的 Channel 中选择一个可执行的 runtime.scaseopen in new window 结构体
  3. 通过 for 循环生成一组 if 语句,在语句中判断自己是不是被选中的 case

以三个caseselect为例,其转化后的语句如下:

selv := [3]scase{}
order := [6]uint16
for i, cas := range cases {
    c := scase{}
    c.kind = ...
    c.elem = ...
    c.c = ...
}
chosen, revcOK := selectgo(selv, order, 3)
if chosen == 0 {
    ...
    break
}
if chosen == 1 {
    ...
    break
}
if chosen == 2 {
    ...
    break
}

其中的runtime.selectgoopen in new window执行过程主要有两步:

  1. 初始化操作并确定 case 的处理顺序
  2. 在循环中根据 case 的类型做出不同的处理

初始化

runtime.selectgoopen in new window进行初始化后会决定处理case的顺序:

  1. 轮询顺序(pollOrder)
  2. 加锁顺序(lockOrder)
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
	
	ncases := nsends + nrecvs
	scases := cas1[:ncases:ncases]
	pollorder := order1[:ncases:ncases]
	lockorder := order1[ncases:][:ncases:ncases]

	norder := 0
	for i := range scases {
		cas := &scases[i]
	}

	for i := 1; i < ncases; i++ {
		j := fastrandn(uint32(i + 1))
		pollorder[norder] = pollorder[j]
		pollorder[j] = uint16(i)
		norder++
	}
	pollorder = pollorder[:norder]
	lockorder = lockorder[:norder]

	// 根据 Channel 的地址排序确定加锁顺序
	...
	sellock(scases, lockorder)
	...
}
  • 轮询顺序:通过 runtime.fastrandnopen in new window 函数引入随机性; 随机的轮询顺序可以避免 Channel 的饥饿问题,保证公平性

  • 加锁顺序:按照 Channel 的地址排序后确定加锁顺序; 根据 Channel 的地址顺序确定加锁顺序能够避免死锁的发生

  • runtime.sellockopen in new window按照加锁顺序锁定所有的channel

循环

当锁定了所有 Channel 之后就会进入 runtime.selectgoopen in new window 函数的主循环,流程如下:

  1. 查找是否已经存在准备就绪的 Channel,即可以执行收发操作
  2. 将当前 Goroutine 加入 Channel 对应的收发队列上并等待其他 Goroutine 的唤醒
  3. 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理

runtime.selectgoopen in new window 函数会根据不同情况通过 goto 语句跳转到函数内部的不同标签执行相应的逻辑,其中包括:

  • bufrecv:可以从缓冲区读取数据;
  • bufsend:可以向缓冲区写入数据;
  • recv:可以从休眠的发送方获取数据;
  • send:可以向休眠的接收方发送数据;
  • rclose:可以从关闭的 Channel 读取 EOF;
  • sclose:向关闭的 Channel 发送数据;
  • retc:结束调用并返回

查找准备就绪的 Channel

根据 case 的四种类型分别处理:

  1. case 不包含 Channel 时:
    • case 会被跳过
  2. case 会从 Channel 中接收数据时:
    • 若当前 Channel 的 sendq 上有等待的 Goroutine,就会跳到 recv 标签并从缓冲区读取数据后将等待 Goroutine 中的数据放入到缓冲区中相同的位置
    • 若当前 Channel 的缓冲区不为空,就会跳到 bufrecv 标签处从缓冲区获取数据
    • 若当前 Channel 已经被关闭,就会跳到 rclose 做一些清除的收尾工作
  3. case 会向 Channel 发送数据时:
    • 若当前 Channel 已经被关,闭就会直接跳到 sclose 标签,触发 panic 尝试中止程序
    • 若当前 Channel 的 recvq 上有等待的 Goroutine,就会跳到 send 标签向 Channel 发送数据
    • 若当前 Channel 的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区
  4. select 语句中包含 default 时:
    • 表示前面的所有 case 都没有被执行,这里会解锁所有 Channel 并返回,意味着当前 select 结构中的收发都是非阻塞
golang-runtime-selectgo
golang-runtime-selectgo

加入 Channel 对应的收发队列

若没有立即找到可以处理的channel,则将当前 Goroutine 加入到 Channel 的 sendq 或者 recvq 队列中:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	...
	gp = getg()
	nextp = &gp.waiting
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c
		sg := acquireSudog()
		sg.g = gp
		sg.c = c

		if casi < nsends {
			c.sendq.enqueue(sg)
		} else {
			c.recvq.enqueue(sg)
		}
	}

	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	...
}

除了将当前 Goroutine 对应的 runtime.sudogopen in new window 结构体加入队列之外,这些结构体都会被串成链表附着在 Goroutine 上。

在入队之后会调用 runtime.goparkopen in new window 挂起当前 Goroutine 等待调度器的唤醒。

Golang-Select-Waiting
Golang-Select-Waiting

唤醒Goroutine,处理可用的Channel

等到 select 中的一些 Channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 runtime.selectgoopen in new window 函数的第三部分,从 runtime.sudogopen in new window 中读取数据:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
	...
	sg = (*sudog)(gp.param)
	gp.param = nil

	casi = -1
	cas = nil
	sglist = gp.waiting
	for _, casei := range lockorder {
		k = &scases[casei]
		if sg == sglist {
			casi = int(casei)
			cas = k
		} else {
			c = k.c
			if int(casei) < nsends {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	c = cas.c
	goto retc
	...
}

先获取当前 Goroutine 接收到的参数 sudog 结构,我们会依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。

由于当前的 select 结构找到了一个 case 执行,剩下 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,需要将废弃的 sudog 从 Channel 中出队

循环中发现缓冲区中有元素或者缓冲区未满时就会通过 goto 关键字跳转到 bufrecvbufsend,向 Channel 中发送数据或者从缓冲区中获取新数据:

bufrecv:
	recvOK = true
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

两个直接收发 Channel 的情况会调用运行时函数 runtime.sendopen in new windowruntime.recvopen in new window

recv:
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	recvOK = true
	goto retc

send:
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	goto retc

若向已关闭的Channel 发送数据或者接收数据:

  • 从一个已关闭 Channel 中接收数据会直接清除 Channel 中的相关内容
  • 向一个已关闭的 Channel 发送数据就会直接 panic 造成程序崩溃
rclose:
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	goto retc

sclose:
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))

5.2.4 小结

select的编译期优化

  1. 空的 select 语句会被转换成调用 runtime.blockopen in new window 直接挂起当前 Goroutine
  2. select 语句中只包含一个 case,编译器会将其转换成 if ch == nil { block }; n; 表达式:
    • 首先判断操作的 Channel 是否为空
    • 然后执行 case 结构中的内容
  3. select 语句中只包含两个 case 并且其中一个是 default,那么会使用 runtime.selectnbrecvopen in new windowruntime.selectnbsendopen in new window 非阻塞地执行收发操作
  4. 在默认情况下会通过 runtime.selectgoopen in new window 获取执行 case 的索引,并通过多个 if 语句执行对应 case 中的代码

select的执行流程

运行时执行编译期间展开的 runtime.selectgoopen in new window 函数,按照以下的流程执行:

  1. 随机生成一个遍历的轮询顺序 pollOrder 并根据 Channel 地址生成锁定顺序 lockOrder
  2. 根据 pollOrder 遍历所有的 case 查看是否有可以立刻处理的 Channel:
    1. 若存在,直接获取 case 对应的索引并返回
    2. 若不存在,创建 runtime.sudogopen in new window 结构体,将当前 Goroutine 加入到所有相关 Channel 的收发队列,并调用 runtime.goparkopen in new window 挂起当前 Goroutine 等待调度器的唤醒
  3. 当调度器唤醒当前 Goroutine 时,会再次按照 lockOrder 遍历所有的 case,从中查找需要被处理的 runtime.sudogopen in new window 对应的索引

Reference

  1. https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/open in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2