6.5 调度器
线程是操作系统调度的基本单位,多个线程可以属于同一进程并共享内存空间。
Golang 的调度器通过使用与CPU数量相同的线程减少线程频繁切换的内存开销,同时在每个线程上执行开销更小的 goroutine 来降低操作系统和硬件的负担。
6.5.1 设计原理
Golang 调度器的发展历程:
单线程调度器 0.x: 程序中只有一个活跃线程。有G-M模型组成
多线程调度器 1.0 :
允许运行多线程程序
任务窃取调度器 1.1 :
- 引入处理器 P,构成 G-M-P 模型
- 实现了基于 工作窃取 的调度器
抢占式调度器 1.2 :
- 基于协作的抢占式调度器 1.2~1.13:
- 基于信号的抢占式调度器 1.4 ~
6.5.2 数据结构
Golang 调度器使用 GMP 模型:
- G:goroutine,表示一个待执行的任务
- M:操作系统线程,由操作系统的调度器进行调度和管理
- P:处理器,包含运行 G 的资源
G
Goroutine 是 Golang 调度器中待执行的任务,只存在于运行时。
Goroutine 是 Golang 在用户态提供的线程,但 goroutine 比系统线程要更加轻量占用更少的资源。
Goroutine 的数据结构用 runtime.g
表示,其中:
和栈有关的字段为:
type g struct {
...
stack stack
stackguard0 uintptr
...
}
stack
:描述了当前 goroutine 的栈内存地址范围 [stack.lo, stack.hi]stackguard0
:用于调度器的抢占式调度
和抢占相关的字段:
type g struct {
...
preempt bool // 抢占信号
preemptStop bool // 抢占时将状态修改成 `_Gpreempted`
preemptShrink bool // 在同步安全点收缩栈
...
}
goroutine 还持有 derfer
和 panic
的链表:
type g struct {
...
_panic *_panic // 最内侧的 panic 结构体
_defer *_defer // 最内侧的延迟函数结构体
...
}
比较重要的字段:
type g struct {
...
m *m
sched gobuf
atomicstatus uint32
goid int64
...
}
m
:当前 goroutine 占用的线程atomicstatus
:goroutine 的状态sched
:goroutine 调度相关信息goid
:goroutine ID
其中的 sched
字段使用 runtime.gobuf
结构体:
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ret sys.Uintreg
...
}
sp
:栈指针pc
:程序计数器g
:持有runtime.gobuf
的 goroutineret
:系统调用的返回值
状态
atomicstatus
表示 goroutine 状态,除去 GC 相关的状态之外,可能有 9 中状态:
状态 | 描述 |
---|---|
_Gidle | 刚刚被分配并且还没有被初始化 |
_Grunnable | 没有执行代码,没有栈的所有权,存储在运行队列中 |
_Grunning | 可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P |
_Gsyscall | 正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上 |
_Gwaiting | 由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上 |
_Gdead | 没有被使用,没有执行代码,可能有分配的栈 |
_Gcopystack | 栈正在被拷贝,没有执行代码,不在运行队列上 |
_Gpreempted | 由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒 |
_Gscan | GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在 |
主要的几个状态有三种:
- 等待中:G 等待某些条件满足;如:
_Gwaiting
、_Gsyscall
和_Gpreempted
- 可运行:G 准备就绪,可以在线程上运行;即
_Grunnable
- 运行中:G 正在线程上运行;即
_Grunning
M
M 表示操作系统线程,即内核态线程。
最大活跃线程数量和GOMAXPROCS
有关,一个程序最多只会有 GOMAXPROCS
个活跃线程。
默认情况下GOMAXPROCS
为当前机器的CPU数量:
M 的运行时数据结构由runtime.m
表示:
和 G 相关的字段:
type m struct {
...
g0 *g
curg *g
...
}
g0
:持有调度栈的 Gcurg
:当前线程上运行的用户 G
和 P 相关的字段:
type m struct {
p puintptr
nextp puintptr
oldp puintptr
}
p
:正在运行的 Pnextp
:暂存的 Poldp
:执行系统调用前的使用线程的 P
P
P 是 M 和 G 的中间层:
- 提供线程M所需的上下文环境
- 负责调度线程上的等待队列
P 的数量和GOMAXPROCS
相同。
P 的运行时数据结构由runtime.p
表示,其中:
和 线程 与 运行队列 相关的字段:
type p struct {
m muintptr
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
...
}
m
:P 对应的 Mrunq
:P 持有的运行队列,存储等待运行的 Grunnext
:下一个需要执行的 G
P 的状态有以下五种:
状态 | 描述 |
---|---|
_Pidle | 处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空 |
_Prunning | 被线程 M 持有,并且正在执行用户代码或者调度器 |
_Psyscall | 没有执行用户代码,当前线程陷入系统调用 |
_Pgcstop | 被线程 M 持有,当前处理器由于垃圾回收被停止 |
_Pdead | 当前处理器已经不被使用 |
6.5.3 调度器的启动
调度器在运行时通过runtime.schedinit
初始化:
func schedinit() {
_g_ := getg()
...
sched.maxmcount = 10000
...
sched.lastpoll = uint64(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
}
sched.maxmcount = 10000
:表示 Golang 能够创建的最大线程 M 的数量
之后会调用runtime.procresize
,流程如下:
- 如果全局变量
allp
切片中的处理器数量少于期望数量,会对切片进行扩容; - 使用
new
创建新的处理器结构体并调用runtime.p.init
初始化刚刚扩容的处理器; - 通过指针将线程 m0 和处理器
allp[0]
绑定到一起 - 调用
runtime.p.destroy
释放不再使用的处理器结构 - 通过截断改变全局变量
allp
的长度保证与期望处理器数量相等 - 将除
allp[0]
之外的处理器 P 全部设置成_Pidle
并加入到全局的空闲队列中
6.5.4 G 的创建
创建 G 可以通过 go
关键字,编译期会将其转换成 runtime.newproc
函数调用:
func (s *state) call(n *Node, k callKind) *ssa.Value {
if k == callDeferStack {
...
} else {
switch {
case k == callGo:
call = s.newValue1A(ssa.OpStaticCall, types.TypeMem, newproc, s.mem())
default:
}
}
...
}
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, argp, siz, gp, pc)
_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)
if mainStarted {
wakep()
}
})
}
- 入参:
siz
:参数大小fn
:函数指针
newproc1
:获取 G 结构体runqput
:将创建的 G 加入 P 的运行队列中- 在满足条件时,使用
runtime.wakep
唤醒 P 执行 G
runtime.newproc1
的执行流程如下:
- 获取或创建新的 G
- 将入参放入 G 的栈上
- 更新 G 的相关信息
创建 G 的结构体:
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
_g_ := getg()
siz := narg
siz = (siz + 7) &^ 7
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
...
- 从 P 的
gFree
中获取空闲的 G - 若不存在,则通过
malg
创建一个新的 G
将参数放入 G 的栈内存中:
...
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
totalSize += -totalSize & (sys.SpAlign - 1)
sp := newg.stack.hi - totalSize
spArg := sp
if narg > 0 {
memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
}
...
- 调用
runtime.memmove
将函数fn
的所有参数拷贝到栈上
更新 G 的信息,并返回:
...
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.startpc = fn.fn
casgstatus(newg, _Gdead, _Grunnable)
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
return newg
}
获取空闲的 G 或 创建 G
runtime.gfget
函数获取 G 的方式有两种:
- 从
gFree
列表中获取空闲的 G:- 从当前的 G 所在的 P 的
gFree
中获取 - 从当前的调度器的
sched.gFree
中获取
- 从当前的 G 所在的 P 的
- 使用
runtime.malg
生成一个新的 G,并加入全局的 G 列表allgs
中
运行队列
runtime.runqput
将 G 放到运行队列中,运行队列可能是:
- 全局运行队列
- P 的本地运行队列
func runqput(_p_ *p, gp *g, next bool) {
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&_p_.runqhead)
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1)
return
}
if runqputslow(_p_, gp, h, t) {
return
}
goto retry
}
- 当
next
为true
时,将 Goroutine 设置到P的runnext
作为下一个处理器执行的任务 - 当
next
为false
并且本地运行队列还有剩余空间时,将 Goroutine 加入 P 持有的本地运行队列 - 当 P 的本地运行队列已经没有剩余空间时就会把本地队列中的一部分 Goroutine 和待加入的 Goroutine 通过
runtime.runqputslow
添加到调度器持有的全局运行队列上
P 的本地的运行队列是一个使用数组构成的环形链表,它最多可以存储 256 个待执行任务。
综上,Golang 中有两个 G 的运行队列:
- P 的本地运行队列
- 调度器持有的 全局 运行队列,当 P 的本地运行队列已满时才会加入到全局运行队列中
调度信息
新的 G 在被创建后会设置相关的调度信息。
6.5.5 调度循环
在调度器启动之后:
- 调用
runtime.mstart
,初始化g0
的stackguard0
和stackguard1
字段 - 调用
runtime.mstart1
初始化线程,并使用runtime.schedule
进入调度循环
func schedule() {
_g_ := getg()
top:
var gp *g
var inheritTime bool
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
gp, inheritTime = findrunnable()
}
execute(gp, inheritTime)
}
获取待执行的 G 流程如下:
- 当全局队列中有待执行的 G 时,通过
schedtick
确保有一定的机率从全局队列中获取G - 从P的本地队列中获取 G
- 若无法从 P 的本地队列和全局队列中获取,则调用函数
runtime.findrunnable
进行查找
runtime.findrunnable
的流程如下:
- 从本地运行队列、全局运行队列中查找
- 从网络轮询器中查找是否有 Goroutine 等待运行
- 通过
runtime.runqsteal
尝试从其他随机的 P 中窃取待运行的 Goroutine
获取 G 之后,调用函数 runtime.execute
执行 G:
func execute(gp *g, inheritTime bool) {
_g_ := getg()
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
gogo(&gp.sched)
}
函数最终将使用 runtime.gogo
将 G 调度到当前的线程 M 上,gogo
函数在不同架构上实现有所不同,386 架构上如下:
TEXT runtime·gogo(SB), NOSPLIT, $8-4
MOVL buf+0(FP), BX // 获取调度信息
MOVL gobuf_g(BX), DX
MOVL 0(DX), CX // 保证 Goroutine 不为空
get_tls(CX)
MOVL DX, g(CX)
MOVL gobuf_sp(BX), SP // 将 runtime.goexit 函数的 PC 恢复到 SP 中
MOVL gobuf_ret(BX), AX
MOVL gobuf_ctxt(BX), DX
MOVL $0, gobuf_sp(BX)
MOVL $0, gobuf_ret(BX)
MOVL $0, gobuf_ctxt(BX)
MOVL gobuf_pc(BX), BX // 获取待执行函数的程序计数器
JMP BX // 开始执行
当 Goroutine 中运行的函数返回时,程序会跳转到 runtime.goexit
所在位置执行该函数:
TEXT runtime·goexit(SB),NOSPLIT,$0-0
CALL runtime·goexit1(SB)
func goexit1() {
mcall(goexit0)
}
经过复杂的函数调用之后,最终在当前线程的 g0 的栈上调用 runtime.goexit0
函数,该函数会将 Goroutine 转换会 _Gdead
状态、清理其中的字段、移除 Goroutine 和线程的关联并调用 runtime.gfput
重新加入处理器的 Goroutine 空闲列表 gFree
:
func goexit0(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gdead)
gp.m = nil
...
gp.param = nil
gp.labels = nil
gp.timer = nil
dropg()
gfput(_g_.m.p.ptr(), gp)
schedule()
}
runtime.goexit0
会重新调用 runtime.schedule
触发新一轮的 Goroutine 调度。
Golang 中的运行时调度循环会从 runtime.schedule
开始,最终又回到 runtime.schedule
,认为调度循环永远都不会返回。
6.5.6 触发调度
触发调度的主要有以下路径:
- 主动挂起:
runtime.gopark
->runtime.park_m
- 系统调用 :
runtime.exitsyscall
->runtime.exitsyscall0
- 协作式调度 :
runtime.Gosched
->runtime.gosched_m
->runtime.goschedImpl
- 系统监控 :
runtime.sysmon
->runtime.retake
->runtime.preemptone
主动挂起
runtime.gopark
会将当前 Goroutine 暂停,被暂停的任务不会放回运行队列:
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
mp := acquirem()
gp := mp.curg
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
mcall(park_m)
}
使用runtime.mcall
切换到 g0 的栈上调用 runtime.park_m
:
func park_m(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
schedule()
}
runtime.park_m
会将当前 Goroutine 的状态从 _Grunning
切换至 _Gwaiting
,调用 runtime.dropg
移除线程和 Goroutine 之间的关联,在这之后调用 runtime.schedule
触发新一轮的调度。
当 Goroutine 等待的特定条件满足后,运行时会调用 runtime.goready
将因为调用 runtime.gopark
而陷入休眠的 Goroutine 唤醒。
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
func ready(gp *g, traceskip int, next bool) {
_g_ := getg()
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
}
runtime.ready
会将准备就绪的 Goroutine 的状态切换至 _Grunnable
并将其加入处理器的运行队列中,等待调度器的调度。
系统调用
系统调用也会触发运行时调度器的调度。
通过 syscall.Syscall
和 syscall.RawSyscall
等使用汇编语言编写的方法封装操作系统提供的所有系统调用,其中 syscall.Syscall
的实现如下:
#define INVOKE_SYSCALL INT $0x80
TEXT ·Syscall(SB),NOSPLIT,$0-28
CALL runtime·entersyscall(SB)
...
INVOKE_SYSCALL
...
CALL runtime·exitsyscall(SB)
RET
ok:
...
CALL runtime·exitsyscall(SB)
RET
INVOKE_SYSCALL
执行系统调用前后,上述函数会调用运行时的 runtime.entersyscall
和 runtime.exitsyscall
。
6.5.7 线程管理
Golang 通过调度器改变线程的所有权。
runtime.LockOSThread
通过如下所示的代码绑定 Goroutine 和当前线程:
func LockOSThread() {
if atomic.Load(&newmHandoff.haveTemplateThread) == 0 && GOOS != "plan9" {
startTemplateThread()
}
_g_ := getg()
_g_.m.lockedExt++
dolockOSThread()
}
func dolockOSThread() {
_g_ := getg()
_g_.m.lockedg.set(_g_)
_g_.lockedm.set(_g_.m)
}
runtime.dolockOSThread
会分别设置线程的 lockedg
字段和 Goroutine 的 lockedm
字段,绑定线程和 Goroutine。
当 Goroutine 完成了特定的操作之后,会调用以下函数 runtime.UnlockOSThread
分离 Goroutine 和线程:
func UnlockOSThread() {
_g_ := getg()
if _g_.m.lockedExt == 0 {
return
}
_g_.m.lockedExt--
dounlockOSThread()
}
func dounlockOSThread() {
_g_ := getg()
if _g_.m.lockedInt != 0 || _g_.m.lockedExt != 0 {
return
}
_g_.m.lockedg = 0
_g_.lockedm = 0
}
线程生命周期
Go 语言的运行时会通过 runtime.startm
启动线程来执行处理器 P,如果在该函数中没能从闲置列表中获取到线程 M 就会调用 runtime.newm
创建新的线程:
func newm(fn func(), _p_ *p, id int64) {
mp := allocm(_p_, fn, id)
mp.nextp.set(_p_)
mp.sigmask = initSigmask
...
newm1(mp)
}
func newm1(mp *m) {
if iscgo {
...
}
newosproc(mp)
}
创建新的线程需要使用如下所示的 runtime.newosproc
,该函数在 Linux 平台上会通过系统调用 clone
创建新的操作系统线程,它也是创建线程链路上距离操作系统最近的 Go 语言函数:
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi)
...
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
...
}
使用系统调用 clone
创建的线程会在线程主动调用 exit
、或者传入的函数 runtime.mstart
返回会主动退出,runtime.mstart
会执行调用 runtime.newm
时传入的匿名函数 fn
,到这里也就完成了从线程创建到销毁的整个闭环。