同步原语与锁

Kesa2023年10月10日...大约 7 分钟golang

1. Mutex

数据结构

type Mutex struct {
	state int32
	sema  uint32
}

状态

互斥锁的状态通过state的二进制位表示,默认状态所有位为0:

golang-mutex-state
golang-mutex-state

模式

加锁

func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	m.lockSlow()
}

sync.Mutex.lockSlowopen in new window流程如下:

  1. 判断当前 Goroutine 能否进入自旋
  2. 通过自旋等待互斥锁的释放
  3. 计算互斥锁的最新状态
  4. 更新互斥锁的状态并获取锁

解锁

func (m *Mutex) Unlock() {
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

慢解锁流程:

小结

加锁过程:

解锁过程:

2. RWMutex

数据结构

type RWMutex struct {
	w           Mutex
	writerSem   uint32
	readerSem   uint32
	readerCount int32
	readerWait  int32
}

写锁

获取

func (rw *RWMutex) Lock() {
	rw.w.Lock()
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}
  1. 获取写锁,若已被获取则等待
  2. 阻塞后续读操作
  3. 等待所有的读操作结束后,唤醒当前 goroutine

获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作『饿死』。

释放

func (rw *RWMutex) Unlock() {
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		throw("sync: Unlock of unlocked RWMutex")
	}
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	rw.w.Unlock()
}
  1. reaerCount 变为正数释放读锁
  2. 释放陷入因读锁陷入等待的 Gorotuine
  3. 释放写锁

读锁

获取

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}
  1. readerCount 若为负数,表示存在写锁,等待写锁释放

释放

func (rw *RWMutex) RUnlock() {
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		rw.rUnlockSlow(r)
	}
}
  1. readerCount 减一
  2. 获取 readCount结果
    • 小于零,存在写锁,进入慢解锁
    • 大于等于零,直接解锁

慢解锁:

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		throw("sync: RUnlock of unlocked RWMutex")
	}
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

3. WaitGroup

数据结构

type WaitGroup struct {
	noCopy noCopy
	state1 [3]uint32
}

防止拷贝

sync.noCopyopen in new window 是一个特殊的私有结构体,tools/go/analysis/passes/copylockopen in new window 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopyopen in new window 或者实现了 LockUnlock 方法,若包含该结构体或者实现了对应的方法就会报出以下错误:

func main() {
	wg := sync.WaitGroup{}
	yawg := wg
	fmt.Println(wg, yawg)
}

$ go vet proc.go
./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup
./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroup

状态

state1是总共占用 12 字节的数组,存储当前结构体的状态,在64位和32位机上表现不同:

golang-waitgroup-state
golang-waitgroup-state

Add

func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)
	w := uint32(state)
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if v > 0 || w == 0 {
		return
	}
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}

Done

实际复用Add(-1)

Wait

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		if v == 0 {
			return
		}
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			runtime_Semacquire(semap)
			if +statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}

4. Once

数据结构

type Once struct {
	done uint32
	m    Mutex
}

Do

func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

5. Cond

数据结构

type Cond struct {
	noCopy  noCopy
	L       Locker
	notify  notifyList
	checker copyChecker
}
type notifyList struct {
	wait uint32
	notify uint32

	lock mutex
	head *sudog
	tail *sudog
}

Wait

func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify) // runtime.notifyListAdd 的链接名
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t) // runtime.notifyListWait 的链接名
	c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
	return atomic.Xadd(&l.wait, 1) - 1
}

使当前 goroutine 陷入休眠:

  1. 等待计数器加一
  2. 解锁
  3. 等待唤醒
  4. 加锁

runtime_notifyListWait

func notifyListWait(l *notifyList, t uint32) {
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	releaseSudog(s)
}

获取当前 goroutine 并将其追加至通知链表的尾部

Singal

唤醒等待队列的最前面的 goroutine

func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

Broadcase

唤醒队列中全部的 goroutine

func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

小结

Reference

  1. https://github.com/golang/go/tree/release-branch.go1.18/srcopen in new window
  2. https://draveness.me/golang/open in new window
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2