6.2 同步原语

Kesa...大约 20 分钟golang

6.2.1 基本原语

golang-basic-sync-primitives
golang-basic-sync-primitives

Mutex

sync.Mutexopen in new window

type Mutex struct {
	state int32
	sema  uint32
}
  • state:当前互斥锁的状态
  • sema:控制锁状态的信号量

状态

互斥锁的状态通过state的二进制位表示,默认状态所有位为0:

golang-mutex-state
golang-mutex-state
  • mutexLocked锁定状态
  • mutexWoken:从正常模式被唤醒
  • mutexStarving:饥饿状态
  • waitersCount:当前锁上等待的 goroutine 的个数

正常模式和饥饿模式

sync.Mutexopen in new window由两种模式:

  1. 正常模式
  2. 饥饿模式
golang-mutex-mode
golang-mutex-mode

正常模式下,锁的等待者会按照先进先出 FIFO的顺序获取锁。

但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,以保证互斥锁的公平性。

饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。

新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,只会在队列的末尾等待。若一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

加锁

加锁sync.Mutex.Lockopen in new window

func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	m.lockSlow()
}

sync.Mutex.lockSlowopen in new window的流程如下:

  1. 判断当前 Goroutine 能否进入自旋
  2. 通过自旋等待互斥锁的释放
  3. 计算互斥锁的最新状态
  4. 更新互斥锁的状态并获取锁
判断自旋和等待释放
func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。

在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序。

Goroutine 进入自旋的条件

  1. 互斥锁处于在普通模式
  2. runtime.sync_runtime_canSpinopen in new window函数返回值为 true:
    1. 运行在多 CPU 的机器上
    2. 当前 Goroutine 为了获取该锁进入自旋的次数小于四次
    3. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空

进入自旋之后,调用runtime.sync_runtime_doSpinopen in new windowruntime.procyieldopen in new window执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间:

func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET
计算最新状态

在 goroutine 自旋之后,计算当前互斥锁最新的状态:

new := old
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			new &^= mutexWoken
		}
更新锁状态并获取锁

然后使用 CAS 函数sync/atomic.CompareAndSwapInt32open in new window更新状态:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // 通过 CAS 函数获取了锁
			}
			...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}

若没有获取锁,则调用 runtime.sync_runtime_SemacquireMutexopen in new window 通过信号量保证资源不会被两个 Goroutine 获取。

runtime.sync_runtime_SemacquireMutexopen in new window会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,就会立刻返回:

  • 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环
  • 在饥饿模式下,当前 Goroutine 会获得互斥锁,若等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出

解锁

sync.Mutex.Unlockopen in new window

