6.6 网络轮询器
6.6.1 设计原理
I/O 模型
操作系统中包含:
- 阻塞 I/O
- 非阻塞 I/O
- 信号驱动 I/O
- 异步 I/O
- I/O 多路复用
阻塞 I/O
阻塞 I/O 是最常见的 I/O 模型,通过 read
或者 write
等系统调用读写文件或者网络时,应用程序会被阻塞:
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t nbytes);
执行 read
系统调用时,应用程序会从用户态陷入内核态,内核会检查文件描述符是否可读;当文件描述符中存在数据时,操作系统内核会将准备好的数据拷贝给应用程序并交回控制权。
非阻塞 I/O
当进程把一个文件描述符设置成非阻塞时,执行 read
和 write
等 I/O 操作会立刻返回。
在 C 语言中,可以使用如下所示的代码片段将一个文件描述符设置成非阻塞的:
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
fcntl
提供了操作文件描述符的能力,可以通过它修改文件描述符的特性。当我们将文件描述符修改成非阻塞后,读写文件会经历以下流程:
进程使用非阻塞的 I/O 操作时,可以在等待过程中执行其他任务,提高 CPU 的利用率。
I/O 多路复用
I/O 多路复用被用来处理同一个事件循环中的多个 I/O 事件。I/O 多路复用需要使用特定的系统调用。
最常见的系统调用是 select
,该函数可以同时监听最多 1024 个文件描述符的可读或者可写状态:
int select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, struct timeval *restrict timeout);
poll
函数,它使用链表存储文件描述符,摆脱了 1024 的数量上限。
多路复用函数会阻塞的监听一组文件描述符,当文件描述符的状态转变为可读或者可写时,select
会返回可读或者可写事件的个数,应用程序可以在输入的文件描述符中查找哪些可读或者可写,然后执行相应的操作。
I/O 多路复用模型是效率较高的 I/O 模型,它可以同时阻塞监听了一组文件描述符的状态。很多高性能的服务和应用程序都会使用这一模型来处理 I/O 操作,例如:Redis 和 Nginx 等。
多模块
虽然 select
也可以提供 I/O 多路复用的能力,但是使用它有比较多的限制:
- 监听能力有限 — 最多只能监听 1024 个文件描述符
- 内存拷贝开销大 , 需要维护一个较大的数据结构存储文件描述符,该结构需要拷贝到内核中;
- 时间复杂度 O(n) , 返回准备就绪的事件个数后,需要遍历所有的文件描述符
为了提高 I/O 多路复用的性能,不同的操作系统也都实现了自己的 I/O 多路复用函数,例如:epoll
、kqueue
和 evport
等。
Go 语言为了提高在不同操作系统上的 I/O 操作性能,使用平台特定的函数实现了多个版本的网络轮询模块:
src/runtime/netpoll_epoll.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_windows.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_fake.go
这些模块在不同平台上实现了相同的功能,构成了一个常见的树形结构。编译器在编译 Go 语言程序时,会根据目标平台选择树中特定的分支进行编译:
接口
epoll
、kqueue
、solaries
等多路复用模块都要实现以下五个函数,这五个函数构成一个虚拟的接口:
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delta int64) gList
func netpollBreak()
func netpollIsPollDescriptor(fd uintptr) bool
runtime.netpollinit
— 初始化网络轮询器,通过sync.Once
和netpollInited
变量保证函数只会调用一次runtime.netpollopen
— 监听文件描述符上的边缘触发事件,创建事件并加入监听runtime.netpoll
— 轮询网络并返回一组已经准备就绪的 Goroutine,传入的参数会决定它的行为- 若参数小于 0,无限期等待文件描述符就绪
- 若参数等于 0,非阻塞地轮询网络
- 若参数大于 0,阻塞特定时间轮询网络
runtime.netpollBreak
— 唤醒网络轮询器,例如:计时器向前修改时间时会通过该函数中断网络轮询器runtime.netpollIsPollDescriptor
— 判断文件描述符是否被轮询器使用
6.6.2 数据结构
Go 语言网络轮询器会监听 runtime.pollDesc
结构体的状态,其封装了操作系统的文件描述符:
type pollDesc struct {
link *pollDesc
lock mutex
fd uintptr
...
rseq uintptr
rg uintptr
rt timer
rd int64
wseq uintptr
wg uintptr
wt timer
wd int64
}
rseq
和wseq
, 表示文件描述符被重用或者计时器被重置rg
和wg
, 表示二进制的信号量,可能为pdReady
、pdWait
、等待文件描述符可读或者可写的 Goroutine 以及nil
rd
和wd
, 等待文件描述符可读或者可写的截止日期rt
和wt
, 用于等待文件描述符的计时器
runtime.pollDesc
结构体会使用 link
字段串联成链表存储在 runtime.pollCache
中:
type pollCache struct {
lock mutex
first *pollDesc
}
runtime.pollCache
是运行时包中的全局变量,该结构体中包含一个用于保护轮询数据的互斥锁和链表头:
运行时会在第一次调用 runtime.pollCache.alloc
方法时初始化总大小约为 4KB 的 runtime.pollDesc
结构体,runtime.persistentAlloc
会保证这些数据结构初始化在不会触发垃圾回收的内存中,让这些数据结构只能被内部的 epoll
和 kqueue
模块引用:
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
unlock(&c.lock)
return pd
}
每次调用该结构体都会返回链表头还没有被使用的 runtime.pollDesc
,这种批量初始化的做法能够增加网络轮询器的吞吐量。Go 语言运行时会调用 runtime.pollCache.free
方法释放已经用完的 runtime.pollDesc
结构,它会直接将结构体插入链表的最前面:
func (c *pollCache) free(pd *pollDesc) {
lock(&c.lock)
pd.link = c.first
c.first = pd
unlock(&c.lock)
}
上述方法没有重置 runtime.pollDesc
结构体中的字段,该结构体被重复利用时才会由 runtime.poll_runtime_pollOpen
函数重置。
6.6.3 多路复用
网络轮询器实际上是对 I/O 多路复用技术的封装.
初始化
因为文件 I/O、网络 I/O 以及计时器都依赖网络轮询器,所以 Go 语言会通过以下两条不同路径初始化网络轮询器:
internal/poll.pollDesc.init
— 通过net.netFD.init
和os.newFile
初始化网络 I/O 和文件 I/O 的轮询信息时runtime.doaddtimer
— 向处理器中增加新的计时器时
网络轮询器的初始化会使用 runtime.poll_runtime_pollServerInit
和 runtime.netpollGenericInit
两个函数:
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
runtime.netpollGenericInit
会调用平台上特定实现的 runtime.netpollinit
,以 Linux 的epoll
为例:
- 是调用
epollcreate1
创建一个新的epoll
文件描述符,这个文件描述符会在整个程序的生命周期中使用; - 通过
runtime.nonblockingPipe
创建一个用于通信的管道; - 使用
epollctl
将用于读取数据的文件描述符打包成epollevent
事件加入监听;
var (
epfd int32 = -1
netpollBreakRd, netpollBreakWr uintptr
)
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
r, w, _ := nonblockingPipe()
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
初始化的管道提供了中断多路复用等待文件描述符中事件的方法,runtime.netpollBreak
会向管道中写入数据唤醒 epoll
:
func netpollBreak() {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 {
break
}
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
}
}
轮询事件
调用 internal/poll.pollDesc.init
初始化文件描述符时不止会初始化网络轮询器,还会通过 runtime.poll_runtime_pollOpen
重置轮询信息 runtime.pollDesc
并调用 runtime.netpollopen
初始化轮询事件:
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
...
pd.fd = fd
pd.closing = false
pd.everr = false
...
pd.wseq++
pd.wg = 0
pd.wd = 0
unlock(&pd.lock)
var errno int32
errno = netpollopen(fd, pd)
return pd, int(errno)
}
runtime.netpollopen
会调用 epollctl
向全局的轮询文件描述符 epfd
中加入新的轮询事件监听文件描述符的可读和可写状态:
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
事件循环
等待事件
在文件描述符上执行读写操作时,若文件描述符不可读或者不可写,当前 Goroutine 会执行 runtime.poll_runtime_pollWait
检查 runtime.pollDesc
的状态并调用 runtime.netpollblock
等待文件描述符的可读或者可写:
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
for !netpollblock(pd, int32(mode), false) {
...
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
...
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
...
}
runtime.netpollblock
是 Goroutine 等待 I/O 事件的关键函数,它会使用运行时提供的 runtime.gopark
让出当前线程,将 Goroutine 转换到休眠状态并等待运行时的唤醒。
轮询等待
Go 语言的运行时会在调度或者系统监控中调用 runtime.netpoll
轮询网络,该函数的执行过程可以分成以下几个部分:
- 根据传入的
delay
计算epoll
系统调用需要等待的时间; - 调用
epollwait
等待可读或者可写事件的发生; - 在循环中依次处理
epollevent
事件;
因为传入 delay
的单位是纳秒,下面这段代码会将纳秒转换成毫秒:
func netpoll(delay int64) gList {
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
waitms = 1e9
}
计算了需要等待的时间之后,runtime.netpoll
会执行 epollwait
等待文件描述符转换成可读或者可写,若该函数返回了负值,可能会返回空的 Goroutine 列表或者重新调用 epollwait
陷入等待:
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if waitms > 0 {
return gList{}
}
goto retry
}
当 epollwait
系统调用返回的值大于 0 时,意味着被监控的文件描述符出现了待处理的事件,我们在如下所示的循环中会依次处理这些事件:
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
...
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
...
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
netpollready(&toRun, pd, mode)
}
}
return toRun
}
处理的事件总共包含两种:
- 调用
runtime.netpollBreak
触发的事件,作用是中断网络轮询器; - 其他文件描述符的正常读写事件,对于这些事件,会交给
runtime.netpollready
处理
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
...
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
...
if wg != nil {
toRun.push(wg)
}
}
runtime.netpollunblock
会在读写事件发生时,将 runtime.pollDesc
中的读或者写信号量转换成 pdReady
并返回其中存储的 Goroutine;若返回的 Goroutine 不会为空,那么运行时会将该 Goroutine 会加入 toRun
列表,并将列表中的全部 Goroutine 加入运行队列并等待调度器的调度。
runtime.netpoll
返回的 Goroutine 列表都会被 runtime.injectglist
注入到处理器或者全局的运行队列上。因为系统监控 Goroutine 直接运行在线程上,所以它获取的 Goroutine 列表会直接加入全局的运行队列,其他 Goroutine 获取的列表都会加入 Goroutine 所在处理器的运行队列上。
截止日期
网络轮询器和计时器的关系非常紧密,因为:
- 网络轮询器负责计时器的唤醒
- 文件和网络 I/O 的截止日期也由网络轮询器负责处理; 截止日期在 I/O 操作中,尤其是网络调用中很关键,网络请求存在很高的不确定因素,需要设置一个截止日期保证程序的正常运行
设置截至日期需要runtime.poll_runtime_pollSetDeadline
函数:
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
rd0, wd0 := pd.rd, pd.wd
if d > 0 {
d += nanotime()
}
pd.rd = d
...
if pd.rt.f == nil {
if pd.rd > 0 {
pd.rt.f = netpollReadDeadline
pd.rt.arg = pd
pd.rt.seq = pd.rseq
resettimer(&pd.rt, pd.rd)
}
} else if pd.rd != rd0 {
pd.rseq++
if pd.rd > 0 {
modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
} else {
deltimer(&pd.rt)
pd.rt.f = nil
}
}
先使用截止日期计算出过期的时间点,然后根据 runtime.pollDesc
的状态做出以下不同的处理:
- 若结构体中的计时器没有设置执行的函数时,该函数会设置计时器到期后执行的函数、传入的参数并调用
runtime.resettimer
重置计时器; - 若结构体的读截止日期已经被改变,我们会根据新的截止日期做出不同的处理:
- 若新的截止日期大于 0,调用
runtime.modtimer
修改计时器; - 若新的截止日期小于 0,调用
runtime.deltimer
删除计时器;
- 若新的截止日期大于 0,调用
在 runtime.poll_runtime_pollSetDeadline
的最后,会重新检查轮询信息中存储的截止日期:
var rg *g
if pd.rd < 0 {
if pd.rd < 0 {
rg = netpollunblock(pd, 'r', false)
}
...
}
if rg != nil {
netpollgoready(rg, 3)
}
...
}
若截止日期小于 0,上述代码会调用 runtime.netpollgoready
直接唤醒对应的 Goroutine。
在 runtime.poll_runtime_pollSetDeadline
中直接调用 runtime.netpollgoready
是相对比较特殊的情况。在正常情况下,运行时都会在计时器到期时调用 runtime.netpollDeadline
、runtime.netpollReadDeadline
和 runtime.netpollWriteDeadline
三个函数:
上述三个函数都会通过 runtime.netpolldeadlineimpl
调用 runtime.netpollgoready
直接唤醒相应的 Goroutine:
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
currentSeq := pd.rseq
if !read {
currentSeq = pd.wseq
}
if seq != currentSeq {
return
}
var rg *g
if read {
pd.rd = -1
atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil)
rg = netpollunblock(pd, 'r', false)
}
...
if rg != nil {
netpollgoready(rg, 0)
}
...
}
Goroutine 在被唤醒之后会意识到当前的 I/O 操作已经超时,可以根据需要选择重试请求或者中止调用。