3. sync

Kesa...大约 7 分钟golang

3.1 WaitGroup

WaitGroup可以视作并发安全的计数器:

  • Add():添加计数次数
  • Done():计数器减一
  • Wait():阻塞直到计数器归零
func wg() {
	hello := func(wg *sync.WaitGroup, id int) {
		defer wg.Done()
		fmt.Printf("Hello from %d\n", id)
	}

	numGreeters := 5
	var wg sync.WaitGroup
	wg.Add(numGreeters)
	for i := 0; i < numGreeters; i++ {
		go hello(&wg, i+1)
	}
	wg.Wait()
}
Hello from 5
Hello from 1
Hello from 2
Hello from 3
Hello from 4
  • wg.Add(numGreeters):设定并发计数
  • defer wg.Done():保证并发结束时,计数器减一
  • wg.Wait():goroutine 阻塞直到计数器归零

3.2 互斥锁和读写锁

Mutex 互斥锁


func mu() {
	var count int
	var lock sync.Mutex

	increment := func() {
		lock.Lock()
		defer lock.Unlock()
		count++
		fmt.Printf("Incrementing: %d\n", count)
	}

	decrement := func() {
		lock.Lock()
		defer lock.Unlock()
		count--
		fmt.Printf("Decrementing: %d\n", count)
	}

	var arithmetic sync.WaitGroup
	for i := 0; i < 5; i++ {
		arithmetic.Add(1)
		go func() {
			defer arithmetic.Done()
			increment()
		}()
	}

	for i := 0; i < 5; i++ {
		arithmetic.Add(1)
		go func() {
			defer arithmetic.Done()
			decrement()
		}()
	}

	arithmetic.Wait()
}

注意上述代码中使用defer来解锁,这样能够保证即使后序代码出现了panic也会将锁释放,以免导致死锁。

RWMutex 读写锁

使用读写锁可以更加细度的控制读写权限:

  1. 读锁和读锁之间不是互斥的,只要没有写锁,就可以获取读锁
  2. 写锁和写锁是互斥的
  3. 写锁和读锁是互斥的

读写锁适用于读多写少的场景。

例如:(备注:原书中的示例并不能体现RW锁的优势,我自己写了个)

func usingRWMu(count int) {
	var wg sync.WaitGroup
	wg.Add(count + 1)

	var rw sync.RWMutex
	go producer(&wg, &rw, count)
	for i := 0; i < count; i++ {
		go consumer(&wg, rw.RLocker())
	}

	wg.Wait()
}

func usingMu(count int) {
	var wg sync.WaitGroup
	wg.Add(count + 1)

	var mu sync.Mutex
	go producer(&wg, &mu, count)
	for i := 0; i < count; i++ {
		go consumer(&wg, &mu)
	}

	wg.Wait()
}

func producer(wg *sync.WaitGroup, lock sync.Locker, count int) {
	defer wg.Done()
	for i := 0; i < 5; i++ {
		lock.Lock()
		lock.Unlock()
	}
}

func consumer(wg *sync.WaitGroup, lock sync.Locker) {
	defer wg.Done()
	defer lock.Unlock()
	lock.Lock()
	time.Sleep(1)
}

上述代码中:

  • producer:生产者只有一个,且只写入五次数据
  • consumer:消费者会有多个,每次读取一次数据,读取时间为1纳秒

Benchmark:


func BenchmarkLock(b *testing.B) {
	tests := []struct {
		name string
		f    func(int)
	}{
		{name: "UsingMutex", f: usingMu},
		{name: "UsingRWMutex", f: usingRWMu},
	}

	for k := 10; k <= 1000; k *= 10 {
		for _, tt := range tests {
			b.Run(fmt.Sprintf("%-15s_%.0e", tt.name, float64(k)), func(b *testing.B) {
				for i := 0; i < b.N; i++ {
					tt.f(k)
				}
			})
		}
	}
}
BenchmarkLock/UsingMutex______1e+01-12                 8         150001075 ns/op            3139 B/op         29 allocs/op
BenchmarkLock/UsingRWMutex____1e+01-12                74          29271991 ns/op            1754 B/op         25 allocs/op
BenchmarkLock/UsingMutex______1e+02-12                 1        1503635900 ns/op           43440 B/op        338 allocs/op
BenchmarkLock/UsingRWMutex____1e+02-12                75          29191513 ns/op           12575 B/op        209 allocs/op
BenchmarkLock/UsingMutex______1e+03-12                 1        14825565800 ns/op         563904 B/op       3658 allocs/op
BenchmarkLock/UsingRWMutex____1e+03-12                68          27025094 ns/op          117468 B/op       2036 allocs/op

可以看出在读多写少的情境下,使用读写锁效率比互斥锁要高。

3.3 Cond

若想要goroutine在特定的条件下暂停运行,若使用一个无限循环for !condition{} 这样将会消耗一个CPU核心的所有周期,是非常低效的。

使用sync.Cond可以高效的实现:

c := sync.NewCond(&sync.Mutex{})
c.L.Lock()
for !condition() {
	c.Wait()
}
//... make use of condition ...
c.L.Unlock()

Cond.Singal()Cond.Broadcast()

当使用Cond.Wait时,会将当前的goroutine加入FIFO列表中:

  1. 当调用Cond.Signal时会唤醒最先加入队列中的goroutine(也就是等待时间最长的)
  2. 当调用Cond.Broadcast()时,会唤醒所有等待中的goroutine

func cond() {
	var wg sync.WaitGroup
	c := sync.NewCond(&sync.Mutex{})

	wg.Add(4)
	done := false
	go reader("reader1", &wg, c, &done)
	go reader("reader2", &wg, c, &done)
	go reader("reader3", &wg, c, &done)

	time.Sleep(3 * time.Second) // 确保消费者已经进入等待状态

	go writer(&wg, c, &done)
	wg.Wait()
}

func reader(name string, wg *sync.WaitGroup, c *sync.Cond, done *bool) {
	defer wg.Done()
	c.L.Lock()
	for !*done {
		fmt.Printf("%s is waiting\n", name)
		c.Wait()
	}
	c.L.Unlock()
	fmt.Printf("%s is reading\n", name)
}

func writer(wg *sync.WaitGroup, c *sync.Cond, done *bool) {
	defer wg.Done()

	c.L.Lock()
	fmt.Println("start writing")
	*done = true
	fmt.Println("done")
	c.L.Unlock()

	fmt.Println("notify all")
	c.Broadcast()

	// 通知等待时间最长的那个
	//fmt.Println("notify one")
	//c.Signal()
}
// 调用 broadcast
reader3 is waiting
reader2 is waiting
reader1 is waiting
start writing
done
notify all
reader1 is reading
reader3 is reading
reader2 is reading

// 调用 signal
reader3 is waiting
reader1 is waiting
reader2 is waiting
start writing
done
notify one
reader3 is reading
fatal error: all goroutines are asleep - deadlock!

可以看出,Cond.Signal唤醒了最先等待的reader3,因为只唤醒了一个导致所有的goroutine均进入休眠,触发了死锁。

3.4 Once

sync.Once的方法Once.Do()能够保证函数只会调用一次

func once() {
	var once1, once2 sync.Once
	var count1, count2 int

	incre1 := func() {
		count1++
	}

	incre2 := func() {
		count2++
	}

	decre := func() {
		count2--
	}

	for i := 0; i < 5; i++ {
		once1.Do(incre1)
	}
	fmt.Println(count1)

	once2.Do(incre2)
	once2.Do(decre)

	fmt.Println(count2)
}

// output
1
1
  • once1.Do(incre1):在循环中执行了5次,但是incre1只会被执行一次
  • once2.Do:分别传入不同的函数执行,只有incre2执行了一次,后序的函数不会被执行
