5. Concurrency at Scale


5.1 Error Propagation

An error usually indicates that the system has entered a state in which it cannot fulfill a user's request. When that happens, the system needs to convey several key pieces of information:

  1. What happened — a description of the error, e.g. "disk full" or "connection reset"
  2. When and where it happened — the full call stack and the time at which the error occurred
  3. A user-friendly message — the message shown to the user should be customized for them
  4. How to get more information — the message shown to the user should say where the detailed error information can be found

Errors can be divided into two categories:

  1. Bugs: unknown errors that have not been defined in advance
  2. Known errors: errors that have been defined ahead of time

func mainFunc() {
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)
	err := runJob("1")
	if err != nil {
		msg := "There was a unexpected issue, please report as a bug" // 未知异常,BUG
		var intermediateErr intermediateErr
		if errors.As(err, &intermediateErr) {
			msg = err.Error() // known error
		}
		handleErr(1, err, msg)
	}
}

func handleErr(key int, err error, msg string) {
	log.SetPrefix(fmt.Sprintf("[logID: %d]", key))
	log.Printf("%#v", err)
	fmt.Printf("[%v] %v", key, msg)
}

type MyError struct {
	Inner      error // the original, wrapped error
	Message    string
	StackTrace string         // call stack at the time the error was wrapped
	Misc       map[string]any // additional context
}

func (err MyError) Error() string {
	return err.Message
}

func wrapError(err error, msg string) MyError {
	return MyError{
		Inner:      err,
		Message:    msg,
		StackTrace: string(debug.Stack()),
		Misc:       make(map[string]any),
	}
}

// intermediate-level module
type intermediateErr struct {
	error
}

func runJob(id string) error {
	jobPath := "bad/path"
	isExec, err := isGloballyExec(jobPath)

	if err != nil {
		return intermediateErr{wrapError(err, "bad/path not available")} // wrap the lower-level error
	} else if !isExec {
		return intermediateErr{wrapError(nil, "bad/path is not executable")}
	}

	return exec.Command(jobPath, "--id="+id).Run()
}

// low-level module
type lowLevelErr struct {
	error
}

func isGloballyExec(path string) (bool, error) {
	info, err := os.Stat(path)
	if err != nil {
		return false, lowLevelErr{wrapError(err, err.Error())} // wrap the raw system error
	}
	return info.Mode().Perm()&0100 == 0100, nil
}
// output
[logID: 1]13:15:12 ch05.intermediateErr{error:ch05.MyError{Inner:ch05.lowLevelErr{error:ch05.MyError{...
[1] bad/path not available

As the output shows, standard output only displays the error message and the log ID, while the log records the more detailed information.

5.2 Timeouts and Cancellation

Situations in which a system needs timeouts:

  1. System saturation — if the system is already saturated, excess requests should time out rather than wait indefinitely
  2. Stale data — data is usually only valid within a time window; if processing has outlived that window, the request should time out
  3. Preventing deadlock — adding timeouts to all concurrent operations guarantees that even if a deadlock occurs, the blocked operations are released once the timeout expires

Reasons a concurrent process may need to be cancelled (both timeouts and explicit cancellation are sketched in the example below):

  1. Timeout — a timeout is an implicit cancellation
  2. User intervention — the user may cancel an operation that has already started
  3. Replicated requests — the same request is sent to multiple handlers; once one of them returns, the others are cancelled
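
Neither form is shown in code in this section, so here is a minimal sketch (not from the source) built on the standard context package; slowOp and all of the durations are made up for illustration:

func slowOp(ctx context.Context) error {
	select {
	case <-time.After(3 * time.Second): // simulated slow work
		return nil
	case <-ctx.Done(): // timed out or cancelled
		return ctx.Err()
	}
}

func cancellation() {
	// 1. Timeout: an implicit cancellation after one second
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(slowOp(ctx)) // prints "context deadline exceeded"

	// 2. User intervention: cancel explicitly from another goroutine
	ctx2, cancel2 := context.WithCancel(context.Background())
	go func() {
		time.Sleep(500 * time.Millisecond)
		cancel2() // e.g. the user aborts the operation
	}()
	fmt.Println(slowOp(ctx2)) // prints "context canceled"
}

The replicated-request pattern in the third item is covered in full in section 5.4.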

5.3 Heartbeats

A heartbeat is a way for a concurrent process to signal to the outside world that it is alive and working.

Heartbeats let the outside world observe how the system is running and detect when something goes wrong. There are two common kinds:

  1. Periodic heartbeats: sent at a fixed time interval
  2. Heartbeats sent at the start of each unit of work

Periodic heartbeats

func heartbeat1() {
	doWork := func(done <-chan struct{}, pulseInterval time.Duration) (<-chan struct{}, <-chan time.Time) {
		heartbeat := make(chan struct{})
		resCh := make(chan time.Time)
		go func() {
			defer close(resCh)
			defer close(heartbeat)

			pulse := time.Tick(pulseInterval)
			workGen := time.Tick(2 * pulseInterval)

			sendPulse := func() {
				select {
				case heartbeat <- struct{}{}:
				default: // do not block if the heartbeat cannot be sent
				}
			}

			sendResult := func(r time.Time) {
				for {
					select {
					case <-done:
						return
					case <-pulse:
						sendPulse()
					case resCh <- r:
						return
					}
				}
			}

			for {
				select {
				case <-done:
					return
				case <-pulse:
					sendPulse()
				case r := <-workGen:
					sendResult(r)
				}
			}
		}()

		return heartbeat, resCh
	}

	done := make(chan struct{})
	time.AfterFunc(6*time.Second, func() {
		close(done)
	})

	timeout := 2 * time.Second
	heartbeat, resCh := doWork(done, timeout/2)

	for {
		select {
		case _, ok := <-heartbeat:
			if !ok {
				return
			}
			fmt.Println("pulse")
		case r, ok := <-resCh:
			if !ok {
				return
			}
			fmt.Println(r.Second())
		case <-time.After(timeout):
			return
		}

	}
}
// output
pulse
pulse
3
pulse
pulse
5
pulse
pulse

Notice that there are two heartbeats between every result.

Using periodic heartbeats to check that a goroutine is running

One use of periodic heartbeats is to determine whether a goroutine is running normally. Modify the doWork function above:

doWork := func(done <-chan struct{}, pulseInterval time.Duration) (<-chan struct{}, <-chan time.Time) {
	heartbeat := make(chan struct{})
	resCh := make(chan time.Time)
	go func() {
		// the channels are not closed this time
		// defer close(resCh)
		// defer close(heartbeat)
		...
		// the loop ends after two iterations
		for i := 0; i < 2; i++ {
			select {
			case <-done:
				return
			case <-pulse:
				sendPulse()
			case r := <-workGen:
				sendResult(r)
			}
		}
	}()
	...
}
...
for {
	select {
	case _, ok := <-heartbeat:
		if !ok {
			return
		}
		fmt.Println("pulse")
	case r, ok := <-resCh:
		if !ok {
			return
		}
		fmt.Println(r.Second())
	case <-time.After(timeout):
		// no heartbeat received within the timeout
		fmt.Println("goroutine is not healthy")
		return
	}
}
...
    
// output
pulse
pulse
goroutine is not healthy

The worker goroutine runs only two iterations of its loop and then exits without closing its channels; the monitoring goroutine can tell from the missing heartbeats that the worker is no longer healthy.

Heartbeats at the start of a unit of work

func heartbeat3() {
	doWork := func(done <-chan struct{}) (<-chan struct{}, <-chan int) {
		heartbeat := make(chan struct{}, 1) // buffer size of 1
		resCh := make(chan int)
		go func() {
			defer close(resCh)
			defer close(heartbeat)

			for i := 0; i < 5; i++ {
				select {
				case heartbeat <- struct{}{}: // heartbeat before the unit of work starts
				default:
				}

				select {
				case <-done:
					return
				case resCh <- rand.Intn(20):
				}
			}
		}()

		return heartbeat, resCh
	}

	done := make(chan struct{})
	defer close(done)

	hb, res := doWork(done)
	for {
		select {
		case _, ok := <-hb:
			if !ok {
				return
			}
			fmt.Println("pulse")
		case r, ok := <-res:
			if !ok {
				return
			}
			fmt.Println(r)
		}

	}
}
// output
pulse
11
pulse
17
pulse
15
pulse
5
pulse
3
  • heartbeat := make(chan struct{}, 1): the heartbeat channel has a buffer of 1, which guarantees that at least one heartbeat is sent even if nobody is receiving it in time

As the output shows, a heartbeat is sent before each result.

Testing with heartbeats

A concurrent job may experience some delay before it begins processing data, which introduces a lot of uncertainty into tests:

func doWork1(done <-chan struct{}, nums []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)

		// simulate a startup delay
		time.Sleep(2 * time.Second)

		for _, v := range nums {
			select {
			case <-done:
				return
			case out <- v:
			}
		}
	}()

	return out
}

The test function:

func Test_doWork1(t *testing.T) {
	done := make(chan struct{})
	defer close(done)

	nums := []int{1, 2, 3, 4}
	res := doWork1(done, nums)

	for _, v := range nums {
		select {
		case n, ok := <-res:
			if !ok {
				return
			}
			if n != v {
				t.Errorf("get: %v, want: %v", n, v)
			}
		case <-time.After(time.Second):
			t.Fatal("test time out")
		}
	}
}
// output
=== RUN   Test_doWork1
    heartbeats_test.go:37: test time out
--- FAIL: Test_doWork1 (1.00s)

The test includes a timeout to avoid blocking forever if something deadlocks.

Because the delay in this code is fixed, the test is guaranteed to time out. In a real program, however, the delay is unpredictable, so there is no accurate timeout value to choose, which makes the test unreliable.

A per-unit-of-work heartbeat removes the need for a timeout in the test:

func doWork2(done <-chan struct{}, nums []int) (<-chan struct{}, <-chan int) {
	out := make(chan int)
	hb := make(chan struct{}, 1)
	go func() {
		defer close(out)
		defer close(hb)

		// simulate a startup delay
		time.Sleep(2 * time.Second)

		for _, v := range nums {
			select {
			case hb <- struct{}{}:
			default:
			}

			select {
			case <-done:
				return
			case out <- v:
			}
		}
	}()

	return hb, out
}

The test waits for the first heartbeat, i.e. until the work has actually started, before making its assertions:

func Test_doWork2(t *testing.T) {
	done := make(chan struct{})
	defer close(done)

	nums := []int{1, 2, 3, 4}
	hb, res := doWork2(done, nums)

	<-hb

	for _, v := range nums {
		select {
		case n, ok := <-res:
			if !ok {
				return
			}
			if n != v {
				t.Errorf("get: %v, want: %v", n, v)
			}
		}
	}
}

The test above relies on the per-unit-of-work heartbeat. When the delay between units of work can itself be very large, a periodic heartbeat can be used instead, which also lets the test keep its timeout under control:

func doWork3(done <-chan struct{}, interval time.Duration, nums []int) (<-chan struct{}, <-chan int) {
	out := make(chan int)
	hb := make(chan struct{})

	go func() {
		defer close(out)
		defer close(hb)

		// simulate a startup delay
		time.Sleep(2 * time.Second)

		pulse := time.Tick(interval)

	numLoop:
		for _, v := range nums {
			for {
				select {
				case <-done:
					return
				case <-pulse:
					select {
					case hb <- struct{}{}:
					default: // the heartbeat may be skipped
					}
				case out <- v:
					continue numLoop // move on to the next value in the outer loop
				}
			}
		}
	}()

	return hb, out
}
func Test_doWork3(t *testing.T) {
	done := make(chan struct{})
	defer close(done)

	nums := []int{1, 2, 3, 4}
	timeout := 2 * time.Second
	hb, res := doWork3(done, timeout/2, nums)

	<-hb // wait until the processing loop has started

	for _, v := range nums {
		select {
		case n, ok := <-res:
			if !ok {
				return
			}
			if n != v {
				t.Errorf("get: %v, want: %v", n, v)
			}
		case <-hb:
		case <-time.After(timeout):
			t.Fatal("text time out")
		}
	}
}

5.4 Replicated Requests

When a request for work is received, multiple concurrent handlers can be started to process it; once the fastest one returns, the rest are cancelled.


func rc() {
	doWork := func(done <-chan struct{}, id int, wg *sync.WaitGroup, out chan<- int) {
		defer wg.Done()

		start := time.Now()
		// simulate a random processing delay
		procTime := time.Duration(rand.Intn(10)+1) * time.Second

		select {
		case <-done:
		case <-time.After(procTime):
		}

		select {
		case <-done:
		case out <- id:
		}

		t := time.Since(start)
		if t < procTime { // cancelled early: report the expected processing time instead
			t = procTime
		}

		fmt.Printf("%d took %v\n", id, t)
	}

	done := make(chan struct{})
	in := make(chan int)
	var wg sync.WaitGroup

	wg.Add(10)
	for i := 1; i <= 10; i++ {
		go doWork(done, i, &wg, in)
	}

	res := <-in
	close(done)
	wg.Wait()

	fmt.Println("Receive result from #", res)
}
// output
1 took 3s
9 took 3s
8 took 1.0049034s
4 took 2s
7 took 3s
5 took 6s
6 took 7s
3 took 2s
10 took 10s
2 took 6s
Receive result from # 8

5.5 Rate Limiting

Rate limiting restricts how many times a resource can be accessed within a period of time.

Benefits of rate limiting:

  1. Protects against malicious attacks such as DDoS
  2. Prevents the system from collapsing under a large burst of requests
  3. Allows different access rates to be assigned to different paid tiers of the system
  4. Caps the rate at which paid services are called, preventing unexpectedly large bills
  5. ...

Token bucket algorithm

Most rate limiters use the token bucket algorithm (a minimal sketch follows the list). The relevant concepts are:

  1. Token: a request needs a token to access the resource; requests without a token are rejected
  2. Bucket: where the tokens are stored
  3. Depth: the maximum number of tokens the bucket can hold
  4. Rate: the rate at which new tokens are added to the bucket
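
To make these terms concrete, here is a minimal hand-rolled sketch of a token bucket. It is only an illustration (the names tokenBucket and tryTake are made up, and this is not how the golang.org/x/time/rate package used below is implemented); it assumes the standard sync and time packages are imported:

type tokenBucket struct {
	mu     sync.Mutex
	tokens float64   // tokens currently in the bucket
	depth  float64   // maximum number of tokens the bucket can hold
	rate   float64   // tokens added per second
	last   time.Time // time of the last refill
}

func newTokenBucket(rate, depth float64) *tokenBucket {
	return &tokenBucket{tokens: depth, depth: depth, rate: rate, last: time.Now()}
}

// tryTake refills the bucket according to the elapsed time, then takes one
// token if available; it returns false when the request should be rejected.
func (b *tokenBucket) tryTake() bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	now := time.Now()
	b.tokens += now.Sub(b.last).Seconds() * b.rate
	if b.tokens > b.depth {
		b.tokens = b.depth // the bucket never holds more than depth tokens
	}
	b.last = now

	if b.tokens < 1 {
		return false // no token available: reject the request
	}
	b.tokens--
	return true
}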

Suppose the system exposes two APIs:

type APIConnection struct{}

func Open() *APIConnection {
	return &APIConnection{}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	// assume the call always succeeds
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	return nil
}

Start 20 goroutines that read the file and resolve the address 10 times each:


func requestWithoutLimit() {
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)

	conn := Open()
	var wg sync.WaitGroup
	ctx := context.Background()

	wg.Add(20)
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()

			if err := conn.ReadFile(ctx); err != nil {
				log.Println("Can not read file: ", err)
			}
			log.Println("Read file")
		}()
	}
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()

			if err := conn.ResolveAddress(ctx); err != nil {
				log.Println("Can not resolve address: ", err)
			}
			log.Println("Resolve address")
		}()
	}

	wg.Wait()
}
// output
15:58:21 Resolve address
15:58:21 Read file
15:58:21 Read file
15:58:21 Resolve address
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address

As the output shows, all of the requests completed at almost the same time.

golang.org/x/time/rate

The golang.org/x/time/rate package implements a token-bucket rate limiter, in which:

type Limit float64
func NewLimiter(r Limit, b int) *Limiter
  • Limit: the frequency of events, i.e. the number of events allowed per second
  • NewLimiter: takes the event rate r and the token bucket depth (burst size) b, and returns a *Limiter

The following helper builds a Limit from the interval between events:

func Every(interval time.Duration) Limit

Wait and WaitN block the request until the limiter allows it to proceed:

func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
  • Wait is shorthand for WaitN(ctx, 1)
  • WaitN: blocks until the limiter permits n events to happen; it returns an error when (a short usage sketch follows this list):
    1. n exceeds the limiter's burst size b
    2. the context's deadline would be exceeded before the wait completes
    3. the context is cancelled
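
As an illustration of Every and WaitN together, here is a small sketch; processBatch, the 200ms interval, and the burst of 10 are all invented for the example:

// 1 token every 200ms (5 events per second), with a burst of 10
var batchLimiter = rate.NewLimiter(rate.Every(200*time.Millisecond), 10)

func processBatch(ctx context.Context, batch []int) error {
	// Block until one token per item is available. WaitN returns an error if
	// len(batch) exceeds the burst size, or if ctx is cancelled or its
	// deadline passes while waiting.
	if err := batchLimiter.WaitN(ctx, len(batch)); err != nil {
		return err
	}
	// ... process the batch ...
	return nil
}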

Modify the earlier API to introduce the limiter:


type APIConnection struct {
	limiter *rate.Limiter
}

func Open() *APIConnection {
	return &APIConnection{
		limiter: rate.NewLimiter(rate.Limit(1), 1),
	}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	if err := a.limiter.Wait(ctx); err != nil {
		return err
	}

	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	if err := a.limiter.Wait(ctx); err != nil {
		return err
	}

	return nil
}

func req() {
	defer log.Println("Done.")

	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)
	ctx := context.Background()
	var wg sync.WaitGroup

	conn := Open()

	wg.Add(20)
	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			conn.ReadFile(ctx)
			log.Println("Read file")
		}()

		go func() {
			defer wg.Done()
			conn.ResolveAddress(ctx)
			log.Println("Resolve address")
		}()
	}

	wg.Wait()
}
// output 
16:27:50 Read file
16:27:51 Resolve address
16:27:52 Read file
16:27:53 Resolve address
16:27:54 Read file
16:27:55 Resolve address
16:27:56 Resolve address
16:27:57 Read file
16:27:58 Resolve address
16:27:59 Read file
16:28:00 Resolve address
16:28:01 Resolve address
16:28:02 Read file
16:28:03 Resolve address
16:28:04 Read file
16:28:05 Read file
16:28:06 Resolve address
16:28:07 Read file
16:28:08 Read file
16:28:09 Resolve address
16:28:09 Done.

As the output shows, the API can now only be called once per second.

Combining rate limiters

When rates need to be limited along several different dimensions, multiple limiters can be combined.

When combining limiters, the one with the longest wait time (i.e. the smallest rate) must come first in the list of limiters.

For example, combine two limits:

  1. Limiter A: 2 per second
  2. Limiter B: 10 per minute

Suppose a request takes a token from each limiter in turn, first from A and then from B. When B's tokens run out it takes 6 seconds to recover one, but during those 6 seconds A's tokens keep being replenished, so the program could keep running at A's rate and execute up to 12 requests in that window, exceeding B's limit; B's rate limit would then be ineffective.


type RateLimiter interface {
	Wait(context.Context) error
	Limit() rate.Limit
}

type MultiLimiter struct {
	limiters []RateLimiter
}

func NewMultiLimiter(lims ...RateLimiter) *MultiLimiter {
	// sort by limit in ascending order, i.e. by wait time in descending order
	sort.Slice(lims, func(i, j int) bool {
		return lims[i].Limit() < lims[j].Limit()
	})
	return &MultiLimiter{limiters: lims}
}

func (m *MultiLimiter) Wait(ctx context.Context) error {
	for i := range m.limiters {
		if err := m.limiters[i].Wait(ctx); err != nil {
			return err
		}
	}

	return nil
}

func (m *MultiLimiter) Limit() rate.Limit {
	return m.limiters[0].Limit()
}
  • NewMultiLimiter: sorts the limiters by rate in ascending order (by wait time in descending order)
  • Wait: takes a token from every limiter's bucket
  • Limit(): the overall rate is that of the most restrictive (smallest) limiter

func mullim() {
	defer log.Println("Done.")
	log.SetOutput(os.Stdout)
	log.SetFlags(log.LUTC | log.Ltime)

	conn := OpenConn()
	var wg sync.WaitGroup

	ctx := context.Background()
	wg.Add(20)
	for i := 0; i < 10; i++ {
		go func(id int) {
			defer wg.Done()
			if err := conn.ReadFile(ctx); err != nil {
				log.Println(err)
			}
			log.Printf("#%d Read file\n", id)
		}(i)

		go func(id int) {
			defer wg.Done()
			if err := conn.ResolveAddress(ctx); err != nil {
				log.Println(err)
			}
			log.Printf("#%d Resolve address\n", id)
		}(i)
	}

	wg.Wait()
}

type APIConn struct {
	limiter RateLimiter
}

func OpenConn() *APIConn {
	// 2 per sec
	secondLimiter := rate.NewLimiter(Per(2, time.Second), 2)
	// 10 per min
	minuteLimiter := rate.NewLimiter(Per(10, time.Minute), 10)

	return &APIConn{
		limiter: NewMultiLimiter(secondLimiter, minuteLimiter),
	}
}

func (a *APIConn) ReadFile(ctx context.Context) error {
	if err := a.limiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

func (a *APIConn) ResolveAddress(ctx context.Context) error {
	if err := a.limiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

func Per(n int, d time.Duration) rate.Limit {
	return rate.Every(d / time.Duration(n))
}
// output
17:44:47 #0 Read file
17:44:47 #3 Resolve address
17:44:48 #1 Read file
17:44:48 #0 Resolve address
17:44:49 #9 Resolve address
17:44:49 #1 Resolve address
17:44:50 #2 Resolve address
17:44:50 #3 Read file
17:44:51 #2 Read file
17:44:51 #6 Resolve address
17:44:53 #4 Read file
17:44:59 #5 Read file
17:45:05 #4 Resolve address
17:45:11 #5 Resolve address
17:45:17 #6 Read file
17:45:23 #8 Read file
17:45:29 #7 Read file
17:45:35 #8 Resolve address
17:45:41 #9 Read file
17:45:47 #7 Resolve address
17:45:47 Done.

Only two seconds pass between 44:51 and 44:53 because the minute-level limiter had already recovered a token during the earlier period.

The limiters above are combined along the time dimension; they can also be combined along the resource dimension:


type APIConn1 struct {
	diskLimiter,
	netLimiter,
	apiLimiter RateLimiter
}

func OpenAPIConn1() *APIConn1 {
	// api limiter
	// 2 per sec
	secondLimiter := rate.NewLimiter(Per(2, time.Second), 2)
	// 10 per min
	minuteLimiter := rate.NewLimiter(Per(10, time.Minute), 10)

	// disk limiter
	diskLimiter := rate.NewLimiter(rate.Limit(1), 1)
	// net limiter
	netLimiter := rate.NewLimiter(Per(3, time.Second), 3)

	return &APIConn1{
		diskLimiter: diskLimiter,
		netLimiter:  netLimiter,
		apiLimiter:  NewMultiLimiter(secondLimiter, minuteLimiter),
	}
}

func (a *APIConn1) ReadFile1(ctx context.Context) error {
	if err := NewMultiLimiter(a.apiLimiter, a.diskLimiter).Wait(ctx); err != nil {
		return err
	}
	return nil
}

func (a *APIConn1) ResolveAddress1(ctx context.Context) error {
	if err := NewMultiLimiter(a.apiLimiter, a.netLimiter).Wait(ctx); err != nil {
		return err
	}
	return nil
}

func mullim1() {
	defer log.Println("Done.")
	log.SetOutput(os.Stdout)
	log.SetFlags(log.LUTC | log.Ltime)

	conn := OpenAPIConn1()
	var wg sync.WaitGroup

	ctx := context.Background()
	wg.Add(20)
	for i := 0; i < 10; i++ {
		go func(id int) {
			defer wg.Done()
			if err := conn.ReadFile1(ctx); err != nil {
				log.Println(err)
			}
			log.Printf("#%d Read file\n", id)
		}(i)

		go func(id int) {
			defer wg.Done()
			if err := conn.ResolveAddress1(ctx); err != nil {
				log.Println(err)
			}
			log.Printf("#%d Resolve address\n", id)
		}(i)
	}

	wg.Wait()
}
// output
17:57:14 #0 Resolve address
17:57:14 #0 Read file
17:57:15 #2 Read file
17:57:15 #9 Resolve address
17:57:16 #4 Read file
17:57:16 #4 Resolve address
17:57:17 #5 Read file
17:57:17 #5 Resolve address
17:57:18 #6 Read file
17:57:18 #6 Resolve address
17:57:20 #7 Read file
17:57:26 #7 Resolve address
17:57:32 #8 Read file
17:57:38 #8 Resolve address
17:57:44 #3 Resolve address
17:57:50 #9 Read file
17:57:56 #1 Read file
17:58:02 #2 Resolve address
17:58:08 #1 Resolve address
17:58:14 #3 Read file
17:58:14 Done.

5.6 Healing Unhealthy Goroutines

Long-running systems often contain long-lived goroutines, and some of them may get stuck in an abnormal state from which they cannot recover. A monitoring mechanism is therefore needed to watch these goroutines and restart the unhealthy ones. This restart process is called healing.

type StartGoroutineFn func(done <-chan struct{}, pulseInterval time.Duration) <-chan struct{}

func NewSteward(timeout time.Duration, startGoroutine StartGoroutineFn) StartGoroutineFn {
	return func(done <-chan struct{}, pulseInterval time.Duration) <-chan struct{} {
		hb := make(chan struct{})
		go func() {
			defer close(hb)

			var wardDone chan struct{}
			var wardHb <-chan struct{}

			startWard := func() { // start (or restart) the ward goroutine
				wardDone = make(chan struct{})
				wardHb = startGoroutine(or(wardDone, done), timeout/2) // closing the steward also closes the ward
			}

			startWard() // start the ward

			pulse := time.Tick(pulseInterval)

		monitorLoop:
			for {
				timeoutSignal := time.After(timeout) // deadline for the next ward heartbeat

				for {
					select {
					case <-pulse: // the steward's own pulse
						select {
						case hb <- struct{}{}: // send the steward's heartbeat
						default:
						}
					case <-wardHb: // received the ward's heartbeat
						continue monitorLoop
					case <-timeoutSignal: // no heartbeat from the ward in time: restart it
						log.Println("steward: ward unhealthy; restarting")
						close(wardDone) // stop the current ward
						startWard()     // start a new one
						continue monitorLoop
					case <-done: // the steward itself has been stopped
						return
					}
				}
			}
		}()
		return hb
	}
}

NewSteward creates a new steward. It returns a StartGoroutineFn, which means the steward itself can be monitored in the same way (a sketch of nesting stewards follows the list below).

The function works as follows:

  1. Start the ward goroutine
  2. Loop:
    1. Send the steward's own heartbeat to the outside world
    2. Listen for the ward's heartbeat
    3. Restart the ward if its heartbeat times out
    4. Watch for the steward itself being cancelled
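
Because a steward both takes and returns a StartGoroutineFn, stewards can be stacked. The snippet below is a hypothetical sketch, not from the source; doWork stands for any function with the StartGoroutineFn signature, such as the one defined in the test that follows:

// The inner steward restarts doWork when doWork stops sending heartbeats;
// the outer steward restarts the inner steward in exactly the same way.
doWorkWithSteward := NewSteward(4*time.Second, doWork)
doWorkWithNestedSteward := NewSteward(8*time.Second, doWorkWithSteward)

done := make(chan struct{})
for range doWorkWithNestedSteward(done, time.Second) { // consume the outermost heartbeat, as in monitor() below
}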

Test:


func or(dones ...<-chan struct{}) <-chan struct{} {
	switch len(dones) {
	case 0:
		return nil
	case 1:
		return dones[0]
	}

	orDone := make(chan struct{})
	go func() {
		defer close(orDone)
		select {
		case <-dones[0]:
		case <-dones[1]:
		case <-or(append(dones[2:], orDone)...):
		}
	}()

	return orDone
}

func monitor() {
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)

	// the ward goroutine:
	// it does nothing and simply waits to be stopped
	doWork := func(done <-chan struct{}, _ time.Duration) <-chan struct{} {
		log.Println("ward: hello, I'm irresponsible...")
		go func() {
			<-done
			log.Println("ward: I'm halting")
		}()
		return nil
	}

	// the steward, with a 4-second timeout
	doWorkWithSteward := NewSteward(4*time.Second, doWork)

	// stop the steward and the ward after 9 seconds
	done := make(chan struct{})
	time.AfterFunc(9*time.Second, func() {
		log.Println("Halting steward and ward")
		close(done)
	})

	// consume the steward's heartbeat
	for range doWorkWithSteward(done, 4*time.Second) {
	}

	log.Println("Done")
}
  • doWork: creates a ward that does nothing and simply waits to be restarted
  • The steward and its ward are stopped after 9 seconds
  • The main goroutine ranges over the steward's heartbeat

Output:

18:36:59 ward: hello, I'm irresponsible...
18:37:03 steward: ward unhealthy; restarting
18:37:03 ward: hello, I'm irresponsible...
18:37:03 ward: I'm halting
18:37:07 steward: ward unhealthy; restarting
18:37:07 ward: hello, I'm irresponsible...
18:37:07 ward: I'm halting
18:37:08 Halting steward and ward
18:37:08 Done	

Reference

  1. Concurrency in Go (《Go 语言并发之道》)