6.3 计时器

Kesa...大约 11 分钟golang

6.3.1 设计原理

Golang 的计时器经历了几次版本迭代:

  1. v1.9:所有计时器有全局唯一的四叉堆维护
  2. v1.10~v1.13:全局使用64个四叉堆维护全部的计时器,每个处理器(P)创建的计时器会由对应的四叉堆维护
  3. v1.14:每个处理器单独管理计时器并通过网络轮询触发

全局四叉堆

Go 1.10 之前的计时器都使用最小四叉堆实现,所有的计时器都会存储在如下所示的结构体 runtime.timers:093adeeopen in new window 中:

var timers struct {
	lock         mutex
	gp           *g
	created      bool
	sleeping     bool
	rescheduling bool
	sleepUntil   int64
	waitnote     note
	t            []*timer
}

其中的t就是最小的四叉堆,行时创建的所有计时器都会加入到四叉堆中。

runtime.timerproc:093adeeopen in new window Goroutine 会运行时间驱动的事件,运行时会在发生以下事件时唤醒计时器:

  • 四叉堆中的计时器到期
  • 四叉堆中加入了触发时间更早的新计时器
golang-timer-quadtree
golang-timer-quadtree

缺点

全局四叉堆共用的互斥锁对计时器的影响非常大,计时器的各种操作都需要获取全局唯一的互斥锁,这会严重影响计时器的性能。

分片四叉堆

Go 1.10 将全局的四叉堆分割成了 64 个更小的四叉堆。

理想情况下,四叉堆的数量应该等于处理器的数量,但是这需要实现动态的分配过程,所以经过权衡最终选择初始化 64 个四叉堆,以牺牲内存占用的代价换取性能的提升。

const timersLen = 64

var timers [timersLen]struct {
	timersBucket
}

type timersBucket struct {
	lock         mutex
	gp           *g
	created      bool
	sleeping     bool
	rescheduling bool
	sleepUntil   int64
	waitnote     note
	t            []*timer
}

若当前机器上的处理器 P 的个数超过了 64,多个处理器上的计时器就可能存储在同一个桶中。每一个计时器桶都由一个运行 runtime.timerproc:76f4fd8open in new window 函数的 Goroutine 处理。

golang-timer-bucket
golang-timer-bucket

缺点:

将全局计时器分片的方式,虽然能够降低锁的粒度,提高计时器的性能,但是 runtime.timerproc:76f4fd8open in new window 造成的处理器线程之间频繁上下文切换却成为了影响计时器性能的首要因素。

网络轮询器

Go v1.14 所有的计时器都以最小四叉堆的形式存储在处理器 runtime.popen in new window 中。

golang-p-and-timers
golang-p-and-timers

处理器 runtime.popen in new window 中与计时器相关的有以下字段:

type p struct {
	...
	timersLock mutex
	timers []*timer

	numTimers     uint32
	adjustTimers  uint32
	deletedTimers uint32
	...
}
  • timersLock — 用于保护计时器的互斥锁
  • timers — 存储计时器的最小四叉堆
  • numTimers — 处理器中的计时器数量
  • adjustTimers — 处理器中处于 timerModifiedEarlier 状态的计时器数量
  • deletedTimers — 处理器中处于 timerDeleted 状态的计时器数量

这种方式能够充分利用本地性、减少上下文的切换开销。

6.3.2 数据结构

计时器由runtime.timeropen in new window表示:

type timer struct {
	pp puintptr

	when     int64
	period   int64
	f        func(interface{}, uintptr)
	arg      interface{}
	seq      uintptr
	nextwhen int64
	status   uint32
}
  • when — 当前计时器被唤醒的时间
  • period — 两次被唤醒的间隔
  • f — 每当计时器被唤醒时都会调用的函数
  • arg — 计时器被唤醒时调用 f 传入的参数
  • nextWhen — 计时器处于 timerModifiedXX 状态时,用于设置 when 字段
  • status — 计时器的状态

对外暴露的计时器使用 time.Timeropen in new window 结体:

type Timer struct {
	C <-chan Time
	r runtimeTimer
}

time.Timeropen in new window 计时器必须通过 time.NewTimeropen in new windowtime.AfterFuncopen in new window 或者 time.Afteropen in new window 函数创建。 当计时器失效时,订阅计时器 Channel 的 Goroutine 会收到计时器失效的时间。

6.3.3 状态机

运行时使用状态机的方式处理全部的计时器,其中包括 10 种状态和几种操作。

状态解释
timerNoStatus还没有设置状态
timerWaiting等待触发
timerRunning运行计时器函数
timerDeleted被删除
timerRemoving正在被删除
timerRemoved已经被停止并从堆中删除
timerModifying正在被修改
timerModifiedEarlier被修改到了更早的时间
timerModifiedLater被修改到了更晚的时间
timerMoving已经被修改正在被移动

