6.5 调度器

Kesa...大约 13 分钟golang

线程是操作系统调度的基本单位,多个线程可以属于同一进程并共享内存空间。

process-and-threads
process-and-threads

Golang 的调度器通过使用与CPU数量相同的线程减少线程频繁切换的内存开销,同时在每个线程上执行开销更小的 goroutine 来降低操作系统和硬件的负担。

6.5.1 设计原理

Golang 调度器的发展历程:

  1. 单线程调度器 0.x: 程序中只有一个活跃线程。有G-M模型组成

  2. 多线程调度器 1.0 :

    允许运行多线程程序

  3. 任务窃取调度器 1.1 :

    • 引入处理器 P,构成 G-M-P 模型
    • 实现了基于 工作窃取 的调度器
  4. 抢占式调度器 1.2 :

    • 基于协作的抢占式调度器 1.2~1.13:
    • 基于信号的抢占式调度器 1.4 ~

6.5.2 数据结构

Golang 调度器使用 GMP 模型:

  1. G:goroutine,表示一个待执行的任务
  2. M:操作系统线程,由操作系统的调度器进行调度管理
  3. P:处理器,包含运行 G 的资源

G

Goroutine 是 Golang 调度器中待执行的任务,只存在于运行时

Goroutine 是 Golang 在用户态提供的线程,但 goroutine 比系统线程要更加轻量占用更少的资源。

Goroutine 的数据结构用 runtime.gopen in new window 表示,其中:

有关的字段为:

type g struct {
    ...
	stack       stack
	stackguard0 uintptr
    ...
}
  • stack:描述了当前 goroutine 的栈内存地址范围 [stack.lo, stack.hi]
  • stackguard0:用于调度器的抢占式调度

抢占相关的字段:

type g struct {
    ...
	preempt       bool // 抢占信号
	preemptStop   bool // 抢占时将状态修改成 `_Gpreempted`
	preemptShrink bool // 在同步安全点收缩栈
	...
}

goroutine 还持有 derferpanic的链表:

type g struct {
    ...
	_panic       *_panic // 最内侧的 panic 结构体
	_defer       *_defer // 最内侧的延迟函数结构体
    ...
}

比较重要的字段:

type g struct {
    ...
	m              *m
	sched          gobuf
	atomicstatus   uint32
	goid           int64
    ...
}
  • m:当前 goroutine 占用的线程
  • atomicstatus:goroutine 的状态
  • sched:goroutine 调度相关信息
  • goid:goroutine ID

其中的 sched字段使用 runtime.gobufopen in new window 结构体:

type gobuf struct {
	sp   uintptr
	pc   uintptr
	g    guintptr
	ret  sys.Uintreg
	...
}

状态

atomicstatus表示 goroutine 状态,除去 GC 相关的状态之外,可能有 9 中状态:

状态描述
_Gidle刚刚被分配并且还没有被初始化
_Grunnable没有执行代码,没有栈的所有权,存储在运行队列中
_Grunning可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P
_Gsyscall正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上
_Gwaiting由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上
_Gdead没有被使用,没有执行代码,可能有分配的栈
_Gcopystack栈正在被拷贝,没有执行代码,不在运行队列上
_Gpreempted由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒
_GscanGC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在

主要的几个状态有三种:

  1. 等待中:G 等待某些条件满足;如: _Gwaiting_Gsyscall_Gpreempted
  2. 可运行:G 准备就绪,可以在线程上运行;即_Grunnable
  3. 运行中:G 正在线程上运行;即 _Grunning
golang-goroutine-state-transition
golang-goroutine-state-transition

M

M 表示操作系统线程,即内核态线程。

最大活跃线程数量和GOMAXPROCS有关,一个程序最多只会有 GOMAXPROCS个活跃线程。

默认情况下GOMAXPROCS为当前机器的CPU数量:

scheduler-m-and-thread
scheduler-m-and-thread

M 的运行时数据结构由runtime.mopen in new window表示:

G 相关的字段:

type m struct {
    ...
	g0   *g
	curg *g
	...
}
  • g0:持有调度栈的 G
  • curg:当前线程上运行的用户 G
g0-and-g
g0-and-g

P 相关的字段:

type m struct {
	p             puintptr
	nextp         puintptr
	oldp          puintptr
}
  • p:正在运行的 P
  • nextp:暂存的 P
  • oldp:执行系统调用前的使用线程的 P

P

P 是 M 和 G 的中间层:

  1. 提供线程M所需的上下文环境
  2. 负责调度线程上的等待队列

P 的数量和GOMAXPROCS相同。

P 的运行时数据结构由runtime.popen in new window表示,其中:

线程运行队列 相关的字段:

