6.4 Channel

Kesa...大约 9 分钟golang

不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。

6.4.1 设计原理

常见的多线程通信是通过共享内存来进行的:

shared-memory
shared-memory

Golang 除了支持共享内存的方式之外,还提供了基于 CSP(Communicating sequential processes)的并发模型。

channel-and-goroutines
channel-and-goroutines

先入先出 FIFO

Channel 的数据遵循 FIFO原则:

  • 先从 channel 中读取数据的 goroutine 先接收到数据
  • 先向 channel 中发送数据的 goroutine 可以优先发送数据

6.4.2 数据结构

运行时的 channel 使用runtime.hchanopen in new window表示:

type hchan struct {
	qcount   uint
	dataqsiz uint
	buf      unsafe.Pointer
	elemsize uint16
	closed   uint32
	elemtype *_type
	sendx    uint
	recvx    uint
	recvq    waitq
	sendq    waitq

	lock mutex
}
  • qcount:channel 中元素的个数
  • dataqsiz:channel 中循环队列的长度
  • buf:channel 缓冲区数据指针
  • sendx:channel 的发送操作当前所在位置(索引)
  • recvx:channel 的接收操作当前所在位置(索引)
  • elemsize:channel 中元素的大小
  • elemtype:channel 中元素的类型
  • sendq:发送操作阻塞的 goroutine 队列
  • recvq:接收操作阻塞的 goroutine 队列

goroutine 队列由双向链表runtime.waitqopen in new window表示:

type waitq struct {
	first *sudog
	last  *sudog
}

6.4.3 创建 channel

创建 channel 使用 make 关键字。编译期将make转换成OMAKE节点并在类型检查阶段将OMAKE转换成OMAKECHAN

func typecheck1(n *Node, top int) (res *Node) {
	switch n.Op {
	case OMAKE:
		...
		switch t.Etype {
		case TCHAN:
			l = nil
			if i < len(args) { // 带缓冲区的异步 Channel
				...
				n.Left = l
			} else { // 不带缓冲区的同步 Channel
				n.Left = nodintconst(0)
			}
			n.Op = OMAKECHAN
		}
	}
}

OMAKECHAN会在 SSA 中间代码生成阶段转换成调用 runtime.makechanopen in new window 或者 runtime.makechan64open in new window (用于处理缓冲区大于2322^{32})的函数。

其中,runtime.makechanopen in new window

func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	mem, _ := math.MulUintptr(elem.size, uintptr(size))

	var c *hchan
	switch {
	case mem == 0:
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.kind&kindNoPointers != 0:
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	return c
}

根据 channel 中收发的元素类型和缓冲区大小初始化:

6.4.4 发送数据

发送数据时使用 ch <- v 的形式,编译器会将其解析成 OSEND节点并转换成runtime.chansend1open in new window:

func walkexpr(n *Node, init *Nodes) *Node {
	switch n.Op {
	case OSEND:
		n1 := n.Right
		n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
		n1 = walkexpr(n1, init)
		n1 = nod(OADDR, n1, nil)
		n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)
	}
}

chansend1实际会调用函数runtime.chansendopen in new window, 首先:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
  1. 进行加锁
  2. 若当前 channel 已关闭,触发 panic

之后进入发送阶段,可以分为三个部分:

  1. 直接发送:当存在等待的接收者时,通过runtime.sendopen in new window直接发送数据
  2. 发送到缓冲区:当缓冲区存在且还有剩余空间时,将数据发送至缓冲区
  3. 阻塞:当无缓冲区或缓冲区已满时,等待 goroutine 接收数据

直接发送

	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

从 goroutine 等待队列中取出最先开始等待的 gortoutine,直接发送数据。

channel-direct-send
channel-direct-send

发送数据时使用runtime.sendopen in new window,函数流程如下:

  1. 调用 runtime.sendDirectopen in new window 将发送的数据直接拷贝x := <-c的目标变量x的内存地址中
  2. 调用 runtime.goreadyopen in new window等待接收数据的 goroutine 标记可运行状态,并将该 goroutine 放到发送方所在的处理器 P 的runnext等待执行,该处理器在下一次调度时会立刻唤醒接收方 goroutine(注意:发送数据时,不会立即执行接收方的 goroutine)
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	goready(gp, skip+1)
}

发送到缓冲区

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	...
}

流程如下:

  1. 使用 runtime.chanbufopen in new window 计算下一个可以存储数据的位置
  2. 通过runtime.typedmemmoveopen in new window将发送的数据拷贝到缓冲区中
  3. 更新 sendx索引
  4. 更新qcount计数器
channel-buffer-send
channel-buffer-send

channel 的缓冲区是循环(或者说滚动)数组,当sendx到达数组尾部之后,会回到数据首部。

阻塞发送

当缓冲区已满 或 当前没有接收者(无缓冲)时,若当前操作是阻塞的(block 为 true)发送数据操作会被阻塞:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	if !block {
		unlock(&c.lock)
		return false
	}

	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	mysg.g = gp
	mysg.c = c
	gp.waiting = mysg
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

	gp.waiting = nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
  1. 使用runtime.getgopen in new window获取发送数据的 goroutine

  2. 执行 runtime.acquireSudogopen in new window获取 runtime.sudogopen in new window 结构并设置阻塞相关的信息,如:发送的 channel,是否在 select 结构中,待发送的数据的地址等

  3. 将已初始化的mysg放入发送等待队列,并将当前的 goroutine 的waiting字段设置为 mysg表示当前 goroutine 正在等待 mysg准备就绪

  4. 使用 runtime.goparkunlockopen in new window 使得当前的 goroutine 进入睡眠并等待唤醒

  5. 当前的 goroutine 唤醒之后,将相关属性置零并释放 mysg