func (m *Mutex) Unlock() {
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

sync.Mutex.unlockSlowopen in new window先校验锁状态的合法性 — 若当前互斥锁已经被解锁过了会直接抛出异常。

在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 { // 正常模式
		old := new
		for {
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else { // 饥饿模式
		runtime_Semrelease(&m.sema, true, 1)
	}
}

流程如下:

  • 正常模式下:
    • 若互斥锁不存在等待者或者互斥锁的 mutexLockedmutexStarvingmutexWoken 状态不都为 0,那么当前方法可以直接返回,不需要唤醒其他等待者
    • 若互斥锁存在等待者,会通过 sync.runtime_Semreleaseopen in new window 唤醒等待者并移交锁的所有权
  • 饥饿模式下: 直接调用 sync.runtime_Semreleaseopen in new window 将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态

小结

加锁过程:

  • 若互斥锁处于初始化状态,会通过置位 mutexLocked 加锁

  • 若互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放

  • 若当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式

  • 互斥锁在正常情况下会通过 runtime.sync_runtime_SemacquireMutexopen in new window 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒

  • 若当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式

解锁过程:

  • 当互斥锁已经被解锁时,调用 sync.Mutex.Unlockopen in new window 会直接抛出异常
  • 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位
  • 当互斥锁处于普通模式时:

RWMutex

sync.RWMutexopen in new window是细粒度的互斥锁,不限制资源的并发读,但是读写、写写操作无法并行执行;适用于读多写少的情景。

\
YN
NN

数据结构

sync.RWMutexopen in new window

type RWMutex struct {
	w           Mutex
	writerSem   uint32
	readerSem   uint32
	readerCount int32
	readerWait  int32
}
  • w:复用互斥锁
  • writerSem:写等待读信号
  • readerSem:读等待写信号
  • readerWait:当前操作被阻塞时,等待的读操作的个数

写锁

获取写锁,调用sync.RWMutex.Lockopen in new window

func (rw *RWMutex) Lock() {
	rw.w.Lock()
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}
  1. 调用结构体持有的 sync.Mutexopen in new window 结构体的 sync.Mutex.Lockopen in new window 阻塞后续的写操作:
    • 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠
  2. 调用 sync/atomic.AddInt32open in new window 函数阻塞后续的读操作
  3. 若仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用runtime.sync_runtime_SemacquireMutexopen in new window进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒

释放写锁,调用sync.RWMutex.Unlockopen in new window

func (rw *RWMutex) Unlock() {
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		throw("sync: Unlock of unlocked RWMutex")
	}
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	rw.w.Unlock()
}
  1. 调用 sync/atomic.AddInt32open in new window 函数将 readerCount 变回正数,释放读锁
  2. 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine
  3. 调用 sync.Mutex.Unlockopen in new window 释放写锁

获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作『饿死』。

读锁

获取读锁sync.RWMutex.RLockopen in new window

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}
  1. 若该方法返回负数, 则表示其他 Goroutine 获得了写锁,当前 Goroutine 就会调用 runtime.sync_runtime_SemacquireMutexopen in new window 陷入休眠等待锁的释放
  2. 若该方法的结果为非负数,则没有 Goroutine 获得写锁,当前方法会成功返回

释放读锁sync.RWMutex.RUnlockopen in new window

func (rw *RWMutex) RUnlock() {
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		rw.rUnlockSlow(r)
	}
}
  1. 减少正在读资源的 readerCount 整数
  2. 根据 sync/atomic.AddInt32open in new window 的返回值不同会分别进行处理:

sync.RWMutex.rUnlockSlowopen in new window

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		throw("sync: RUnlock of unlocked RWMutex")
	}
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}
  1. 减少获取锁的写操作等待的读操作数 readerWait
  2. 在所有读操作都被释放之后触发写操作的信号量 writerSem,唤醒尝试获取写锁的 Goroutine

小结

获取写锁:

  • 每次 sync.RWMutex.RUnlockopen in new window 都会将 readerCount 其减一,归零时该 Goroutine 会获得写锁
  • readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作

释放写锁:

  • 会先通知所有的读操作,然后才会释放持有的互斥锁

WaitGroup

golang-syncgroup
golang-syncgroup

sync.WaitGroupopen in new window

type WaitGroup struct {
	noCopy noCopy
	state1 [3]uint32
}

sync.noCopyopen in new window 是一个特殊的私有结构体,tools/go/analysis/passes/copylockopen in new window 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopyopen in new window 或者实现了 LockUnlock 方法,若包含该结构体或者实现了对应的方法就会报出以下错误:

func main() {
	wg := sync.WaitGroup{}
	yawg := wg
	fmt.Println(wg, yawg)
}

$ go vet proc.go
./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup
./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroup

state1是总共占用 12 字节的数组,存储当前结构体的状态,在64位和32位机上表现不同:

golang-waitgroup-state
golang-waitgroup-state

接口

sync.WaitGroupopen in new window有三个导出方法:

  1. sync.WaitGroup.Addopen in new window
  2. sync.WaitGroup.Doneopen in new window
  3. sync.WaitGroup.Waitopen in new window

