3. sync
3.1 WaitGroup
WaitGroup
可以视作并发安全的计数器:
Add()
:添加计数次数Done()
:计数器减一Wait()
:阻塞直到计数器归零
func wg() {
hello := func(wg *sync.WaitGroup, id int) {
defer wg.Done()
fmt.Printf("Hello from %d\n", id)
}
numGreeters := 5
var wg sync.WaitGroup
wg.Add(numGreeters)
for i := 0; i < numGreeters; i++ {
go hello(&wg, i+1)
}
wg.Wait()
}
Hello from 5
Hello from 1
Hello from 2
Hello from 3
Hello from 4
wg.Add(numGreeters)
:设定并发计数defer wg.Done()
:保证并发结束时,计数器减一wg.Wait()
:goroutine 阻塞直到计数器归零
3.2 互斥锁和读写锁
Mutex 互斥锁
func mu() {
var count int
var lock sync.Mutex
increment := func() {
lock.Lock()
defer lock.Unlock()
count++
fmt.Printf("Incrementing: %d\n", count)
}
decrement := func() {
lock.Lock()
defer lock.Unlock()
count--
fmt.Printf("Decrementing: %d\n", count)
}
var arithmetic sync.WaitGroup
for i := 0; i < 5; i++ {
arithmetic.Add(1)
go func() {
defer arithmetic.Done()
increment()
}()
}
for i := 0; i < 5; i++ {
arithmetic.Add(1)
go func() {
defer arithmetic.Done()
decrement()
}()
}
arithmetic.Wait()
}
注意上述代码中使用defer
来解锁,这样能够保证即使后序代码出现了panic
也会将锁释放,以免导致死锁。
RWMutex 读写锁
使用读写锁可以更加细度的控制读写权限:
- 读锁和读锁之间不是互斥的,只要没有写锁,就可以获取读锁
- 写锁和写锁是互斥的
- 写锁和读锁是互斥的
读写锁适用于读多写少的场景。
例如:(备注:原书中的示例并不能体现RW锁的优势,我自己写了个)
func usingRWMu(count int) {
var wg sync.WaitGroup
wg.Add(count + 1)
var rw sync.RWMutex
go producer(&wg, &rw, count)
for i := 0; i < count; i++ {
go consumer(&wg, rw.RLocker())
}
wg.Wait()
}
func usingMu(count int) {
var wg sync.WaitGroup
wg.Add(count + 1)
var mu sync.Mutex
go producer(&wg, &mu, count)
for i := 0; i < count; i++ {
go consumer(&wg, &mu)
}
wg.Wait()
}
func producer(wg *sync.WaitGroup, lock sync.Locker, count int) {
defer wg.Done()
for i := 0; i < 5; i++ {
lock.Lock()
lock.Unlock()
}
}
func consumer(wg *sync.WaitGroup, lock sync.Locker) {
defer wg.Done()
defer lock.Unlock()
lock.Lock()
time.Sleep(1)
}
上述代码中:
producer
:生产者只有一个,且只写入五次数据consumer
:消费者会有多个,每次读取一次数据,读取时间为1纳秒
Benchmark:
func BenchmarkLock(b *testing.B) {
tests := []struct {
name string
f func(int)
}{
{name: "UsingMutex", f: usingMu},
{name: "UsingRWMutex", f: usingRWMu},
}
for k := 10; k <= 1000; k *= 10 {
for _, tt := range tests {
b.Run(fmt.Sprintf("%-15s_%.0e", tt.name, float64(k)), func(b *testing.B) {
for i := 0; i < b.N; i++ {
tt.f(k)
}
})
}
}
}
BenchmarkLock/UsingMutex______1e+01-12 8 150001075 ns/op 3139 B/op 29 allocs/op
BenchmarkLock/UsingRWMutex____1e+01-12 74 29271991 ns/op 1754 B/op 25 allocs/op
BenchmarkLock/UsingMutex______1e+02-12 1 1503635900 ns/op 43440 B/op 338 allocs/op
BenchmarkLock/UsingRWMutex____1e+02-12 75 29191513 ns/op 12575 B/op 209 allocs/op
BenchmarkLock/UsingMutex______1e+03-12 1 14825565800 ns/op 563904 B/op 3658 allocs/op
BenchmarkLock/UsingRWMutex____1e+03-12 68 27025094 ns/op 117468 B/op 2036 allocs/op
可以看出在读多写少的情境下,使用读写锁效率比互斥锁要高。
3.3 Cond
若想要goroutine在特定的条件下暂停运行,若使用一个无限循环for !condition{}
这样将会消耗一个CPU核心的所有周期,是非常低效的。
使用sync.Cond
可以高效的实现:
c := sync.NewCond(&sync.Mutex{})
c.L.Lock()
for !condition() {
c.Wait()
}
//... make use of condition ...
c.L.Unlock()
Cond.Singal()
和Cond.Broadcast()
当使用Cond.Wait
时,会将当前的goroutine加入FIFO列表中:
- 当调用
Cond.Signal
时会唤醒最先加入队列中的goroutine(也就是等待时间最长的) - 当调用
Cond.Broadcast()
时,会唤醒所有等待中的goroutine
func cond() {
var wg sync.WaitGroup
c := sync.NewCond(&sync.Mutex{})
wg.Add(4)
done := false
go reader("reader1", &wg, c, &done)
go reader("reader2", &wg, c, &done)
go reader("reader3", &wg, c, &done)
time.Sleep(3 * time.Second) // 确保消费者已经进入等待状态
go writer(&wg, c, &done)
wg.Wait()
}
func reader(name string, wg *sync.WaitGroup, c *sync.Cond, done *bool) {
defer wg.Done()
c.L.Lock()
for !*done {
fmt.Printf("%s is waiting\n", name)
c.Wait()
}
c.L.Unlock()
fmt.Printf("%s is reading\n", name)
}
func writer(wg *sync.WaitGroup, c *sync.Cond, done *bool) {
defer wg.Done()
c.L.Lock()
fmt.Println("start writing")
*done = true
fmt.Println("done")
c.L.Unlock()
fmt.Println("notify all")
c.Broadcast()
// 通知等待时间最长的那个
//fmt.Println("notify one")
//c.Signal()
}
// 调用 broadcast
reader3 is waiting
reader2 is waiting
reader1 is waiting
start writing
done
notify all
reader1 is reading
reader3 is reading
reader2 is reading
// 调用 signal
reader3 is waiting
reader1 is waiting
reader2 is waiting
start writing
done
notify one
reader3 is reading
fatal error: all goroutines are asleep - deadlock!
可以看出,Cond.Signal
唤醒了最先等待的reader3
,因为只唤醒了一个导致所有的goroutine均进入休眠,触发了死锁。
3.4 Once
sync.Once
的方法Once.Do()
能够保证函数只会被调用一次。
func once() {
var once1, once2 sync.Once
var count1, count2 int
incre1 := func() {
count1++
}
incre2 := func() {
count2++
}
decre := func() {
count2--
}
for i := 0; i < 5; i++ {
once1.Do(incre1)
}
fmt.Println(count1)
once2.Do(incre2)
once2.Do(decre)
fmt.Println(count2)
}
// output
1
1
once1.Do(incre1)
:在循环中执行了5次,但是incre1
只会被执行一次once2.Do
:分别传入不同的函数执行,只有incre2
执行了一次,后序的函数不会被执行
func onceDeadlock() {
var once1, once2 sync.Once
var f1, f2 func()
f1 = func() {
once1.Do(f2)
}
f2 = func() {
once2.Do(f1)
}
once1.Do(f1)
once2.Do(f2)
}
上述代码会引发死锁,因为执行f2
时,需要等待once1.Do(f2)
返回才可以,此时两者相互等待触发死锁。
3.5 Pool
Pool 模式是一种创建和提供可供使用的固定数量实例或Pool实例的方法。通常用于约束创建昂贵的场景(如数据库连接),以便创建固定数量的实例,但不确定数量的操作仍可请求访问这些场景。
Golang 中sync.Pool
可以用于实现自定义的对象池,:
Pool.New
:New
是Pool
的一个字段,类型为func() any
,用于创建新的对象Pool.Get() any
:Get 方法获取从 Pool 中获取一个对象,若没有则调用New
创建对象Pool.Put(any)
:将对象放回 Pool 中
使用要点:
- 使用
Pool.New
创建对象是线程安全的 - 使用
Pool.Get() any
获取对象时,不能对获取的对象状态做任何的假设(因为是不确定的) - 通常使用
defer pool.Put()
的形式以确保取出的对象被放回 Pool 中 - Pool 内的分布必须大致均匀
使用sync.Pool
有两个常见的场景:
- 创建对象比较昂贵,如内存占用大
- 创建对象耗时较长,提前创建对象缓存,可提高效率
例如:
func pool1() {
var numCreated int
pool := sync.Pool{
New: func() any {
numCreated++
a := make([]int, 1024)
return &a
},
}
var wg sync.WaitGroup
num := 1024 * 1024
wg.Add(num)
for i := 0; i < num; i++ {
go func() {
defer wg.Done()
a := pool.Get().(*[]int)
defer pool.Put(a)
// do something
}()
}
wg.Wait()
fmt.Println(numCreated, " slices created")
}
// ouput
25 slices created
由上述代码可以看出,若不使用 Pool ,将可能快速创建大量的切片,占用大量内存并使得GC产生大量负载。
3.6 Channel
Channel 按照是否有缓冲区可分为:
- 无缓冲 Channel;
make(chan Type), make(chan Type, 0)
- 有缓冲 Channel;
make(chan Type, size)
Channel 按照读写方向可分为:
- 双向Channel;
chan Type
- 只读Channel;
<-chan Type
- 只写Channel;
chan<- Type
从channel中读取数据有两种形式:
v := <-ch
;v, ok := <-ch
;ok
表示通道是否关闭,false
表示通道已关闭后读取的值
for-range
循环遍历channel
,会持续读取channel
中的数据(若没有则阻塞),直到channel
被关闭:
for range ch {}
for v := range ch {}
Channel的操作
下表给出在不同的channel的状态下不同的操作的结果:
操作 | Channel状态 | 结果 |
---|---|---|
Read | nil | 阻塞 |
打开 且 非空 | 输出值,(true) | |
打开 且 为空 | 阻塞 | |
关闭 | 默认零值,false | |
只写 | 编译错误 | |
Write | nil | 阻塞 |
打开 且 缓冲已满 | 阻塞 | |
打开 且 缓冲未满 | 写入值 | |
关闭 | panic | |
只读 | 编译错误 | |
Close | nil | panic |
打开 且 非空 | 关闭Channel;可继续读取,直到缓冲区为空,v 为默认零值,ok 为false | |
打开 且 为空 | 关闭Channel;可继续读取,v 为默认零值,ok 为false | |
关闭 | panic | |
只读 | 编译错误 |
3.7 select
select
语句结构:
select {
case ... : // ... case1
case ... : // ... case2
...
default:
// ... default
}
select
执行顺序:
- 若有
default
,则在无channel可用时直接执行default
- 若无
default
:- 在无channel可用时,阻塞当前goroutine
- 有多个chennel可用时,随机选择可用分支执行
3.8 GOMAXPROCS
通常GOMAXPROCS
和CPU的核心数相同。但在需要测试并发问题时,可以增大此值,使得出现竞争的概率增大,提前暴露问题。