type p struct {
	m           muintptr

	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	runnext guintptr
	...
}
  • m:P 对应的 M
  • runq:P 持有的运行队列,存储等待运行的 G
  • runnext:下一个需要执行的 G

P 的状态有以下五种:

状态描述
_Pidle处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空
_Prunning被线程 M 持有,并且正在执行用户代码或者调度器
_Psyscall没有执行用户代码,当前线程陷入系统调用
_Pgcstop被线程 M 持有,当前处理器由于垃圾回收被停止
_Pdead当前处理器已经不被使用

6.5.3 调度器的启动

调度器在运行时通过runtime.schedinitopen in new window初始化:

func schedinit() {
	_g_ := getg()
	...

	sched.maxmcount = 10000

	...
	sched.lastpoll = uint64(nanotime())
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
}
  • sched.maxmcount = 10000:表示 Golang 能够创建的最大线程 M 的数量

之后会调用runtime.procresizeopen in new window,流程如下:

  1. 如果全局变量 allp 切片中的处理器数量少于期望数量,会对切片进行扩容;
  2. 使用 new 创建新的处理器结构体并调用 runtime.p.initopen in new window 初始化刚刚扩容的处理器;
  3. 通过指针将线程 m0 和处理器 allp[0] 绑定到一起
  4. 调用 runtime.p.destroyopen in new window 释放不再使用的处理器结构
  5. 通过截断改变全局变量 allp 的长度保证与期望处理器数量相等
  6. 将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局的空闲队列中

6.5.4 G 的创建

创建 G 可以通过 go关键字,编译期会将其转换成 runtime.newprocopen in new window 函数调用:

func (s *state) call(n *Node, k callKind) *ssa.Value {
	if k == callDeferStack {
		...
	} else {
		switch {
		case k == callGo:
			call = s.newValue1A(ssa.OpStaticCall, types.TypeMem, newproc, s.mem())
		default:
		}
	}
	...
}
func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})
}
  • 入参:
    • siz:参数大小
    • fn:函数指针
  • newproc1:获取 G 结构体
  • runqput:将创建的 G 加入 P 的运行队列中
  • 在满足条件时,使用runtime.wakepopen in new window唤醒 P 执行 G

runtime.newproc1open in new window的执行流程如下:

  1. 获取或创建新的 G
  2. 将入参放入 G 的栈上
  3. 更新 G 的相关信息

创建 G 的结构体:

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()
	siz := narg
	siz = (siz + 7) &^ 7

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg)
	}
	...
  • 从 P 的 gFree中获取空闲的 G
  • 若不存在,则通过 malg 创建一个新的 G

将参数放入 G 的栈内存中:

	...
	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
	totalSize += -totalSize & (sys.SpAlign - 1)
	sp := newg.stack.hi - totalSize
	spArg := sp
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
	}
	...

更新 G 的信息,并返回:

	...
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.startpc = fn.fn
	casgstatus(newg, _Gdead, _Grunnable)
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	return newg
}

获取空闲的 G 或 创建 G

runtime.gfgetopen in new window函数获取 G 的方式有两种:

  1. gFree 列表中获取空闲的 G:
    1. 从当前的 G 所在的 P 的 gFree 中获取
    2. 从当前的调度器的 sched.gFree中获取
  2. 使用 runtime.malgopen in new window 生成一个新的 G,并加入全局的 G 列表 allgs
golang-newproc-get-goroutine
golang-newproc-get-goroutine

运行队列

runtime.runqputopen in new window将 G 放到运行队列中,运行队列可能是:

  • 全局运行队列
  • P 的本地运行队列
func runqput(_p_ *p, gp *g, next bool) {
	if next {
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		gp = oldnext.ptr()
	}
retry:
	h := atomic.LoadAcq(&_p_.runqhead)
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1)
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
	goto retry
}
  1. nexttrue 时,将 Goroutine 设置到P的 runnext 作为下一个处理器执行的任务
  2. nextfalse 并且本地运行队列还有剩余空间时,将 Goroutine 加入 P 持有的本地运行队列
  3. 当 P 的本地运行队列已经没有剩余空间时就会把本地队列中的一部分 Goroutine 和待加入的 Goroutine 通过 runtime.runqputslowopen in new window 添加到调度器持有的全局运行队列上

P 的本地的运行队列是一个使用数组构成的环形链表,它最多可以存储 256 个待执行任务。

golang-runnable-queue
golang-runnable-queue

综上,Golang 中有两个 G 的运行队列:

  1. P 的本地运行队列
  2. 调度器持有的 全局 运行队列,当 P 的本地运行队列已满时才会加入到全局运行队列中

调度信息