sync.WaitGroup.Doneopen in new window

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

sync.WaitGroup.Addopen in new window:

func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)
	w := uint32(state)
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if v > 0 || w == 0 {
		return
	}
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}

更新 sync.WaitGroupopen in new window 中的计数器 counter

sync.WaitGroup.Waitopen in new window

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		if v == 0 {
			return
		}
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			runtime_Semacquire(semap)
			if +statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}

小结

对于sync.WaitGroupopen in new window

Once

sync.Onceopen in new window可以保证在 Go 程序运行期间的某段代码只会执行一次:

func main() {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        o.Do(func() {
            fmt.Println("only once")
        })
    }
}

$ go run main.go
only once

数据结构

type Once struct {
	done uint32
	m    Mutex
}
  • done:表示代码是否执行过
  • m:互斥锁

接口

只有一个导出的sync.Once.Doopen in new window,接收一个入参为空的函数:

func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}
  1. 为当前 Goroutine 获取互斥锁
  2. 执行传入的无入参函数
  3. 运行延迟函数调用,将成员变量 done 更新成 1
  4. 通过done来确保函数只会被执行一次

小结

使用sync.Once.Doopen in new window时:

Cond

sync.Condopen in new window可以让一组的 Goroutine 都在满足特定条件时被唤醒。

初始化Cond时需要传入互斥锁:

var status int64

func main() {
	c := sync.NewCond(&sync.Mutex{})
	for i := 0; i < 10; i++ {
		go listen(c)
	}
	time.Sleep(1 * time.Second)
	go broadcast(c)

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	<-ch
}

func broadcast(c *sync.Cond) {
	c.L.Lock()
	atomic.StoreInt64(&status, 1)
	c.Broadcast()
	c.L.Unlock()
}

func listen(c *sync.Cond) {
	c.L.Lock()
	for atomic.LoadInt64(&status) != 1 {
		c.Wait()
	}
	fmt.Println("listen")
	c.L.Unlock()
}

$ go run main.go
listen
...
listen
golang-cond-broadcast
golang-cond-broadcast

数据结构

type Cond struct {
	noCopy  noCopy
	L       Locker
	notify  notifyList
	checker copyChecker
}
  • noCopy : 用于保证结构体不会在编译期间拷贝
  • copyChecker : 用于禁止运行期间发生的拷贝
  • L : 用于保护内部的 notify 字段,Locker 接口类型的变量
  • notify : Goroutine 的链表,它是实现同步机制的核心结构
type notifyList struct {
	wait uint32
	notify uint32

	lock mutex
	head *sudog
	tail *sudog
}
  • head:链表头
  • tail:链表尾
  • wait:当前正在等待的 goroutine 索引
  • notify:已经通知到的 goroutine 索引

接口

sync.Cond.Waitopen in new window会将当前 Goroutine 陷入休眠状态,执行过程分成以下两个步骤:

  1. 调用 runtime.notifyListAddopen in new window 将等待计数器加一并解锁
  2. 调用 runtime.notifyListWaitopen in new window 等待其他 Goroutine 的唤醒并加锁
func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify) // runtime.notifyListAdd 的链接名
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t) // runtime.notifyListWait 的链接名
	c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
	return atomic.Xadd(&l.wait, 1) - 1
}

runtime.notifyListWaitopen in new window获取当前 Goroutine 并将它追加到 Goroutine 通知链表的最末端:

func notifyListWait(l *notifyList, t uint32) {
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	releaseSudog(s)
}

然后调用runtime.goparkunlockopen in new window将当前 Goroutine 陷入休眠,直接让出当前处理器的使用权并等待调度器的唤醒。

golang-cond-notifylist
golang-cond-notifylist

sync.Cond.Signalopen in new windowsync.Cond.Broadcastopen in new window用于唤醒休眠的 Goroutine :

func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