操作计时器时,运行时会根据状态的不同而做出反应,所以在分析计时器时会将状态作为切入点分析其实现原理。计时器的状态机中包含如下所示的 7 种不同操作,它们分别承担了不同的职责:

增加计时器

调用 time.NewTimeropen in new window 增加新的计时器时,会执行程序中的 runtime.addtimeropen in new window 函数根据以下的规则处理计时器:

  • timerNoStatus -> timerWaiting
  • 其他状态 -> 崩溃:不合法的状态
func addtimer(t *timer) {
	if t.status != timerNoStatus {
		badTimer()
	}
	t.status = timerWaiting
	cleantimers(pp)
	doaddtimer(pp, t)
	wakeNetPoller(when)
}
  1. 调用 runtime.cleantimersopen in new window 清理处理器中的计时器;
  2. 调用runtime.doaddtimer将当前计时器加入处理器的timers四叉堆中:
    1. 调用 runtime.netpollGenericInitopen in new window 函数惰性初始化网络轮询器;
  3. 调用runtime.wakeNetPoller 唤醒网络轮询器中休眠的线程;
    1. 调用 runtime.netpollBreakopen in new window 函数中断正在阻塞的网络轮询

每次增加新的计时器都会中断正在阻塞的轮询,触发调度器检查是否有计时器到期。

删除计时器

runtime.deltimeropen in new window 函数会标记需要删除的计时器,会根据以下的规则处理计时器:

  • timerWaiting -> timerModifying -> timerDeleted
  • timerModifiedEarlier -> timerModifying -> timerDeleted
  • timerModifiedLater -> timerModifying -> timerDeleted
  • 其他状态 -> 等待状态改变或者直接返回

修改计时器

runtime.modtimeropen in new window 会修改已经存在的计时器,它会根据以下的规则处理计时器:

  • timerWaiting -> timerModifying -> timerModifiedXX
  • timerModifiedXX -> timerModifying -> timerModifiedYY
  • timerNoStatus -> timerModifying -> timerWaiting
  • timerRemoved -> timerModifying -> timerWaiting
  • timerDeleted -> timerModifying -> timerWaiting
  • 其他状态 -> 等待状态改变
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
	status := uint32(timerNoStatus)
	wasRemoved := false
loop:
	for {
		switch status = atomic.Load(&t.status); status {
			...
		}
	}

	t.period = period
	t.f = f
	t.arg = arg
	t.seq = seq

	if wasRemoved {
		t.when = when
		doaddtimer(pp, t)
		wakeNetPoller(when)
	} else {
		t.nextwhen = when
		newStatus := uint32(timerModifiedLater)
		if when < t.when {
			newStatus = timerModifiedEarlier
		}
		...
		if newStatus == timerModifiedEarlier {
			wakeNetPoller(when)
		}
	}
}

若待修改的计时器已经被删除,那么该函数会调用 runtime.doaddtimeropen in new window 创建新的计时器。在正常情况下会根据修改后的时间进行不同的处理:

  • 若修改后的时间大于或者等于修改前时间,设置计时器的状态为 timerModifiedLater
  • 若修改后的时间小于修改前时间,设置计时器的状态为 timerModifiedEarlier 并调用 runtime.netpollBreakopen in new window 触发调度器的重新调度

清除计时器

runtime.cleantimersopen in new window 函数会根据状态清理处理器队列头中的计时器,该函数会遵循以下的规则修改计时器的触发时间:

  • timerDeleted -> timerRemoving -> timerRemoved
  • timerModifiedXX -> timerMoving -> timerWaiting
func cleantimers(pp *p) bool {
	for {
		if len(pp.timers) == 0 {
			return true
		}
		t := pp.timers[0]
		switch s := atomic.Load(&t.status); s {
		case timerDeleted:
			atomic.Cas(&t.status, s, timerRemoving)
			dodeltimer0(pp)
			atomic.Cas(&t.status, timerRemoving, timerRemoved)
		case timerModifiedEarlier, timerModifiedLater:
			atomic.Cas(&t.status, s, timerMoving)

			t.when = t.nextwhen

			dodeltimer0(pp)
			doaddtimer(pp, t)
			atomic.Cas(&t.status, timerMoving, timerWaiting)
		default:
			return true
		}
	}
}

runtime.cleantimersopen in new window 函数只会处理计时器状态为 timerDeletedtimerModifiedEarliertimerModifiedLater 的情况:

  • 若计时器的状态为 timerDeleted
  • 若计时器的状态为 timerModifiedEarlier 或者 timerModifiedLater:

runtime.cleantimersopen in new window 会删除已经标记的计时器,修改状态为 timerModifiedXX 的计时器.

调整计时器

runtime.adjusttimersopen in new window会删除堆中的计时器并修改状态为 timerModifiedEarliertimerModifiedLater 的计时器的时间,处理计时器状态:

  • timerDeleted -> timerRemoving -> timerRemoved
  • timerModifiedXX -> timerMoving -> timerWaiting