新的 G 在被创建后会设置相关的调度信息。

6.5.5 调度循环

在调度器启动之后:

  1. 调用 runtime.mstartopen in new window,初始化 g0stackguard0stackguard1 字段
  2. 调用 runtime.mstart1open in new window 初始化线程,并使用 runtime.scheduleopen in new window 进入调度循环
func schedule() {
	_g_ := getg()

top:
	var gp *g
	var inheritTime bool

	if gp == nil {
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr())
	}
	if gp == nil {
		gp, inheritTime = findrunnable()
	}

	execute(gp, inheritTime)
}

获取待执行的 G 流程如下:

  1. 全局队列中有待执行的 G 时,通过 schedtick 确保有一定的机率从全局队列中获取G
  2. 从P的本地队列中获取 G
  3. 若无法从 P 的本地队列和全局队列中获取,则调用函数 runtime.findrunnableopen in new window进行查找

runtime.findrunnableopen in new window的流程如下:

  1. 本地运行队列全局运行队列中查找
  2. 网络轮询器中查找是否有 Goroutine 等待运行
  3. 通过 runtime.runqstealopen in new window 尝试从其他随机的 P 中窃取待运行的 Goroutine

获取 G 之后,调用函数 runtime.executeopen in new window 执行 G:

func execute(gp *g, inheritTime bool) {
	_g_ := getg()

	_g_.m.curg = gp
	gp.m = _g_.m
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		_g_.m.p.ptr().schedtick++
	}

	gogo(&gp.sched)
}

函数最终将使用 runtime.gogoopen in new window 将 G 调度到当前的线程 M 上,gogo函数在不同架构上实现有所不同,386 架构上如下:

TEXT runtime·gogo(SB), NOSPLIT, $8-4
	MOVL buf+0(FP), BX     // 获取调度信息
	MOVL gobuf_g(BX), DX
	MOVL 0(DX), CX         // 保证 Goroutine 不为空
	get_tls(CX)
	MOVL DX, g(CX)
	MOVL gobuf_sp(BX), SP  // 将 runtime.goexit 函数的 PC 恢复到 SP 中
	MOVL gobuf_ret(BX), AX
	MOVL gobuf_ctxt(BX), DX
	MOVL $0, gobuf_sp(BX)
	MOVL $0, gobuf_ret(BX)
	MOVL $0, gobuf_ctxt(BX)
	MOVL gobuf_pc(BX), BX  // 获取待执行函数的程序计数器
	JMP  BX                // 开始执行
golang-gogo-stack
golang-gogo-stack

当 Goroutine 中运行的函数返回时,程序会跳转到 runtime.goexitopen in new window 所在位置执行该函数:

TEXT runtime·goexit(SB),NOSPLIT,$0-0
	CALL	runtime·goexit1(SB)

func goexit1() {
	mcall(goexit0)
}

经过复杂的函数调用之后,最终在当前线程的 g0 的栈上调用 runtime.goexit0open in new window 函数,该函数会将 Goroutine 转换会 _Gdead 状态、清理其中的字段、移除 Goroutine 和线程的关联并调用 runtime.gfputopen in new window 重新加入处理器的 Goroutine 空闲列表 gFree

func goexit0(gp *g) {
	_g_ := getg()

	casgstatus(gp, _Grunning, _Gdead)
	gp.m = nil
	...
	gp.param = nil
	gp.labels = nil
	gp.timer = nil

	dropg()
	gfput(_g_.m.p.ptr(), gp)
	schedule()
}

runtime.goexit0open in new window 会重新调用 runtime.scheduleopen in new window 触发新一轮的 Goroutine 调度。

Golang 中的运行时调度循环会从 runtime.scheduleopen in new window 开始,最终又回到 runtime.scheduleopen in new window,认为调度循环永远都不会返回。

golang-scheduler-loop
golang-scheduler-loop

6.5.6 触发调度

schedule-points
schedule-points

触发调度的主要有以下路径:

  1. 主动挂起:runtime.goparkopen in new window -> runtime.park_mopen in new window
  2. 系统调用 : runtime.exitsyscallopen in new window -> runtime.exitsyscall0open in new window
  3. 协作式调度 : runtime.Goschedopen in new window -> runtime.gosched_mopen in new window -> runtime.goschedImplopen in new window
  4. 系统监控 : runtime.sysmonopen in new window -> runtime.retakeopen in new window -> runtime.preemptoneopen in new window

主动挂起

runtime.goparkopen in new window会将当前 Goroutine 暂停,被暂停的任务不会放回运行队列:

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	mp := acquirem()
	gp := mp.curg
	mp.waitlock = lock
	mp.waitunlockf = unlockf
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	mcall(park_m)
}