函数返回 true 则表示发送成功。

小结

发送数据时:

  1. 若当前 channel 上的等待接收队列 recvq 存在被阻塞的 goroutine,则将数据发送给接收方,并将接收方 goroutine 设置成下一个可运行的 goroutine。
  2. 当缓冲区可用时,直接发送到缓冲区中
  3. 缓冲区不可用(已满或无缓冲区),且无等待接收的 goroutine 时,创建一个runtime.sudogopen in new window加入到 channel发送队列sendq中,当前 goroutine 陷入睡眠并等待接收方接收数据

发送数据过程中涉及的 goroutine 调度

  1. 直接发送数据时,将接收方 goroutine 设置为当前 goroutine 的处理器 P 的 runnext(表示为当前P的下一个执行的 goroutine),不会立刻执行接收方 goroutine
  2. 发送阻塞时,将当前 goroutine 加入 channel 的发送队列 sendq,并进入睡眠让出处理器的使用权

6.4.5 接收数据

接收数据有两种方式:

  1. v := <-ch
  2. v, ok := <-ch

两种不同的方式会被转换成ORECV节点,并在类型检查阶段转换成OAS2RECV

channel-receive-node
channel-receive-node

函数 runtime.chanrecv1open in new windowruntime.chanrecv2open in new window 最终会调用函数runtime.chanrecvopen in new window

首先,若从nil channel 中读取则直接进入阻塞态。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
  • 为 channel 加锁
  • 若当前 channel 已关闭 并且 缓冲区中无数据,则清除ep指针中的数据并返回

后序的流程分为三种情况:

  1. 当存在等待中发送者,则调用runtime.recvopen in new window 从阻塞的发送者 或 缓冲区获取数据
  2. 当缓冲区中有数据时,从缓冲区获取数据
  3. 缓冲区无数据时,等待发送者

直接接收

	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

当 channel 的等待发送队列 sendq中有等待中的发送者时,调用 runtime.recvopen in new window接收数据:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if ep != nil {
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	gp := sg.g
	gp.param = unsafe.Pointer(sg)
	goready(gp, skip+1)
}

函数会根据缓冲区的大小分为两种情况:

  1. 若不存在缓冲区,调用 runtime.recvDirectopen in new window 将发送者发送的数据elem拷贝到目标内存地址上(ep指针指向的位置)
  2. 若存在缓冲区:
    1. 将队列中的数据拷贝到接收方的内存地址
    2. 将发送队列首部的数据放入缓冲区,释放一个阻塞的发送方

之后调用 runtime.goreadyopen in new window 将当前处理器的runnext设置为 发送方的 goroutine,在下一次调度时将发送方唤醒。

channel-receive-from-sendq
channel-receive-from-sendq

从缓冲区接收

当缓冲区中有数据时,从缓冲区中获取数据:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		return true, true
	}
	...
}
  • 将缓冲区的数据拷贝至目标地址
  • 从缓冲区中清除已接收的数据
  • 更新缓冲区的接收索引recvx++
  • 更新缓冲区计数器 qcount--

阻塞接收

当发送队列中无等待中的发送者 并且 缓冲区无数据时,若当前操作是阻塞的(block 为true)则接收会被阻塞:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if !block {
		unlock(&c.lock)
		return false, false
	}

	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	gp.waiting = mysg
	mysg.g = gp
	mysg.c = c
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	releaseSudog(mysg)
	return true, !closed
}

小结

从 channel 中接收数据时,会出现5中情况:

  1. 若 channel 为 nil,则调用 runtime.goparkopen in new window 挂起当前 goroutine
  2. 若 channel 已关闭 并且 缓冲区无数据,则直接返回
  3. 若 channel 的发送等待队列sendq中存在等待中的 goroutine,则将recvx对应的数据拷贝至目标内存上,并将sendq中的数据存入缓冲区
  4. 若 channel 的缓冲区存在数据,则直接从缓冲区中读取
  5. 缓冲区无数据并且无等待中的发送者,则将当前 goroutine(使用runtime.sudogopen in new window封装)加入到等待接收队列中,并陷入睡眠等待唤醒

接收数据中会有两个时机涉及 goroutine 的调度:

  1. 当 channel 为 nil 时,直接挂起当前 goroutine
  2. 当 缓冲区无数据 且 无等待中的发送者时,当前 goroutine 进入睡眠等待唤醒

6.4.6 关闭 channel

关闭channel 使用close(ch),编译器将close转换成 OCLOSE节点并调用runtime.closechanopen in new window

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

若当前 channel 为 nil 或 已经关闭时,触发 panic。

	c.closed = 1

	var glist gList
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		gp := sg.g
		gp.param = nil
		glist.push(gp)
	}

	for {
		sg := c.sendq.dequeue()
		...
	}
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

Reference

  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/open in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2