6.6 网络轮询器

Kesa...大约 12 分钟golang

6.6.1 设计原理

I/O 模型

操作系统中包含:

  1. 阻塞 I/O
  2. 非阻塞 I/O
  3. 信号驱动 I/O
  4. 异步 I/O
  5. I/O 多路复用

阻塞 I/O

阻塞 I/O 是最常见的 I/O 模型,通过 read 或者 write 等系统调用读写文件或者网络时,应用程序会被阻塞:

ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t nbytes);

执行 read 系统调用时,应用程序会从用户态陷入内核态,内核会检查文件描述符是否可读;当文件描述符中存在数据时,操作系统内核会将准备好的数据拷贝给应用程序并交回控制权。

blocking-io-mode
blocking-io-mode

非阻塞 I/O

当进程把一个文件描述符设置成非阻塞时,执行 readwrite 等 I/O 操作会立刻返回。

在 C 语言中,可以使用如下所示的代码片段将一个文件描述符设置成非阻塞的:

int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);

fcntlopen in new window提供了操作文件描述符的能力,可以通过它修改文件描述符的特性。当我们将文件描述符修改成非阻塞后,读写文件会经历以下流程:

non-blocking-io-mode
non-blocking-io-mode

进程使用非阻塞的 I/O 操作时,可以在等待过程中执行其他任务,提高 CPU 的利用率。

I/O 多路复用

I/O 多路复用被用来处理同一个事件循环中的多个 I/O 事件。I/O 多路复用需要使用特定的系统调用。

最常见的系统调用是 selectopen in new window,该函数可以同时监听最多 1024 个文件描述符的可读或者可写状态:

int select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, struct timeval *restrict timeout);

poll 函数,它使用链表存储文件描述符,摆脱了 1024 的数量上限。

io-multiplexing
io-multiplexing

多路复用函数会阻塞的监听一组文件描述符,当文件描述符的状态转变为可读或者可写时,select 会返回可读或者可写事件的个数,应用程序可以在输入的文件描述符中查找哪些可读或者可写,然后执行相应的操作。

io-multiplexing-mode
io-multiplexing-mode

I/O 多路复用模型是效率较高的 I/O 模型,它可以同时阻塞监听了一组文件描述符的状态。很多高性能的服务和应用程序都会使用这一模型来处理 I/O 操作,例如:Redis 和 Nginx 等。

多模块

虽然 select 也可以提供 I/O 多路复用的能力,但是使用它有比较多的限制:

  • 监听能力有限 — 最多只能监听 1024 个文件描述符
  • 内存拷贝开销大 , 需要维护一个较大的数据结构存储文件描述符,该结构需要拷贝到内核中;
  • 时间复杂度 O(n) , 返回准备就绪的事件个数后,需要遍历所有的文件描述符

为了提高 I/O 多路复用的性能,不同的操作系统也都实现了自己的 I/O 多路复用函数,例如:epollkqueueevport 等。

Go 语言为了提高在不同操作系统上的 I/O 操作性能,使用平台特定的函数实现了多个版本的网络轮询模块:

这些模块在不同平台上实现了相同的功能,构成了一个常见的树形结构。编译器在编译 Go 语言程序时,会根据目标平台选择树中特定的分支进行编译:

netpoll-modules
netpoll-modules

接口

epollkqueuesolaries 等多路复用模块都要实现以下五个函数,这五个函数构成一个虚拟的接口:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delta int64) gList
func netpollBreak()
func netpollIsPollDescriptor(fd uintptr) bool

6.6.2 数据结构

Go 语言网络轮询器会监听 runtime.pollDescopen in new window 结构体的状态,其封装了操作系统的文件描述符:

type pollDesc struct {
	link *pollDesc

	lock    mutex
	fd      uintptr
	...
	rseq    uintptr
	rg      uintptr
	rt      timer
	rd      int64
	wseq    uintptr
	wg      uintptr
	wt      timer
	wd      int64
}
  • rseqwseq , 表示文件描述符被重用或者计时器被重置
  • rgwg , 表示二进制的信号量,可能为 pdReadypdWait、等待文件描述符可读或者可写的 Goroutine 以及 nil
  • rdwd , 等待文件描述符可读或者可写的截止日期
  • rtwt , 用于等待文件描述符的计时器

runtime.pollDescopen in new window 结构体会使用 link 字段串联成链表存储在 runtime.pollCacheopen in new window 中:

type pollCache struct {
	lock  mutex
	first *pollDesc
}

runtime.pollCacheopen in new window 是运行时包中的全局变量,该结构体中包含一个用于保护轮询数据的互斥锁和链表头:

poll-desc-list
poll-desc-list