使用runtime.mcallopen in new window切换到 g0 的栈上调用 runtime.park_mopen in new window

func park_m(gp *g) {
	_g_ := getg()

	casgstatus(gp, _Grunning, _Gwaiting)
	dropg()

	schedule()
}

runtime.park_mopen in new window会将当前 Goroutine 的状态从 _Grunning 切换至 _Gwaiting,调用 runtime.dropgopen in new window 移除线程和 Goroutine 之间的关联,在这之后调用 runtime.scheduleopen in new window 触发新一轮的调度。

当 Goroutine 等待的特定条件满足后,运行时会调用 runtime.goreadyopen in new window 将因为调用 runtime.goparkopen in new window 而陷入休眠的 Goroutine 唤醒。

func goready(gp *g, traceskip int) {
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}

func ready(gp *g, traceskip int, next bool) {
	_g_ := getg()

	casgstatus(gp, _Gwaiting, _Grunnable)
	runqput(_g_.m.p.ptr(), gp, next)
	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
		wakep()
	}
}

runtime.readyopen in new window 会将准备就绪的 Goroutine 的状态切换至 _Grunnable 并将其加入处理器的运行队列中,等待调度器的调度。

系统调用

系统调用也会触发运行时调度器的调度。

通过 syscall.Syscallopen in new windowsyscall.RawSyscallopen in new window 等使用汇编语言编写的方法封装操作系统提供的所有系统调用,其中 syscall.Syscallopen in new window 的实现如下:

#define INVOKE_SYSCALL	INT	$0x80

TEXT ·Syscall(SB),NOSPLIT,$0-28
	CALL	runtime·entersyscall(SB)
	...
	INVOKE_SYSCALL
	...
	CALL	runtime·exitsyscall(SB)
	RET
ok:
	...
	CALL	runtime·exitsyscall(SB)
	RET

INVOKE_SYSCALL 执行系统调用前后,上述函数会调用运行时的 runtime.entersyscallopen in new windowruntime.exitsyscallopen in new window

golang-syscall-and-rawsyscal
golang-syscall-and-rawsyscal

6.5.7 线程管理

Golang 通过调度器改变线程的所有权。

runtime.LockOSThreadopen in new window通过如下所示的代码绑定 Goroutine 和当前线程:

func LockOSThread() {
	if atomic.Load(&newmHandoff.haveTemplateThread) == 0 && GOOS != "plan9" {
		startTemplateThread()
	}
	_g_ := getg()
	_g_.m.lockedExt++
	dolockOSThread()
}

func dolockOSThread() {
	_g_ := getg()
	_g_.m.lockedg.set(_g_)
	_g_.lockedm.set(_g_.m)
}

runtime.dolockOSThreadopen in new window 会分别设置线程的 lockedg 字段和 Goroutine 的 lockedm 字段,绑定线程和 Goroutine。

当 Goroutine 完成了特定的操作之后,会调用以下函数 runtime.UnlockOSThreadopen in new window 分离 Goroutine 和线程:

func UnlockOSThread() {
	_g_ := getg()
	if _g_.m.lockedExt == 0 {
		return
	}
	_g_.m.lockedExt--
	dounlockOSThread()
}

func dounlockOSThread() {
	_g_ := getg()
	if _g_.m.lockedInt != 0 || _g_.m.lockedExt != 0 {
		return
	}
	_g_.m.lockedg = 0
	_g_.lockedm = 0
}

线程生命周期

Go 语言的运行时会通过 runtime.startmopen in new window 启动线程来执行处理器 P,如果在该函数中没能从闲置列表中获取到线程 M 就会调用 runtime.newmopen in new window 创建新的线程:

func newm(fn func(), _p_ *p, id int64) {
	mp := allocm(_p_, fn, id)
	mp.nextp.set(_p_)
	mp.sigmask = initSigmask
	...
	newm1(mp)
}

func newm1(mp *m) {
	if iscgo {
		...
	}
	newosproc(mp)
}

创建新的线程需要使用如下所示的 runtime.newosprocopen in new window,该函数在 Linux 平台上会通过系统调用 clone 创建新的操作系统线程,它也是创建线程链路上距离操作系统最近的 Go 语言函数:

func newosproc(mp *m) {
	stk := unsafe.Pointer(mp.g0.stack.hi)
	...
	ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
	...
}

使用系统调用 clone 创建的线程会在线程主动调用 exit、或者传入的函数 runtime.mstartopen in new window 返回会主动退出,runtime.mstartopen in new window 会执行调用 runtime.newmopen in new window 时传入的匿名函数 fn,到这里也就完成了从线程创建到销毁的整个闭环。

Reference

  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-goroutine/open in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2