func adjusttimers(pp *p, now int64) {
	var moved []*timer
loop:
	for i := 0; i < len(pp.timers); i++ {
		t := pp.timers[i]
		switch s := atomic.Load(&t.status); s {
		case timerDeleted:
			// 删除堆中的计时器
		case timerModifiedEarlier, timerModifiedLater:
			// 修改计时器的时间
		case ...
		}
	}
	if len(moved) > 0 {
		addAdjustedTimers(pp, moved)
	}
}

运行计时器

runtime.runtimeropen in new window 函数会检查处理器四叉堆上最顶上的计时器,该函数也会处理计时器的删除以及计时器时间的更新,它会遵循以下的规则处理计时器:

  • timerNoStatus -> 崩溃:未初始化的计时器
  • timerWaiting
    • -> timerWaiting
    • -> timerRunning -> timerNoStatus
    • -> timerRunning -> timerWaiting
  • timerModifying -> 等待状态改变
  • timerModifiedXX -> timerMoving -> timerWaiting
  • timerDeleted -> timerRemoving -> timerRemoved
  • timerRunning -> 崩溃:并发调用该函数
  • timerRemovedtimerRemovingtimerMoving -> 崩溃:计时器堆不一致
func runtimer(pp *p, now int64) int64 {
	for {
		t := pp.timers[0]
		switch s := atomic.Load(&t.status); s {
		case timerWaiting:
			if t.when > now {
				return t.when
			}
			atomic.Cas(&t.status, s, timerRunning)
			runOneTimer(pp, t, now)
			return 0
		case timerDeleted:
			// 删除计时器
		case timerModifiedEarlier, timerModifiedLater:
			// 修改计时器的时间
		case ...
		}
	}
}

若处理器四叉堆顶部的计时器没有到触发时间会直接返回,否则调用 runtime.runOneTimeropen in new window 运行堆顶的计时器:

func runOneTimer(pp *p, t *timer, now int64) {
	f := t.f
	arg := t.arg
	seq := t.seq

	if t.period > 0 {
		delta := t.when - now
		t.when += t.period * (1 + -delta/t.period)
		siftdownTimer(pp.timers, 0)
		atomic.Cas(&t.status, timerRunning, timerWaiting)
		updateTimer0When(pp)
	} else {
		dodeltimer0(pp)
		atomic.Cas(&t.status, timerRunning, timerNoStatus)
	}

	unlock(&pp.timersLock)
	f(arg, seq)
	lock(&pp.timersLock)
}

根据计时器的 period 字段,上述函数会做出不同的处理:

更新计时器之后,上述函数会运行计时器中存储的函数并传入触发时间等参数。

6.3.4 触发计时器

Golang 会在两个模块中触发计时器:

  1. 调度器调度时会检查处理器中的计时器是否准备就绪
  2. 系统监控会检查是否有未执行的到期计时器

调度器

调度器使用runtime.checkTimersopen in new window来运行处理器中计时器的函数,它会在发生以下情况时被调用:

runtime.checkTimersopen in new window的流程如下:

  • 调整计时器:
    • 若处理器中不存在需要调整的计时器
      • 当没有需要执行的计时器时,直接返回
      • 当下一个计时器没有到期并且需要删除的计时器较少时都会直接返回;
    • 若处理器中存在需要调整的计时器,会调用 runtime.adjusttimersopen in new window
  • 运行计时器:
    • 调整了堆中的计时器之后,会通过 runtime.runtimeropen in new window 依次查找堆中是否存在需要执行的计时器:
      • 若存在,直接运行计时器
      • 若不存在,获取最新计时器的触发时间
  • 删除计时器: 若当前 Goroutine 的处理器和传入的处理器相同,并且处理器中删除的计时器是堆中计时器的 1/4 以上,就会调用 runtime.clearDeletedTimersopen in new window 删除处理器全部被标记为 timerDeleted 的计时器,保证堆中靠后的计时器被删除。

系统监控

系统监控函数 runtime.sysmonopen in new window 也可能会触发函数的计时器:

func sysmon() {
	...
	for {
		...
		now := nanotime()
		next, _ := timeSleepUntil()
		...
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0)
			if !list.empty() {
				incidlelocked(-1)
				injectglist(&list)
				incidlelocked(1)
			}
		}
		if next < now {
			startm(nil, false)
		}
		...
}
  1. 调用 runtime.timeSleepUntilopen in new window 获取计时器的到期时间以及持有该计时器的堆
  2. 若超过 10ms 的时间没有轮询,调用 runtime.netpollopen in new window 轮询网络
  3. 若当前有应该运行的计时器没有执行,可能存在无法被抢占的处理器,这时我们应该启动新的线程处理计时器

runtime.timeSleepUntilopen in new window 会遍历运行时的全部处理器并查找下一个需要执行的计时器。

Reference

  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-timer/open in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2