4. 并发模式
4.1 for-select
for-select
模式常见的有两种:
- 向channel中发送迭代变量
- 循环等待停止
向channel中发送迭代变量
将迭代的内容持续的发送到通道中。
strs := []string{"a", "b", "c"}
for _, str := range strs {
select {
case <-done:
return
case ch <- str:
// ...
}
}
循环等待停止
创建无限循环,直到触发停止条件。
for {
select {
case <-done:
return
case ... //
...
default:
// 其余的工作
}
}
// 或者
for {
select {
case <-done:
return
...
default:
}
// 其余的工作
}
4.2 防止 goroutine 泄露 (done-channel 模式)
一个 goroutine 有以下几种方式终止:
- 工作已完成
- 触发panic终止工作
- 收到其他的goroutine的通知,停止工作
当一个goroutine处于阻塞状态且无法被唤醒或者没有被通知何时停止时,其将一直保持在内存中,过多的类似情况将会导致内存泄露。
goroutine 持续处于阻塞态
例如,下面的代码中的gorutine将一直处于阻塞态:
func fs1() {
doWork := func(strch <-chan string) <-chan bool {
completed := make(chan bool)
go func() {
defer close(completed)
for str := range strch {
fmt.Println(str)
}
}()
return completed
}
completed := doWork(nil)
<-completed
}
上述代码向doWork
传入了一个nil
通道,新的goroutine在读取时将永远进入阻塞状态。
避免这种情况,需要向goroutine传入一个通道以通知其何时结束:
func fs2() {
doWork := func(strch <-chan string, done <-chan bool) <-chan bool {
completed := make(chan bool)
go func() {
defer close(completed)
fmt.Println("goroutine starting ...")
for {
select {
case <-done:
return
case str := <-strch:
fmt.Println(str)
}
}
}()
return completed
}
done := make(chan bool)
completed := doWork(nil, done)
go func() {
time.Sleep(time.Second)
fmt.Println("Canceling work of goroutine...")
close(done)
}()
<-completed
}
// output
goroutine starting ...
Canceling work of goroutine...
上述代码中,父协程的另一个子协程在等待一秒后关闭通道,此时正在工作(被阻塞)的子协程收到通知停止工作。
goroutine 持续工作
下例中,协程会持续工作下去无法被停止:
func fs3() {
randGen := func() <-chan int {
out := make(chan int)
go func() {
defer close(out)
defer fmt.Println("Closing output channel...")
for {
out <- rand.Int()
}
}()
return out
}
in := randGen()
for i := 0; i < 5; i++ {
if n, ok := <-in; ok {
fmt.Println("Receiving: ", n)
}
}
fmt.Println("Receiving done...")
}
// output
Receiving: 476485380518016684
Receiving: 6023384071323969085
Receiving: 2320634544446194988
Receiving: 7599105886652685216
Receiving: 6600473839265802854
Receiving done...
上述代码中,父协程在读取了5个数字后结束,但是子协程依然在无限循环中工作下去。
可以向子协程传入通道以通知其停止。
func fs4() {
randGen := func(done <-chan bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-done:
return
case out <- rand.Int():
}
}
}()
return out
}
done := make(chan bool)
in := randGen(done)
for i := 0; i < 5; i++ {
if n, ok := <-in; ok {
fmt.Println("Receiving: ", n)
}
}
close(done)
}
// output
Receiving: 5765501585996642873
Receiving: 1097901739950203962
Receiving: 5923671937547588944
Receiving: 2825262626940893067
Receiving: 2469149525637622718
Receiving done...
Closing output channel...
可以看到,子协程的输出通道被关闭了,子协程的工作也结束了。
为保证goroutine不会泄露,可以遵守约定:如果一个goroutine负责创建goroutine,那么其应当负责确保创建的子goroutine可以被停止
4.3 or-channel
当存在多个done-channel
模式时,若需要其中任意一个关闭时,关闭整体的done-channle
,可以采用or-channel
模式。
func or(chans ...<-chan bool) <-chan bool {
switch len(chans) { // 递归出口
case 0:
return nil
case 1:
return chans[0]
}
orDone := make(chan bool)
go func() {
defer close(orDone)
select {
case <-chans[0]:
case <-chans[1]:
case <-or(append(chans[2:], orDone)...): // 递归的创建 done-chan 树
}
}()
return orDone
}
func orDone() {
sig := func(after time.Duration) <-chan bool {
done := make(chan bool)
go func() {
defer close(done)
time.Sleep(after)
}()
return done
}
start := time.Now()
<-or(
sig(time.Hour),
sig(time.Minute),
sig(2*time.Hour),
sig(5*time.Minute),
sig(time.Second),
)
fmt.Println("Done after: ", time.Since(start))
}
// output
Done after: 1.0027397s
or
:递归的创建了一个done-channel
树形结构,只要其中有一个done-channel
被关闭,就会关闭其父节点的done-channel
直到最初的根节点done-channel
被关闭sig
:创建了一个会定时关闭的chan
由输出可以看出,在1s
之后,其中的一个chan
被关闭,导致整体的or-chan
被关闭了。
4.4 错误处理
当 goroutine 中出现错误时应该将其和执行结果结合并返回给父goroutine进行处理。
下例中,goroutine 将出现的错误进行简单的处理,而父 goroutine 对出现的错误一无所知。
func eh1() {
checkStatus := func(urls []string, done <-chan bool) <-chan *http.Response {
respch := make(chan *http.Response)
go func() {
defer close(respch)
for _, url := range urls {
resp, err := http.Get(url)
if err != nil {
fmt.Println(err)
continue
}
select {
case <-done:
return
case respch <- resp:
}
}
}()
return respch
}
done := make(chan bool)
defer close(done)
urls := []string{"https://google.com", "https://badhost"}
for resp := range checkStatus(urls, done) {
fmt.Println("Response: ", resp.Status)
}
}
将返回结果和错误耦合在一起之后返回给父 goroutine, 此时可以使用标准的error
处理流程进行处理,而且父 goroutine 能够对子协程的执行情况有更多的控制。
type Result struct {
err error
resp *http.Response
}
func eh2() {
checkStatus := func(urls []string, done <-chan bool) <-chan *Result {
resCh := make(chan *Result)
go func() {
defer close(resCh)
for _, url := range urls {
resp, err := http.Get(url)
select {
case <-done:
return
case resCh <- &Result{err, resp}:
}
}
}()
return resCh
}
done := make(chan bool)
defer close(done)
urls := []string{"https://google.com", "https://badhost"}
for res := range checkStatus(urls, done) {
if res.err != nil {
fmt.Println("Err: ", res.err)
} else {
fmt.Println("Status: ", res.resp.Status)
}
}
}
// output
Status: 200 OK
Err: Get "https://badhost": EOF
4.5 Pipeline
Pipeline 是指将数据视作流进行处理,数据会从一个 stage 传输到另一个 stage。(备注:这里书中说的 stage 可以理解为流水线工作的一个步骤/流程)
批处理
func batchProcess() {
multiply := func(vals []int, multiplier int) []int {
muls := make([]int, len(vals))
for i, v := range vals {
muls[i] = v * multiplier
}
return muls
}
add := func(vals []int, additive int) []int {
adds := make([]int, len(vals))
for i, v := range vals {
adds[i] = v + additive
}
return adds
}
ints := []int{1, 2, 3, 4}
for _, v := range add(multiply(ints, 2), 1) {
fmt.Printf("%d, ", v)
}
}
上述的例子中,每个 stage 处理一个切片内的所有数据,并返回一个切片的,这种操作形式被称为批处理。
可以看出,批处理每个 stage 都要维持两倍的切片内存占用。
流处理
若将上述的例子改成:
func streamProcess() {
multiply := func(val int, mul int) int {
return val * mul
}
add := func(val int, additive int) int {
return val + additive
}
ints := []int{1, 2, 3, 4}
for _, v := range ints {
fmt.Print(add(multiply(v, 2), 1), ", ")
}
}
这次每次函数只处理一个数据,被称作流处理。
上述的处理流程每放在了for
循环之中进行顺序处理,可将每个 stage 交个一个 goroutine 来处理可以极大的提高效率。
使用 channel
func streamProcWithChan() {
intGen := func(done <-chan bool, vals ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, v := range vals {
select {
case <-done:
return
case out <- v:
}
}
}()
return out
}
multiply := func(done <-chan bool, in <-chan int, mul int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
select {
case <-done:
return
case out <- v * mul:
}
}
}()
return out
}
add := func(done <-chan bool, in <-chan int, additive int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
select {
case <-done:
return
case out <- v + additive:
}
}
}()
return out
}
done := make(chan bool)
defer close(done)
for v := range add(done, multiply(done, intGen(done, 1, 2, 3, 4), 2), 1) {
fmt.Println(v)
}
}
defer close(done)
:创建了一个通道,并在结束时关闭;因为其他的子协程均使用了done-channel
模式,那么在父协程关闭done
的时候,所有的子协程都将停止工作- 整个处理流程是流水线式的:
intGen --> multiply --> add --> output
,因为不同的协程之间使用channel
通信所以是并发安全的
生成器
repeat
创建一个 repeat 生成器,用于重复的生产(生成)传入的值。
func repeat(done <-chan bool, vals ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
for _, v := range vals {
select {
case <-done:
return
case out <- v:
}
}
}
}()
return out
}
repeat
会不断的将输入的值重复传入channel
中,直到被通知停止为止。
func gen1() {
done := make(chan bool)
defer close(done)
for v := range take(done, repeat(done, 1), 3) {
fmt.Println(v)
}
}
func take(done <-chan bool, in <-chan int, num int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < num; i++ {
select {
case <-done:
return
case out <- <-in:
}
}
}()
return out
}
// output
1
1
1
take
:从输入的通道中取出指定数目的元素,并放入输出的通道中
repeat function
若将repeat
生成器的参数改为函数类型,那么就可以实现重复调用函数的功能。
func repeatFn(done <-chan bool, fn func() int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-done:
return
case out <- fn():
}
}
}()
return out
}
func gen2() {
genRand := func() int {
return rand.Intn(100)
}
done := make(chan bool)
defer close(done)
for v := range take(done, repeatFn(done, genRand), 5) {
fmt.Println(v)
}
}
// output
91
85
99
47
74
4.6 Fan-in Fan-out 扇入 扇出
- 扇入(fan-in): 将多个channel的结果组合到一个channel中
- 扇出(fan-out):启用多个channel处理pipeline的输入
使用扇入扇出模式需要满足两个条件:
- 每次计算不依赖之前的 stage 的计算结果; 因为并发的过程中,stage 的运行顺序是完全未知的
- 运行需要很长时间; 若pipeline的某一个 stage 运行时间非常长,会导致整个pipeline被阻塞
下面的例子中,从任意多个随机数中寻找10个素数:
func findPrime() {
randIntn := func() int {
return rand.Intn(5000000000)
}
done := make(chan bool)
defer close(done)
in := repeatFn(done, randIntn)
start := time.Now()
for prime := range take(done, primeFinder(done, in), 10) {
fmt.Println(prime)
}
fmt.Println("Processing time: ", time.Since(start))
}
func primeFinder(done <-chan bool, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
select {
case <-done:
return
default:
}
if isPrime(v) {
select {
case <-done:
return
case out <- v:
}
}
}
}()
return out
}
func isPrime(n int) bool {
if n < 2 {
return false
}
r := int(math.Sqrt(float64(n)))
for i := 2; i <= r; i++ {
if n%i == 0 {
return false
}
}
return true
}
// output
4273728947
173486111
1703994883
3005863829
1167579209
116591549
2948090351
874494619
362146943
2720562833
Processing time: 4.293ms
因为每次寻找素数的操作,不依赖其他的 stage,那么可以针对寻找素数的 stage 改进为扇出:
- 获取CPU核心数
- 调用多次
primeFinder(done, in)
,使用多个 goroutine 来寻找素数
在将多个 goroutine 的输出流合并为一个,即扇入。然后从合并的输出流中读取结果。
func findPrimesUsingFaninout() {
randIntn := func() int {
return rand.Intn(5000000000)
}
done := make(chan bool)
defer close(done)
in := repeatFn(done, randIntn)
// fan out
numFinders := runtime.NumCPU()
finders := make([]<-chan int, numFinders)
for i := range finders {
finders[i] = primeFinder(done, in)
}
start := time.Now()
// fan in
for v := range take(done, fanIn(done, finders...), 10) {
fmt.Println(v)
}
fmt.Println("Processing time: ", time.Since(start))
}
func fanIn(done <-chan bool, chs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
multiplex := func(c <-chan int) {
defer wg.Done()
for v := range c {
select {
case <-done:
return
case out <- v:
}
}
}
// get value from every channel
// and put into out
wg.Add(len(chs))
for _, c := range chs {
go multiplex(c)
}
// waiting for reading operations
// done, then close out channel
go func() {
wg.Wait()
close(out)
}()
return out
}
// output
85825093
115703509
2927984149
4584998707
3213011077
1945956029
429187747
195600227
4355002273
3875919389
Processing time: 1.0415ms
在扇入函数fanIn
中:
- 启用了多个 goroutine,将多个输入流中的结果放入同一个输出流中
- 启用了单独的 goroutine,在合并结束后关闭输出流
从输出结果上看,使用扇入扇出之后,执行时间减少了约75%。
4.7 or-done-channel
当从 channel 中读取时,若需要判断当前操作是否被取消,可能需要如下代码:
loop:
for {
select {
case <-done:
break loop
case v, ok := <-myChan:
if !ok {
// 退出或者退出循环
}
// 对获取的 v 进行操作
}
}
若逻辑比较复杂时,特别是有多个嵌套的循环时,代码的可读性会下降。
可以使用 goroutine 来封装这样的操作:
orDone := func(done <-chan bool, c <-chan any) <-chan any {
out := make(chan any)
go func() {
defer close(out)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case out <- v:
case <-done:
}
}
}
}()
return out
}
// 使用正常的循环模式
for v := range orDone(done, myChan) {
// ...
}
orDone
封装了详细的处理过程之后,就能够用一般的循环模式,提高了代码的可读性。
4.8 tee-channel
tee-channel
模式可以将一个 channel 中的数据发送给两个或多个不同的 channel 中。
func teeCh() {
done := make(chan bool)
defer close(done)
randGen := func() int {
return rand.Intn(20)
}
out1, out2 := tee(done, take(done, repeatFn(done, randGen), 5))
for v := range out1 {
fmt.Printf("out1: %d, out2: %d\n", v, <-out2)
}
}
func orDoneCh(done <-chan bool, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-done:
return
case v, ok := <-in:
if !ok {
return
}
select {
case <-done:
case out <- v:
}
}
}
}()
return out
}
func tee(done <-chan bool, in <-chan int) (<-chan int, <-chan int) {
out1, out2 := make(chan int), make(chan int)
go func() {
defer close(out1)
defer close(out2)
for v := range orDoneCh(done, in) {
out1A, out2A := out1, out2 // 使用临时变量
for i := 0; i < 2; i++ {
select {
case <-done:
return
case out1A <- v:
out1A = nil // 阻塞写入,以便另一个继续
case out2A <- v:
out2A = nil // 阻塞写入,以便另一个继续
}
}
}
}()
return out1, out2
}
// output
out1: 7, out2: 7
out1: 4, out2: 4
out1: 12, out2: 12
out1: 16, out2: 16
out1: 16, out2: 16
在函数tee
中:
out1A, out2A
:使用了两个临时变量out1A = nil
,out2A = nil
:每次成功写入后,将临时变量置空,以便下一次能够写入另一个通道
4.9 桥接 channel
若需要从通道的通道中读取一系列的值,使用桥接模式可以使得调用者只需关注通道中的值,无需处理每个通道。
func bridge(done <-chan bool, chIn <-chan <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
// 从通道的通道中获取通道
var c <-chan int
select {
case <-done:
return
case vc, ok := <-chIn:
if !ok {
return
}
c = vc
}
// 遍历通道,获取值
for v := range c {
select {
case <-done:
return
case out <- v:
}
}
}
}()
return out
}
var c <-chan int
:临时变量,获取通道中的通道case out <- v:
:将每个通道的值写入同一通道
func bc() {
genCh := func() <-chan <-chan int {
out := make(chan (<-chan int))
go func() {
defer close(out)
for i := 0; i < 5; i++ {
ch := make(chan int, 1)
ch <- i
close(ch)
out <- ch
}
}()
return out
}
done := make(chan bool)
defer close(done)
for v := range bridge(done, genCh()) {
fmt.Print(v, " ")
}
}
// output
0 1 2 3 4
4.10 队列
在 pipeline 中引入队列以提高系统性能,其适用于以下情况:
- 一个 stage 中批处理可以节省时间
- 一个 stage 产生的延迟会在系统中产生反馈回环; 例如,将数据缓存到内存中,比直接发送到硬盘要快很多;若直接发送到硬盘,当某个 stage 出现阻塞时,整个系统都会阻塞
使用队列优化,可以使用利特尔法则:
- L:系统平均负载数
- λ:负载平均到达率
- W: 负载在系统中花费的平均时间
4.11 context
context.Context
接口定义了一组方法:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key any) any
}
Deadline()
:当为该 context 工作的 work 被取消时,返回设定的 deadline;若没有设定 deadline,ok将为 falseDone()
:当为该 context 工作的 work 被取消时,返回已关闭的 channel;若未被取消,则可能返回 nilErr()
:返回当前工作被取消的原因,未被取消则返回 nilValue(key any)
:返回和此 context 关联的 key 的 value,若没有则返回 nil
contex
中有两个函数用于生成空的context.Context
:
func Background() Context
func TODO() Context
Background()
:Background returns a non-nil, empty Context. It is never canceled, has no values, and has no deadline. It is typically used by the main function, initialization, and tests, and as the top-level Context for incoming requests.TODO
:TODO returns a non-nil, empty Context. Code should use context.TODO when it's unclear which Context to use or it is not yet available (because the surrounding function has not yet been extended to accept a Context parameter).
以下函数用于生成带有条件的context.Context
:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
WithCancel
:需要主动调用CancelFunc
才能取消WithDeadline
:当时间到达设置的deadline
时自动取消WithTimeout
:当经过的时间达到timeout
时,自动取消
done-channel
模式 和 context
包
下面的示例先使用done-channel
模式:
var (
LocaleCanceled = errors.New("locale canceled")
LocaleUnsupported = errors.New("locale unsupported")
)
func doneChannelPattern() {
done := make(chan struct{})
defer close(done)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
err := printGreeting(done)
if err != nil {
fmt.Println(err)
}
}()
go func() {
defer wg.Done()
err := printFarewell(done)
if err != nil {
fmt.Println(err)
}
}()
wg.Wait()
}
func printFarewell(done <-chan struct{}) error {
farewell, err := genFarewell(done)
if err != nil {
return err
}
fmt.Println(farewell, " world")
return nil
}
func printGreeting(done <-chan struct{}) error {
greeting, err := genGreeting(done)
if err != nil {
return err
}
fmt.Println(greeting, " world")
return nil
}
func genFarewell(done <-chan struct{}) (string, error) {
switch loc, err := locale(done); {
case err != nil:
return "", err
case loc == "en/us":
return "goodbye", nil
default:
return "", LocaleUnsupported
}
}
func genGreeting(done <-chan struct{}) (string, error) {
switch loc, err := locale(done); {
case err != nil:
return "", err
case loc == "en/us":
return "hello", nil
default:
return "", LocaleUnsupported
}
}
func locale(done <-chan struct{}) (string, error) {
select {
case <-done:
return "", LocaleCanceled
case <-time.After(3 * time.Second):
}
return "en/us", nil
}
上述代码中,locale
函数需要3秒之后才会执行。因为并发函数执行的顺序无法预知,打招呼函数可能在说再见之后执行。
若需要新增两个要求:
- 若
genGreeting
函数不想等待locale
函数太长时间,在等待1s之后就主动取消执行 - 若
genGreeting
取消,genFarewell
也会被取消
此时可以使用context
包方便的实现:
func usingCtx() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
if err := printGreetingWithCtx(ctx); err != nil {
fmt.Println(err)
cancel() // 取消所有的 goroutine 工作
}
}()
go func() {
defer wg.Done()
if err := printFarewellWithCtx(ctx); err != nil {
fmt.Println(err)
}
}()
wg.Wait()
}
func printGreetingWithCtx(ctx context.Context) error {
greeting, err := genGreetingWithCtx(ctx)
if err != nil {
return err
}
fmt.Println(greeting, " world")
return nil
}
func printFarewellWithCtx(ctx context.Context) error {
farewell, err := genFarewellWithCtx(ctx)
if err != nil {
return err
}
fmt.Println(farewell, " world")
return nil
}
func genFarewellWithCtx(ctx context.Context) (string, error) {
switch loc, err := localeWithCtx(ctx); {
case err != nil:
return "", err
case loc != "en/us":
return "", LocaleUnsupported
default:
return "goodbye", nil
}
}
func genGreetingWithCtx(ctx context.Context) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
switch loc, err := localeWithCtx(ctx); {
case err != nil:
return "", err
case loc != "en/us":
return "", LocaleUnsupported
default:
return "hello", nil
}
}
func localeWithCtx(ctx context.Context) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(3 * time.Second):
}
return "en/us", nil
}
// output
context deadline exceeded
context canceled
// test time
--- PASS: Test_usingCtx (1.01s)
由输出可以看出
context deadline exceeded
: 当1s时间到的时候,ctx
被关闭,localeWithCtx
函数停止工作并返回context canceled
:当printGreetingWithCtx
被取消时,主动取消了ctx
,导致genFarewell
调用链上的localeWithCtx
被停止
数据存储和获取
context.Context
支持存储key-val
键值对:
WithValue(parent Context, key, val any) Context
:可以设置键值对,并返回新的Context
Context.Value(key any) any
:则可以根据传入的key
返回value
不是所有的数据都适合放入Context
中,应该遵循以下建议:
- 数据应该通过进程和API边界
- 数据应该是不可变的
- 数据应该趋向于简单类型
- 数据应该是数据,而不是类型或方法
- 数据应该用于修饰操作,而不是驱动操作