runtime.notifyListNotifyOneopen in new windowsync.notifyListopen in new window 链表中找到满足 sudog.ticket == l.notify 条件的 Goroutine 并通过 runtime.readyWithTimeopen in new window 唤醒:

func notifyListNotifyOne(l *notifyList) {
	t := l.notify
	atomic.Store(&l.notify, t+1)

	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
}

runtime.notifyListNotifyAllopen in new window 会依次通过 runtime.readyWithTimeopen in new window 唤醒链表中 Goroutine:

func notifyListNotifyAll(l *notifyList) {
	s := l.head
	l.head = nil
	l.tail = nil

	atomic.Store(&l.notify, atomic.Load(&l.wait))

	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

Goroutine 的唤醒顺序也是按照加入队列先后顺序,先加入的会先被唤醒,而后加入的可能 Goroutine 需要等待调度器的调度。

小结

使用sync.Condopen in new window时:

6.2.2 扩展原语

ErrGroup

golang/sync/errgroup.Groupopen in new window在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,例如:

var g errgroup.Group
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
}
for i := range urls {
    url := urls[i]
    g.Go(func() error {
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    })
}
if err := g.Wait(); err == nil {
    fmt.Println("Successfully fetched all URLs.")
}

数据结构

type Group struct {
	cancel func()

	wg sync.WaitGroup

	errOnce sync.Once
	err     error
}
  • cancel:创建 context.Contextopen in new window 时返回的取消函数,用于在多个 Goroutine 之间同步取消信号
  • wg:用于等待一组 Goroutine 完成子任务的同步原语
  • errOnce:用于保证只接收一个子任务返回的错误

接口

golang/sync/errgroup.WithContextopen in new window创建新的 golang/sync/errgroup.Groupopen in new window 结构体:

func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

golang/sync/errgroup.Group.Goopen in new window用于运行新的子任务,执行流程如下:

  1. 调用 sync.WaitGroup.Addopen in new window 增加待处理的任务
  2. 创建新的 Goroutine 并运行子任务
  3. 返回错误时及时调用 cancel 并对 err 赋值,只有最早返回的错误才会被上游感知到,后续的错误都会被舍弃
func (g *Group) Go(f func() error) {
	g.wg.Add(1)

	go func() {
		defer g.wg.Done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

小结

使用是需要注意:

Semaphore

golang/sync/semaphore.Weightedopen in new window实现了带权重的信号量,有四个导出方法:

  1. golang/sync/semaphore.NewWeightedopen in new window 创建新的信号量
  2. golang/sync/semaphore.Weighted.Acquireopen in new window 阻塞地获取指定权重的资源,若当前没有空闲资源,会陷入休眠等待
  3. golang/sync/semaphore.Weighted.TryAcquireopen in new window 非阻塞地获取指定权重的资源,若当前没有空闲资源,会直接返回 false
  4. golang/sync/semaphore.Weighted.Releaseopen in new window 用于释放指定权重的资源

数据结构

func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}

type Weighted struct {
	size    int64
	cur     int64
	mu      sync.Mutex
	waiters list.List
}
  • waiters:等待获取资源的 Goroutine

  • size:信号量的上限

  • cur:计数器,范围[0, size]

golang-semaphore
golang-semaphore

获取

golang/sync/semaphore.Weighted.Acquireopen in new window用于获取指定权重的资源,其中包含三种不同情况:

  1. 当信号量中剩余的资源大于获取的资源并且没有等待的 Goroutine 时,会直接获取信号量
  2. 当需要获取的信号量大于 golang/sync/semaphore.Weightedopen in new window 的上限时,由于不可能满足条件会直接返回错误
  3. 遇到其他情况时会将当前 Goroutine 加入到等待列表并通过 select 等待调度器唤醒当前 Goroutine,Goroutine 被唤醒后会获取信号量
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		return nil
	}

	...
	ready := make(chan struct{})
	w := waiter{n: n, ready: ready}
	elem := s.waiters.PushBack(w)
	select {
	case <-ctx.Done():
		err := ctx.Err()
		select {
		case <-ready:
			err = nil
		default:
			s.waiters.Remove(elem)
		}
		return err
	case <-ready:
		return nil
	}
}

