6.2 同步原语
6.2.1 基本原语
Mutex
type Mutex struct {
state int32
sema uint32
}
state
:当前互斥锁的状态sema
:控制锁状态的信号量
状态
互斥锁的状态通过state
的二进制位表示,默认状态所有位为0:
mutexLocked
:锁定状态mutexWoken
:从正常模式被唤醒mutexStarving
:饥饿状态waitersCount
:当前锁上等待的 goroutine 的个数
正常模式和饥饿模式
sync.Mutex
由两种模式:
- 正常模式
- 饥饿模式
在正常模式下,锁的等待者会按照先进先出 FIFO的顺序获取锁。
但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,以保证互斥锁的公平性。
在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。
新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,只会在队列的末尾等待。若一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。
加锁
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}
- 当锁的状态是 0 时,将
mutexLocked
位置成 1 - 状态不是 0 时,
sync.Mutex.lockSlow
通过自旋(Spinnig)等方式等待锁的释放
sync.Mutex.lockSlow
的流程如下:
- 判断当前 Goroutine 能否进入自旋
- 通过自旋等待互斥锁的释放
- 计算互斥锁的最新状态
- 更新互斥锁的状态并获取锁
判断自旋和等待释放
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。
在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序。
Goroutine 进入自旋的条件:
- 互斥锁处于在普通模式
runtime.sync_runtime_canSpin
函数返回值为 true:- 运行在多 CPU 的机器上
- 当前 Goroutine 为了获取该锁进入自旋的次数小于四次
- 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空
进入自旋之后,调用runtime.sync_runtime_doSpin
和 runtime.procyield
执行 30 次的 PAUSE
指令,该指令只会占用 CPU 并消耗 CPU 时间:
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
计算最新状态
在 goroutine 自旋之后,计算当前互斥锁最新的状态:
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
new &^= mutexWoken
}
更新锁状态并获取锁
然后使用 CAS 函数sync/atomic.CompareAndSwapInt32
更新状态:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 通过 CAS 函数获取了锁
}
...
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
若没有获取锁,则调用 runtime.sync_runtime_SemacquireMutex
通过信号量保证资源不会被两个 Goroutine 获取。
runtime.sync_runtime_SemacquireMutex
会在方法中不断尝试获取锁并陷入休眠等待信号量的释放,一旦当前 Goroutine 可以获取信号量,就会立刻返回:
- 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环
- 在饥饿模式下,当前 Goroutine 会获得互斥锁,若等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出
解锁
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
- 若该
sync/atomic.AddInt32
返回的新状态等于 0,当前 Goroutine 就成功解锁 - 若该函数返回的新状态不等于 0,则调用
sync.Mutex.unlockSlow
开始慢解锁
sync.Mutex.unlockSlow
先校验锁状态的合法性 — 若当前互斥锁已经被解锁过了会直接抛出异常。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 正常模式
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { // 饥饿模式
runtime_Semrelease(&m.sema, true, 1)
}
}
流程如下:
- 正常模式下:
- 若互斥锁不存在等待者或者互斥锁的
mutexLocked
、mutexStarving
、mutexWoken
状态不都为 0,那么当前方法可以直接返回,不需要唤醒其他等待者 - 若互斥锁存在等待者,会通过
sync.runtime_Semrelease
唤醒等待者并移交锁的所有权
- 若互斥锁不存在等待者或者互斥锁的
- 饥饿模式下: 直接调用
sync.runtime_Semrelease
将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态
小结
加锁过程:
若互斥锁处于初始化状态,会通过置位
mutexLocked
加锁若互斥锁处于
mutexLocked
状态并且在普通模式下工作,会进入自旋,执行 30 次PAUSE
指令消耗 CPU 时间等待锁的释放若当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式
互斥锁在正常情况下会通过
runtime.sync_runtime_SemacquireMutex
将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒若当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式
解锁过程:
- 当互斥锁已经被解锁时,调用
sync.Mutex.Unlock
会直接抛出异常 - 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked
标志位 - 当互斥锁处于普通模式时:
- 若没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回
- 其他情况下会通过
sync.runtime_Semrelease
唤醒对应的 Goroutine
RWMutex
sync.RWMutex
是细粒度的互斥锁,不限制资源的并发读,但是读写、写写操作无法并行执行;适用于读多写少的情景。
\ | 读 | 写 |
---|---|---|
读 | Y | N |
写 | N | N |
数据结构
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
w
:复用互斥锁writerSem
:写等待读信号readerSem
:读等待写信号readerWait
:当前操作被阻塞时,等待的读操作的个数
写锁
获取写锁,调用sync.RWMutex.Lock
:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
- 调用结构体持有的
sync.Mutex
结构体的sync.Mutex.Lock
阻塞后续的写操作:- 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时会进入自旋或者休眠
- 调用
sync/atomic.AddInt32
函数阻塞后续的读操作 - 若仍然有其他 Goroutine 持有互斥锁的读锁,该 Goroutine 会调用
runtime.sync_runtime_SemacquireMutex
进入休眠状态等待所有读锁所有者执行结束后释放writerSem
信号量将当前协程唤醒
释放写锁,调用sync.RWMutex.Unlock
:
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
- 调用
sync/atomic.AddInt32
函数将readerCount
变回正数,释放读锁 - 通过 for 循环释放所有因为获取读锁而陷入等待的 Goroutine
- 调用
sync.Mutex.Unlock
释放写锁
获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作『饿死』。
读锁
获取读锁,sync.RWMutex.RLock
:
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
- 若该方法返回负数, 则表示其他 Goroutine 获得了写锁,当前 Goroutine 就会调用
runtime.sync_runtime_SemacquireMutex
陷入休眠等待锁的释放 - 若该方法的结果为非负数,则没有 Goroutine 获得写锁,当前方法会成功返回
释放读锁,sync.RWMutex.RUnlock
:
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}
- 减少正在读资源的
readerCount
整数 - 根据
sync/atomic.AddInt32
的返回值不同会分别进行处理:- 若返回值大于等于零 — 读锁直接解锁成功
- 若返回值小于零 — 有一个正在执行的写操作,在这时会调用
sync.RWMutex.rUnlockSlow
方法
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
throw("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
- 减少获取锁的写操作等待的读操作数
readerWait
- 在所有读操作都被释放之后触发写操作的信号量
writerSem
,唤醒尝试获取写锁的 Goroutine
小结
获取写锁:
- 每次
sync.RWMutex.RUnlock
都会将readerCount
其减一,归零时该 Goroutine 会获得写锁 - 将
readerCount
减少rwmutexMaxReaders
个数以阻塞后续的读操作
释放写锁:
- 会先通知所有的读操作,然后才会释放持有的互斥锁
WaitGroup
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
noCopy
:保证sync.WaitGroup
不会被通过再赋值的方式拷贝state1
:存储状态和信号量
sync.noCopy
是一个特殊的私有结构体,tools/go/analysis/passes/copylock
包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy
或者实现了 Lock
和 Unlock
方法,若包含该结构体或者实现了对应的方法就会报出以下错误:
func main() {
wg := sync.WaitGroup{}
yawg := wg
fmt.Println(wg, yawg)
}
$ go vet proc.go
./prog.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
./prog.go:11:14: call of fmt.Println copies lock value: sync.WaitGroup
./prog.go:11:18: call of fmt.Println copies lock value: sync.WaitGroup
state1
是总共占用 12 字节的数组,存储当前结构体的状态,在64位和32位机上表现不同:
接口
sync.WaitGroup
有三个导出方法:
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
更新 sync.WaitGroup
中的计数器 counter
:
- 计数器只能是非负数
- 当计数器为负数就会发生程序崩溃
- 当计数器归零时,通过
sync.runtime_Semrelease
唤醒处于等待状态的 Goroutine
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
if v == 0 {
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
- 在计数器大于 0 并且不存在等待的 Goroutine 时,调用
runtime.sync_runtime_Semacquire
陷入睡眠 - 计数器归零时,陷入睡眠状态的 Goroutine 会被唤醒,并且方法返回
小结
sync.WaitGroup
必须在sync.WaitGroup.Wait
方法返回之后才能被重新使用sync.WaitGroup.Done
只是对sync.WaitGroup.Add
方法的简单封装,- 可以向
sync.WaitGroup.Add
方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine - 可以同时有多个 Goroutine 等待当前
sync.WaitGroup
计数器的归零,这些 Goroutine 会被同时唤醒
Once
sync.Once
可以保证在 Go 程序运行期间的某段代码只会执行一次:
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}
$ go run main.go
only once
数据结构
type Once struct {
done uint32
m Mutex
}
done
:表示代码是否执行过m
:互斥锁
接口
只有一个导出的sync.Once.Do
,接收一个入参为空的函数:
- 若传入的函数已经执行过,会直接返回
- 若传入的函数没有执行过,会调用
sync.Once.doSlow
执行传入的函数
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
- 为当前 Goroutine 获取互斥锁
- 执行传入的无入参函数
- 运行延迟函数调用,将成员变量
done
更新成 1 - 通过
done
来确保函数只会被执行一次
小结
使用sync.Once.Do
时:
sync.Once.Do
方法中传入的函数只会被执行一次- 两次调用
sync.Once.Do
方法传入不同的函数只会执行第一次调传入的函数
Cond
sync.Cond
可以让一组的 Goroutine 都在满足特定条件时被唤醒。
初始化Cond
时需要传入互斥锁:
var status int64
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
...
listen
数据结构
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
noCopy
: 用于保证结构体不会在编译期间拷贝copyChecker
: 用于禁止运行期间发生的拷贝L
: 用于保护内部的notify
字段,Locker
接口类型的变量notify
: Goroutine 的链表,它是实现同步机制的核心结构
type notifyList struct {
wait uint32
notify uint32
lock mutex
head *sudog
tail *sudog
}
head
:链表头tail
:链表尾wait
:当前正在等待的 goroutine 索引notify
:已经通知到的 goroutine 索引
接口
sync.Cond.Wait
会将当前 Goroutine 陷入休眠状态,执行过程分成以下两个步骤:
- 调用
runtime.notifyListAdd
将等待计数器加一并解锁 - 调用
runtime.notifyListWait
等待其他 Goroutine 的唤醒并加锁
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify) // runtime.notifyListAdd 的链接名
c.L.Unlock()
runtime_notifyListWait(&c.notify, t) // runtime.notifyListWait 的链接名
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
runtime.notifyListWait
获取当前 Goroutine 并将它追加到 Goroutine 通知链表的最末端:
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
然后调用runtime.goparkunlock
将当前 Goroutine 陷入休眠,直接让出当前处理器的使用权并等待调度器的唤醒。
sync.Cond.Signal
和 sync.Cond.Broadcast
用于唤醒休眠的 Goroutine :
sync.Cond.Signal
方法会唤醒队列最前面的 Goroutinesync.Cond.Broadcast
方法会唤醒队列中全部的 Goroutine
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
runtime.notifyListNotifyOne
从 sync.notifyList
链表中找到满足 sudog.ticket == l.notify
条件的 Goroutine 并通过 runtime.readyWithTime
唤醒:
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4)
return
}
}
}
runtime.notifyListNotifyAll
会依次通过 runtime.readyWithTime
唤醒链表中 Goroutine:
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Goroutine 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能 Goroutine 需要等待调度器的调度。
小结
使用sync.Cond
时:
sync.Cond.Wait
在调用之前一定要使用获取互斥锁,否则会触发程序崩溃sync.Cond.Signal
唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutinesync.Cond.Broadcast
按照一定顺序广播通知等待的全部 Goroutine
6.2.2 扩展原语
ErrGroup
golang/sync/errgroup.Group
在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,例如:
var g errgroup.Group
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
golang/sync/errgroup.Group.Go
创建一个 Goroutine 并在其中执行传入的函数golang/sync/errgroup.Group.Wait
等待所有 Goroutine 全部返回:- 若返回错误,这一组 Goroutine 最少返回一个错误
- 若返回空值,所有 Goroutine 都成功执行
数据结构
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
cancel
:创建context.Context
时返回的取消函数,用于在多个 Goroutine 之间同步取消信号wg
:用于等待一组 Goroutine 完成子任务的同步原语errOnce
:用于保证只接收一个子任务返回的错误
接口
golang/sync/errgroup.WithContext
创建新的 golang/sync/errgroup.Group
结构体:
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
golang/sync/errgroup.Group.Go
用于运行新的子任务,执行流程如下:
- 调用
sync.WaitGroup.Add
增加待处理的任务 - 创建新的 Goroutine 并运行子任务
- 返回错误时及时调用
cancel
并对err
赋值,只有最早返回的错误才会被上游感知到,后续的错误都会被舍弃
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
小结
使用是需要注意:
golang/sync/errgroup.Group
在出现错误或者等待结束后会调用context.Context
的cancel
方法同步取消信号- 只有第一个出现的错误才会被返回,剩余的错误会被直接丢弃
Semaphore
golang/sync/semaphore.Weighted
实现了带权重的信号量,有四个导出方法:
golang/sync/semaphore.NewWeighted
创建新的信号量golang/sync/semaphore.Weighted.Acquire
阻塞地获取指定权重的资源,若当前没有空闲资源,会陷入休眠等待golang/sync/semaphore.Weighted.TryAcquire
非阻塞地获取指定权重的资源,若当前没有空闲资源,会直接返回false
golang/sync/semaphore.Weighted.Release
用于释放指定权重的资源
数据结构
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
waiters
:等待获取资源的 Goroutinesize
:信号量的上限cur
:计数器,范围[0, size]
获取
golang/sync/semaphore.Weighted.Acquire
用于获取指定权重的资源,其中包含三种不同情况:
- 当信号量中剩余的资源大于获取的资源并且没有等待的 Goroutine 时,会直接获取信号量
- 当需要获取的信号量大于
golang/sync/semaphore.Weighted
的上限时,由于不可能满足条件会直接返回错误 - 遇到其他情况时会将当前 Goroutine 加入到等待列表并通过
select
等待调度器唤醒当前 Goroutine,Goroutine 被唤醒后会获取信号量
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
return nil
}
...
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
select {
case <-ctx.Done():
err := ctx.Err()
select {
case <-ready:
err = nil
default:
s.waiters.Remove(elem)
}
return err
case <-ready:
return nil
}
}
golang/sync/semaphore.Weighted.TryAcquire
非阻塞地判断当前信号量是否有充足的资源,若有充足的资源会直接立刻返回 true
,否则会返回 false
:
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
释放
golang/sync/semaphore.Weighted.Release
从头到尾遍历 waiters
列表中全部的等待者,若释放资源后的信号量有充足的剩余资源就会通过 Channel 唤起指定的 Goroutine:
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
for {
next := s.waiters.Front()
if next == nil {
break
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
s.mu.Unlock()
}
小结
使用时注意:
golang/sync/semaphore.Weighted.Acquire
和golang/sync/semaphore.Weighted.TryAcquire
都可以用于获取资源,前者会阻塞地获取信号量,后者会非阻塞地获取信号量golang/sync/semaphore.Weighted.Release
方法会按照先进先出的顺序唤醒可以被唤醒的 Goroutine- 若一个 Goroutine 获取了较多地资源,由于
golang/sync/semaphore.Weighted.Release
的释放策略可能会等待比较长的时间
SingleFlight
golang/sync/singleflight.Group
能够在一个服务中抑制对下游的多次重复请求。
常见的使用场景是:我们在使用 Redis 对数据库中的数据进行缓存,发生缓存击穿时,大量的流量都会打到数据库上进而影响服务的尾延时。
golang/sync/singleflight.Group
能够限制对同一个键值对的多次重复请求,减少对下游的瞬时流量。
在资源的获取非常昂贵时(例如:访问缓存、数据库),就很适合使用。
例如:
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
结构体
type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
dups int
chans []chan<- Result
}
mu
:互斥锁m
:哈希表,call
保存当前调用的信息
golang/sync/singleflight.call
结构体中的 val
和 err
字段都只会在执行传入的函数时赋值一次并在 sync.WaitGroup.Wait
返回时被读取;dups
和 chans
两个字段分别存储了抑制的请求数量以及用于同步结果的 Channel。
接口
golang/sync/singleflight.Group.Do
: 同步等待的方法golang/sync/singleflight.Group.DoChan
: 返回 Channel 异步等待的方法
调用golang/sync/singleflight.Group.Do
时都会获取互斥锁,随后判断是否已经存在键对应的 golang/sync/singleflight.call
:
- 当不存在对应的
golang/sync/singleflight.call
时:- 初始化一个新的
golang/sync/singleflight.call
指针 - 增加
sync.WaitGroup
持有的计数器 - 将
golang/sync/singleflight.call
指针添加到映射表 - 释放持有的互斥锁
- 阻塞地调用
golang/sync/singleflight.Group.doCall
方法等待结果的返回
- 初始化一个新的
- 当存在对应的
golang/sync/singleflight.call
时:- 增加
dups
计数器,它表示当前重复的调用次数 - 释放持有的互斥锁
- 通过
sync.WaitGroup.Wait
等待请求的返回
- 增加
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
g.mu.Unlock()
}
doCall
的流程如下:
- 运行传入的函数
fn
,该函数的返回值会赋值给c.val
和c.err
; - 调用
sync.WaitGroup.Done
方法通知所有等待结果的 Goroutine — 当前函数已经执行完成,可以从call
结构体中取出返回值并返回了; - 获取持有的互斥锁并通过管道将信息同步给使用
golang/sync/singleflight.Group.DoChan
方法的 Goroutine
小结
当需要减少对下游的相同请求时,可以使用 golang/sync/singleflight.Group
来增加吞吐量和服务质量,使用时注意:
golang/sync/singleflight.Group.Do
和golang/sync/singleflight.Group.DoChan
一个用于同步阻塞调用传入的函数,一个用于异步调用传入的参数并通过 Channel 接收函数的返回值golang/sync/singleflight.Group.Forget
可以通知golang/sync/singleflight.Group
在持有的映射表中删除某个键,接下来对该键的调用就不会等待前面的函数返回了- 一旦调用的函数返回了错误,所有在等待的 Goroutine 也都会接收到同样的错误