func onceDeadlock() {
	var once1, once2 sync.Once

	var f1, f2 func()

	f1 = func() {
		once1.Do(f2)
	}

	f2 = func() {
		once2.Do(f1)
	}

	once1.Do(f1)
	once2.Do(f2)
}

上述代码会引发死锁,因为执行f2时,需要等待once1.Do(f2)返回才可以,此时两者相互等待触发死锁。

3.5 Pool

Pool 模式是一种创建和提供可供使用的固定数量实例或Pool实例的方法。通常用于约束创建昂贵的场景(如数据库连接),以便创建固定数量的实例,但不确定数量的操作仍可请求访问这些场景。

Golang 中sync.Pool可以用于实现自定义的对象池,:

  • Pool.NewNewPool的一个字段,类型为func() any,用于创建新的对象

  • Pool.Get() any:Get 方法获取从 Pool 中获取一个对象,若没有则调用New创建对象

  • Pool.Put(any):将对象放回 Pool 中

使用要点:

  1. 使用Pool.New创建对象是线程安全的
  2. 使用Pool.Get() any获取对象时,不能对获取的对象状态做任何的假设(因为是不确定的)
  3. 通常使用defer pool.Put()的形式以确保取出的对象被放回 Pool 中
  4. Pool 内的分布必须大致均匀

使用sync.Pool有两个常见的场景:

  1. 创建对象比较昂贵,如内存占用大
  2. 创建对象耗时较长,提前创建对象缓存,可提高效率

例如:


func pool1() {
	var numCreated int
	pool := sync.Pool{
		New: func() any {
			numCreated++
			a := make([]int, 1024)
			return &a
		},
	}

	var wg sync.WaitGroup
	num := 1024 * 1024
	wg.Add(num)

	for i := 0; i < num; i++ {
		go func() {
			defer wg.Done()
			a := pool.Get().(*[]int)
			defer pool.Put(a)

			// do something
		}()
	}

	wg.Wait()

	fmt.Println(numCreated, " slices created")
}

// ouput
25  slices created

由上述代码可以看出,若不使用 Pool ,将可能快速创建大量的切片,占用大量内存并使得GC产生大量负载。

3.6 Channel

Channel 按照是否有缓冲区可分为:

  1. 无缓冲 Channel;make(chan Type), make(chan Type, 0)
  2. 有缓冲 Channel;make(chan Type, size)

Channel 按照读写方向可分为:

  1. 双向Channel;chan Type
  2. 只读Channel;<-chan Type
  3. 只写Channel;chan<- Type

从channel中读取数据有两种形式:

  1. v := <-ch
  2. v, ok := <-chok表示通道是否关闭,false表示通道已关闭后读取的值

for-range循环遍历channel,会持续读取channel中的数据(若没有则阻塞),直到channel被关闭:

  1. for range ch {}
  2. for v := range ch {}

Channel的操作

下表给出在不同的channel的状态下不同的操作的结果:

操作Channel状态结果
Readnil阻塞
打开 且 非空输出值,(true)
打开 且 为空阻塞
关闭默认零值,false
只写编译错误
Writenil阻塞
打开 且 缓冲已满阻塞
打开 且 缓冲未满写入值
关闭panic
只读编译错误
Closenilpanic
打开 且 非空关闭Channel;可继续读取,直到缓冲区为空,v 为默认零值,ok为false
打开 且 为空关闭Channel;可继续读取,v 为默认零值,ok为false
关闭panic
只读编译错误

3.7 select

select语句结构:

select {
case ... : // ... case1
case ... : // ... case2
...
default:
    // ... default
}

select执行顺序:

  1. 若有default,则在无channel可用时直接执行default
  2. 若无default
    1. 在无channel可用时,阻塞当前goroutine
    2. 有多个chennel可用时,随机选择可用分支执行

3.8 GOMAXPROCS

通常GOMAXPROCS和CPU的核心数相同。但在需要测试并发问题时,可以增大此值,使得出现竞争的概率增大,提前暴露问题。

Reference

  1. Go 语言并发之道open in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2