golang/sync/semaphore.Weighted.TryAcquireopen in new window非阻塞地判断当前信号量是否有充足的资源,若有充足的资源会直接立刻返回 true,否则会返回 false

func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

释放

golang/sync/semaphore.Weighted.Releaseopen in new window从头到尾遍历 waiters 列表中全部的等待者,若释放资源后的信号量有充足的剩余资源就会通过 Channel 唤起指定的 Goroutine:

func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	s.cur -= n
	for {
		next := s.waiters.Front()
		if next == nil {
			break
		}
		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			break
		}
		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
	s.mu.Unlock()
}

小结

使用时注意:

SingleFlight

golang/sync/singleflight.Groupopen in new window能够在一个服务中抑制对下游的多次重复请求。

常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。

golang-query-without-single-flight
golang-query-without-single-flight

golang/sync/singleflight.Groupopen in new window能够限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。

golang-extension-single-flight
golang-extension-single-flight

在资源的获取非常昂贵时(例如:访问缓存、数据库),就很适合使用。

例如:

type service struct {
    requestGroup singleflight.Group
}

func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
    v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
        rows, err := // select * from tables
        if err != nil {
            return nil, err
        }
        return rows, nil
    })
    if err != nil {
        return nil, err
    }
    return Response{
        rows: rows,
    }, nil
}

结构体

type Group struct {
	mu sync.Mutex
	m  map[string]*call
}

type call struct {
	wg sync.WaitGroup

	val interface{}
	err error

	dups  int
	chans []chan<- Result
}
  • mu:互斥锁
  • m:哈希表,call保存当前调用的信息

golang/sync/singleflight.callopen in new window 结构体中的 valerr 字段都只会在执行传入的函数时赋值一次并在 sync.WaitGroup.Waitopen in new window 返回时被读取;dupschans 两个字段分别存储了抑制的请求数量以及用于同步结果的 Channel。

接口

调用golang/sync/singleflight.Group.Doopen in new window时都会获取互斥锁,随后判断是否已经存在键对应的 golang/sync/singleflight.callopen in new window

  1. 当不存在对应的 golang/sync/singleflight.callopen in new window 时:
    1. 初始化一个新的 golang/sync/singleflight.callopen in new window 指针
    2. 增加 sync.WaitGroupopen in new window 持有的计数器
    3. golang/sync/singleflight.callopen in new window 指针添加到映射表
    4. 释放持有的互斥锁
    5. 阻塞地调用 golang/sync/singleflight.Group.doCallopen in new window 方法等待结果的返回
  2. 当存在对应的 golang/sync/singleflight.callopen in new window 时:
    1. 增加 dups 计数器,它表示当前重复的调用次数
    2. 释放持有的互斥锁
    3. 通过 sync.WaitGroup.Waitopen in new window 等待请求的返回
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err, true
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	c.val, c.err = fn()
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	for _, ch := range c.chans {
		ch <- Result{c.val, c.err, c.dups > 0}
	}
	g.mu.Unlock()
}

doCall的流程如下:

  1. 运行传入的函数 fn,该函数的返回值会赋值给 c.valc.err
  2. 调用 sync.WaitGroup.Doneopen in new window 方法通知所有等待结果的 Goroutine — 当前函数已经执行完成,可以从 call 结构体中取出返回值并返回了;
  3. 获取持有的互斥锁并通过管道将信息同步给使用 golang/sync/singleflight.Group.DoChanopen in new window 方法的 Goroutine

小结

当需要减少对下游的相同请求时,可以使用 golang/sync/singleflight.Groupopen in new window 来增加吞吐量和服务质量,使用时注意:

Reference

  1. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/open in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2