运行时会在第一次调用 runtime.pollCache.allocopen in new window 方法时初始化总大小约为 4KB 的 runtime.pollDescopen in new window 结构体,runtime.persistentAllocopen in new window 会保证这些数据结构初始化在不会触发垃圾回收的内存中,让这些数据结构只能被内部的 epollkqueue 模块引用:

func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	unlock(&c.lock)
	return pd
}

每次调用该结构体都会返回链表头还没有被使用的 runtime.pollDescopen in new window,这种批量初始化的做法能够增加网络轮询器的吞吐量。Go 语言运行时会调用 runtime.pollCache.freeopen in new window 方法释放已经用完的 runtime.pollDescopen in new window 结构,它会直接将结构体插入链表的最前面

func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

上述方法没有重置 runtime.pollDescopen in new window 结构体中的字段,该结构体被重复利用时才会由 runtime.poll_runtime_pollOpenopen in new window 函数重置。

6.6.3 多路复用

网络轮询器实际上是对 I/O 多路复用技术的封装.

初始化

因为文件 I/O、网络 I/O 以及计时器都依赖网络轮询器,所以 Go 语言会通过以下两条不同路径初始化网络轮询器:

  1. internal/poll.pollDesc.initopen in new window — 通过 net.netFD.initopen in new windowos.newFileopen in new window 初始化网络 I/O 和文件 I/O 的轮询信息时
  2. runtime.doaddtimeropen in new window — 向处理器中增加新的计时器时

网络轮询器的初始化会使用 runtime.poll_runtime_pollServerInitopen in new windowruntime.netpollGenericInitopen in new window 两个函数:

func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

runtime.netpollGenericInitopen in new window 会调用平台上特定实现的 runtime.netpollinitopen in new window,以 Linux 的epoll为例:

  1. 是调用 epollcreate1 创建一个新的 epoll 文件描述符,这个文件描述符会在整个程序的生命周期中使用;
  2. 通过 runtime.nonblockingPipeopen in new window 创建一个用于通信的管道;
  3. 使用 epollctl 将用于读取数据的文件描述符打包成 epollevent 事件加入监听;
var (
	epfd int32 = -1
	netpollBreakRd, netpollBreakWr uintptr
)

func netpollinit() {
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	r, w, _ := nonblockingPipe()
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

初始化的管道提供了中断多路复用等待文件描述符中事件的方法,runtime.netpollBreakopen in new window 会向管道中写入数据唤醒 epoll

func netpollBreak() {
	for {
		var b byte
		n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
		if n == 1 {
			break
		}
		if n == -_EINTR {
			continue
		}
		if n == -_EAGAIN {
			return
		}
	}
}

轮询事件

调用 internal/poll.pollDesc.initopen in new window 初始化文件描述符时不止会初始化网络轮询器,还会通过 runtime.poll_runtime_pollOpenopen in new window 重置轮询信息 runtime.pollDescopen in new window 并调用 runtime.netpollopenopen in new window 初始化轮询事件:

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	...
	pd.fd = fd
	pd.closing = false
	pd.everr = false
	...
	pd.wseq++
	pd.wg = 0
	pd.wd = 0
	unlock(&pd.lock)

	var errno int32
	errno = netpollopen(fd, pd)
	return pd, int(errno)
}

runtime.netpollopenopen in new window会调用 epollctl 向全局的轮询文件描述符 epfd 中加入新的轮询事件监听文件描述符的可读和可写状态:

func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

事件循环

等待事件

在文件描述符上执行读写操作时,若文件描述符不可读或者不可写,当前 Goroutine 会执行 runtime.poll_runtime_pollWaitopen in new window 检查 runtime.pollDescopen in new window 的状态并调用 runtime.netpollblockopen in new window 等待文件描述符的可读或者可写:

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	...
	for !netpollblock(pd, int32(mode), false) {
		...
	}
	return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}
	...
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	...
}

runtime.netpollblockopen in new window 是 Goroutine 等待 I/O 事件的关键函数,它会使用运行时提供的 runtime.goparkopen in new window 让出当前线程,将 Goroutine 转换到休眠状态并等待运行时的唤醒。

轮询等待

Go 语言的运行时会在调度或者系统监控中调用 runtime.netpollopen in new window 轮询网络,该函数的执行过程可以分成以下几个部分:

  1. 根据传入的 delay 计算 epoll 系统调用需要等待的时间;
  2. 调用 epollwait 等待可读或者可写事件的发生;
  3. 在循环中依次处理 epollevent 事件;

因为传入 delay 的单位是纳秒,下面这段代码会将纳秒转换成毫秒:

func netpoll(delay int64) gList {
	var waitms int32
	if delay < 0 {
		waitms = -1
	} else if delay == 0 {
		waitms = 0
	} else if delay < 1e6 {
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else {
		waitms = 1e9
	}

计算了需要等待的时间之后,runtime.netpollopen in new window 会执行 epollwait 等待文件描述符转换成可读或者可写,若该函数返回了负值,可能会返回空的 Goroutine 列表或者重新调用 epollwait 陷入等待:

	var events [128]epollevent
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}

epollwait 系统调用返回的值大于 0 时,意味着被监控的文件描述符出现了待处理的事件,我们在如下所示的循环中会依次处理这些事件:

	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
			...
			continue
		}

		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		...
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.everr = false
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

处理的事件总共包含两种:

  1. 调用 runtime.netpollBreakopen in new window 触发的事件,作用是中断网络轮询器;
  2. 其他文件描述符的正常读写事件,对于这些事件,会交给 runtime.netpollreadyopen in new window 处理
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	...
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	...
	if wg != nil {
		toRun.push(wg)
	}
}

runtime.netpollunblockopen in new window 会在读写事件发生时,将 runtime.pollDescopen in new window 中的读或者写信号量转换成 pdReady 并返回其中存储的 Goroutine;若返回的 Goroutine 不会为空,那么运行时会将该 Goroutine 会加入 toRun 列表,并将列表中的全部 Goroutine 加入运行队列并等待调度器的调度。

runtime.netpollopen in new window 返回的 Goroutine 列表都会被 runtime.injectglistopen in new window 注入到处理器或者全局的运行队列上。因为系统监控 Goroutine 直接运行在线程上,所以它获取的 Goroutine 列表会直接加入全局的运行队列,其他 Goroutine 获取的列表都会加入 Goroutine 所在处理器的运行队列上。

截止日期

网络轮询器和计时器的关系非常紧密,因为:

  1. 网络轮询器负责计时器的唤醒
  2. 文件和网络 I/O 的截止日期也由网络轮询器负责处理; 截止日期在 I/O 操作中,尤其是网络调用中很关键,网络请求存在很高的不确定因素,需要设置一个截止日期保证程序的正常运行

设置截至日期需要runtime.poll_runtime_pollSetDeadlineopen in new window函数:

func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	rd0, wd0 := pd.rd, pd.wd
	if d > 0 {
		d += nanotime()
	}
	pd.rd = d
	...
	if pd.rt.f == nil {
		if pd.rd > 0 {
			pd.rt.f = netpollReadDeadline
			pd.rt.arg = pd
			pd.rt.seq = pd.rseq
			resettimer(&pd.rt, pd.rd)
		}
	} else if pd.rd != rd0 {
		pd.rseq++
		if pd.rd > 0 {
			modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
		} else {
			deltimer(&pd.rt)
			pd.rt.f = nil
		}
	}

先使用截止日期计算出过期的时间点,然后根据 runtime.pollDescopen in new window 的状态做出以下不同的处理:

  1. 若结构体中的计时器没有设置执行的函数时,该函数会设置计时器到期后执行的函数、传入的参数并调用 runtime.resettimeropen in new window 重置计时器;
  2. 若结构体的读截止日期已经被改变,我们会根据新的截止日期做出不同的处理:
    1. 若新的截止日期大于 0,调用 runtime.modtimeropen in new window 修改计时器;
    2. 若新的截止日期小于 0,调用 runtime.deltimeropen in new window 删除计时器;

runtime.poll_runtime_pollSetDeadlineopen in new window 的最后,会重新检查轮询信息中存储的截止日期:

	var rg *g
	if pd.rd < 0 {
		if pd.rd < 0 {
			rg = netpollunblock(pd, 'r', false)
		}
		...
	}
	if rg != nil {
		netpollgoready(rg, 3)
	}
	...
}

若截止日期小于 0,上述代码会调用 runtime.netpollgoreadyopen in new window 直接唤醒对应的 Goroutine。

runtime.poll_runtime_pollSetDeadlineopen in new window 中直接调用 runtime.netpollgoreadyopen in new window 是相对比较特殊的情况。在正常情况下,运行时都会在计时器到期时调用 runtime.netpollDeadlineopen in new windowruntime.netpollReadDeadlineopen in new windowruntime.netpollWriteDeadlineopen in new window 三个函数:

netpoll-deadline
netpoll-deadline

上述三个函数都会通过 runtime.netpolldeadlineimplopen in new window 调用 runtime.netpollgoreadyopen in new window 直接唤醒相应的 Goroutine:

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	currentSeq := pd.rseq
	if !read {
		currentSeq = pd.wseq
	}
	if seq != currentSeq {
		return
	}
	var rg *g
	if read {
		pd.rd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil)
		rg = netpollunblock(pd, 'r', false)
	}
	...
	if rg != nil {
		netpollgoready(rg, 0)
	}
	...
}

Goroutine 在被唤醒之后会意识到当前的 I/O 操作已经超时,可以根据需要选择重试请求或者中止调用。

Reference

  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-netpoller/open in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2