5. Concurrency at Scale
5.1 Error Propagation
An error usually means the system has entered a state in which it cannot satisfy the operation a user has requested. At that point the system needs to convey several key pieces of information:
- What happened: a description of the error, e.g. "disk full" or "connection reset"
- When and where it happened: the full call stack and the time at which the error occurred
- A user-friendly message: a message customized for the end user
- How to get more information: the message shown to the user should say where the detailed error information can be found
Errors fall into two categories:
- Bugs: unknown errors, with no error type defined in advance
- Known errors: errors that have been defined in advance
func mainFunc() {
    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)
    err := runJob("1")
    if err != nil {
        msg := "There was an unexpected issue, please report as a bug" // unknown error: a bug
        var intermediateErr intermediateErr
        if errors.As(err, &intermediateErr) {
            msg = err.Error() // known error
        }
        handleErr(1, err, msg)
    }
}
func handleErr(key int, err error, msg string) {
    log.SetPrefix(fmt.Sprintf("[logID: %d]", key))
    log.Printf("%#v", err)
    fmt.Printf("[%v] %v", key, msg)
}
type MyError struct {
    Inner      error          // the original error
    Message    string
    StackTrace string         // call stack at the time the error was wrapped
    Misc       map[string]any // additional context
}
func (err MyError) Error() string {
    return err.Message
}
func wrapError(err error, msg string) MyError {
    return MyError{
        Inner:      err,
        Message:    msg,
        StackTrace: string(debug.Stack()),
        Misc:       make(map[string]any),
    }
}
// Intermediate-level component
type intermediateErr struct {
    error
}
func runJob(id string) error {
    jobPath := "bad/path"
    isExec, err := isGloballyExec(jobPath)
    if err != nil {
        return intermediateErr{wrapError(err, "bad/path not available")} // wrap the low-level error
    } else if !isExec {
        return intermediateErr{wrapError(nil, "bad/path is not executable")}
    }
    return exec.Command(jobPath, "--id="+id).Run()
}
// Low-level component
type lowLevelErr struct {
    error
}
func isGloballyExec(path string) (bool, error) {
    info, err := os.Stat(path)
    if err != nil {
        return false, lowLevelErr{wrapError(err, err.Error())} // wrap the OS error
    }
    return info.Mode().Perm()&0100 == 0100, nil
}
// output
[logID: 1]13:15:12 ch05.intermediateErr{error:ch05.MyError{Inner:ch05.lowLevelErr{error:ch05.MyError{...
[1] bad/path not available
As the output shows, standard output only carries the error message and the log ID, while the log records the more detailed information.
5.2 Timeouts and Cancellation
Situations in which a system needs timeouts (a minimal sketch using the context package follows this list):
- System saturation: if the system is already saturated, requests beyond its capacity should return a timeout instead of waiting indefinitely
- Stale data: data usually has a window of validity; if the data to be processed has already expired, the request should time out
- Preventing deadlock: adding timeouts to all concurrent operations guarantees that even if a deadlock occurs, the blocked operations are released once the timeout expires
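This section has no accompanying code, so here is a minimal sketch of enforcing a timeout with the standard context package. The slowOperation helper and the durations are illustrative assumptions, not part of the original text; only the context, errors, fmt, and time packages are used.
// slowOperation is a hypothetical task that takes longer than the caller is willing to wait.
func slowOperation(ctx context.Context) error {
    select {
    case <-time.After(3 * time.Second): // simulated work
        return nil
    case <-ctx.Done(): // the deadline expired or the caller cancelled
        return ctx.Err()
    }
}
func timeoutDemo() {
    // Allow the operation at most one second.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    if err := slowOperation(ctx); errors.Is(err, context.DeadlineExceeded) {
        fmt.Println("request timed out:", err)
    }
}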
Reasons a concurrent process may need to be cancelled (see the sketch after this list):
- Timeout: a timeout is an implicit cancellation
- User intervention: a user of a concurrent program may cancel an operation that has already started
- Replicated requests: the same request is handed to several handlers, and once one of them returns, the rest are cancelled
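A minimal sketch of explicit cancellation under the same assumptions as above; the worker function is hypothetical, and cancel() stands in for the user pressing "cancel":
// worker is a hypothetical long-running task that checks for cancellation between items.
func worker(ctx context.Context, results chan<- int) {
    for i := 0; ; i++ {
        select {
        case <-ctx.Done(): // cancelled by the caller
            fmt.Println("worker: stopping:", ctx.Err())
            return
        case results <- i:
            time.Sleep(100 * time.Millisecond) // simulated work per item
        }
    }
}
func cancelDemo() {
    ctx, cancel := context.WithCancel(context.Background())
    results := make(chan int)
    go worker(ctx, results)
    // Consume a few results, then cancel, simulating user intervention.
    for i := 0; i < 3; i++ {
        fmt.Println("got", <-results)
    }
    cancel()
    time.Sleep(200 * time.Millisecond) // give the worker time to observe the cancellation
}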
5.3 Heartbeats
A heartbeat is a way for a concurrent process to signal to the outside world that it is alive.
Heartbeats let the outside world observe how the system is running and detect when something has gone wrong. There are usually two kinds:
- Periodic heartbeats: sent at a fixed interval
- Heartbeats sent at the start of each unit of work
Periodic heartbeats
func heartbeat1() {
    doWork := func(done <-chan struct{}, pulseInterval time.Duration) (<-chan struct{}, <-chan time.Time) {
        heartbeat := make(chan struct{})
        resCh := make(chan time.Time)
        go func() {
            defer close(resCh)
            defer close(heartbeat)
            pulse := time.Tick(pulseInterval)
            workGen := time.Tick(2 * pulseInterval)
            sendPulse := func() {
                select {
                case heartbeat <- struct{}{}:
                default: // avoid blocking when nobody is listening for the pulse
                }
            }
            sendResult := func(r time.Time) {
                for {
                    select {
                    case <-done:
                        return
                    case <-pulse:
                        sendPulse()
                    case resCh <- r:
                        return
                    }
                }
            }
            for {
                select {
                case <-done:
                    return
                case <-pulse:
                    sendPulse()
                case r := <-workGen:
                    sendResult(r)
                }
            }
        }()
        return heartbeat, resCh
    }
    done := make(chan struct{})
    time.AfterFunc(6*time.Second, func() {
        close(done)
    })
    timeout := 2 * time.Second
    heartbeat, resCh := doWork(done, timeout/2)
    for {
        select {
        case _, ok := <-heartbeat:
            if !ok {
                return
            }
            fmt.Println("pulse")
        case r, ok := <-resCh:
            if !ok {
                return
            }
            fmt.Println(r.Second())
        case <-time.After(timeout):
            return
        }
    }
}
// output
pulse
pulse
3
pulse
pulse
5
pulse
pulse
Notice that there are two pulses between each pair of results.
Periodic heartbeats: checking that a goroutine is running
One use of periodic heartbeats is to determine whether a goroutine is still working properly. Modify the doWork function above:
doWork := func(done <-chan struct{}, pulseInterval time.Duration) (<-chan struct{}, <-chan time.Time) {
    heartbeat := make(chan struct{})
    resCh := make(chan time.Time)
    go func() {
        // Do not close the channels, simulating a goroutine that silently stops.
        // defer close(resCh)
        // defer close(heartbeat)
        ...
        // Only run two iterations of the work loop, then exit.
        for i := 0; i < 2; i++ {
            select {
            case <-done:
                return
            case <-pulse:
                sendPulse()
            case r := <-workGen:
                sendResult(r)
            }
        }
    }()
    ...
}
...
for {
    select {
    case _, ok := <-heartbeat:
        if !ok {
            return
        }
        fmt.Println("pulse")
    case r, ok := <-resCh:
        if !ok {
            return
        }
        fmt.Println(r.Second())
    case <-time.After(timeout):
        // No heartbeat received for too long.
        fmt.Println("goroutine is not healthy")
        return
    }
}
...
// output
pulse
pulse
goroutine is not healthy
The worker goroutine exits after only two iterations of its send loop, so the monitoring goroutine can tell from the missing heartbeat that it is no longer doing its work.
Heartbeats at the start of a unit of work
func heartbeat3() {
    doWork := func(done <-chan struct{}) (<-chan struct{}, <-chan int) {
        heartbeat := make(chan struct{}, 1) // buffer size of 1
        resCh := make(chan int)
        go func() {
            defer close(resCh)
            defer close(heartbeat)
            for i := 0; i < 5; i++ {
                select {
                case heartbeat <- struct{}{}: // pulse sent before each unit of work
                default:
                }
                select {
                case <-done:
                    return
                case resCh <- rand.Intn(20):
                }
            }
        }()
        return heartbeat, resCh
    }
    done := make(chan struct{})
    defer close(done)
    hb, res := doWork(done)
    for {
        select {
        case _, ok := <-hb:
            if !ok {
                return
            }
            fmt.Println("pulse")
        case r, ok := <-res:
            if !ok {
                return
            }
            fmt.Println(r)
        }
    }
}
// output
pulse
11
pulse
17
pulse
15
pulse
5
pulse
3
heartbeat := make(chan struct{}, 1): the heartbeat channel has a buffer size of 1, which guarantees that at least one pulse is sent even if nobody happens to be receiving at that moment.
As the output shows, a pulse is sent before each result.
Using heartbeats in tests
A concurrent job may be subject to some delay before it starts processing data, which makes tests highly nondeterministic:
func doWork1(done <-chan struct{}, nums []int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        // Simulate a startup delay.
        time.Sleep(2 * time.Second)
        for _, v := range nums {
            select {
            case <-done:
                return
            case out <- v:
            }
        }
    }()
    return out
}
The test function:
func Test_doWork1(t *testing.T) {
    done := make(chan struct{})
    defer close(done)
    nums := []int{1, 2, 3, 4}
    res := doWork1(done, nums)
    for _, v := range nums {
        select {
        case n, ok := <-res:
            if !ok {
                return
            }
            if n != v {
                t.Errorf("get: %v, want: %v", n, v)
            }
        case <-time.After(time.Second):
            t.Fatal("test time out")
        }
    }
}
// output
=== RUN Test_doWork1
heartbeats_test.go:37: test time out
--- FAIL: Test_doWork1 (1.00s)
The test includes a timeout to avoid deadlocking.
Because the delay in the code is fixed, this test is guaranteed to time out. In a real program, however, the delay before work starts cannot be predicted, so no accurate timeout can be chosen and the test becomes unreliable.
A work heartbeat lets the test avoid the timeout altogether:
func doWork2(done <-chan struct{}, nums []int) (<-chan struct{}, <-chan int) {
    out := make(chan int)
    hb := make(chan struct{}, 1)
    go func() {
        defer close(out)
        defer close(hb)
        // Simulate a startup delay.
        time.Sleep(2 * time.Second)
        for _, v := range nums {
            select {
            case hb <- struct{}{}:
            default:
            }
            select {
            case <-done:
                return
            case out <- v:
            }
        }
    }()
    return hb, out
}
In the test, checking only begins after the first work heartbeat has been observed:
func Test_doWork2(t *testing.T) {
    done := make(chan struct{})
    defer close(done)
    nums := []int{1, 2, 3, 4}
    hb, res := doWork2(done, nums)
    <-hb
    for _, v := range nums {
        select {
        case n, ok := <-res:
            if !ok {
                return
            }
            if n != v {
                t.Errorf("get: %v, want: %v", n, v)
            }
        }
    }
}
Testing with a work heartbeat still blocks until the first unit of work begins. When that delay may be very long, a periodic heartbeat can be used instead, which also allows the test to keep a bounded timeout:
func doWork3(done <-chan struct{}, interval time.Duration, nums []int) (<-chan struct{}, <-chan int) {
    out := make(chan int)
    hb := make(chan struct{})
    go func() {
        defer close(out)
        defer close(hb)
        // Simulate a startup delay.
        time.Sleep(2 * time.Second)
        pulse := time.Tick(interval)
    numLoop:
        for _, v := range nums {
            for {
                select {
                case <-done:
                    return
                case <-pulse:
                    select {
                    case hb <- struct{}{}:
                    default: // a missed pulse can be skipped
                    }
                case out <- v:
                    continue numLoop // move on to the next number
                }
            }
        }
    }()
    return hb, out
}
func Test_doWork3(t *testing.T) {
    done := make(chan struct{})
    defer close(done)
    nums := []int{1, 2, 3, 4}
    timeout := 2 * time.Second
    hb, res := doWork3(done, timeout/2, nums)
    <-hb // wait until the processing loop has started
    for _, v := range nums {
        select {
        case n, ok := <-res:
            if !ok {
                return
            }
            if n != v {
                t.Errorf("get: %v, want: %v", n, v)
            }
        case <-hb:
        case <-time.After(timeout):
            t.Fatal("test time out")
        }
    }
}
5.4 Replicated Requests
When a request arrives, it can be handed to several concurrent handlers at once; as soon as the fastest one returns, the others are cancelled.
func rc() {
    doWork := func(done <-chan struct{}, id int, wg *sync.WaitGroup, out chan<- int) {
        defer wg.Done()
        start := time.Now()
        // Simulate a random processing delay.
        procTime := time.Duration(rand.Intn(10)+1) * time.Second
        select {
        case <-done:
        case <-time.After(procTime):
        }
        select {
        case <-done:
        case out <- id:
        }
        t := time.Since(start)
        if t < procTime { // if cancelled early, report the expected processing time
            t = procTime
        }
        fmt.Printf("%d took %v\n", id, t)
    }
    done := make(chan struct{})
    in := make(chan int)
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 1; i <= 10; i++ {
        go doWork(done, i, &wg, in)
    }
    res := <-in
    close(done)
    wg.Wait()
    fmt.Println("Receive result from #", res)
}
// output
1 took 3s
9 took 3s
8 took 1.0049034s
4 took 2s
7 took 3s
5 took 6s
6 took 7s
3 took 2s
10 took 10s
2 took 6s
Receive result from # 8
5.5 Rate Limiting
Rate limiting restricts how many times a resource may be accessed within a given period of time.
Benefits of rate limiting:
- Protects against malicious attacks such as DDoS
- Prevents the system from being overwhelmed by a sudden burst of requests
- Allows different access rates to be assigned to different paid tiers of a service
- Caps the rate of paid services, protecting users from unexpectedly large bills
- ...
Token bucket algorithm
Most rate limiting is implemented with the token bucket algorithm. Its key concepts (a hand-rolled sketch follows the list):
- Token: accessing the resource requires a token; requests without a token are rejected
- Bucket: holds the tokens
- Depth: the maximum number of tokens the bucket can hold
- Rate: the rate at which new tokens are added to the bucket
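To make the algorithm concrete, here is a minimal, non-thread-safe sketch of a token bucket. The TokenBucket type and its fields are illustrative assumptions, not part of the original text (the examples below use golang.org/x/time/rate instead); only the math and time packages are needed.
// TokenBucket is an illustrative token bucket that refills lazily on each call to Allow.
type TokenBucket struct {
    depth    float64   // maximum number of tokens the bucket can hold
    rate     float64   // tokens added per second
    tokens   float64   // tokens currently in the bucket
    lastFill time.Time // last time the bucket was refilled
}
func NewTokenBucket(depth int, ratePerSec float64) *TokenBucket {
    return &TokenBucket{
        depth:    float64(depth),
        rate:     ratePerSec,
        tokens:   float64(depth), // start with a full bucket
        lastFill: time.Now(),
    }
}
// Allow reports whether a request may proceed, consuming one token if so.
func (b *TokenBucket) Allow() bool {
    now := time.Now()
    // Add the tokens earned since the last call, capped at the bucket depth.
    b.tokens = math.Min(b.depth, b.tokens+now.Sub(b.lastFill).Seconds()*b.rate)
    b.lastFill = now
    if b.tokens < 1 {
        return false // no token available: reject the request
    }
    b.tokens--
    return true
}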
Suppose the system exposes two API calls:
type APIConnection struct{}
func Open() *APIConnection {
    return &APIConnection{}
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
    // Pretend the call always succeeds.
    return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    return nil
}
Start 20 goroutines that read a file and resolve an address 10 times each:
func requestWithoutLimit() {
    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)
    conn := Open()
    var wg sync.WaitGroup
    ctx := context.Background()
    wg.Add(20)
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            if err := conn.ReadFile(ctx); err != nil {
                log.Println("Can not read file: ", err)
            }
            log.Println("Read file")
        }()
    }
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            if err := conn.ResolveAddress(ctx); err != nil {
                log.Println("Can not resolve address: ", err)
            }
            log.Println("Resolve address")
        }()
    }
    wg.Wait()
}
// output
15:58:21 Resolve address
15:58:21 Read file
15:58:21 Read file
15:58:21 Resolve address
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Read file
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
15:58:21 Resolve address
As the output shows, all the requests complete at almost the same time.
golang.org/x/time/rate implements a token bucket rate limiter. Its core pieces:
type Limit float64
func NewLimiter(r Limit, b int) *Limiter
- Limit: the maximum frequency at which events may occur
- NewLimiter: takes the event rate r and the bucket depth b, and returns a limiter *Limiter
The following helper builds a rate from a time interval:
func Every(interval time.Duration) Limit
Wait and WaitN block a request:
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
- Wait: shorthand for WaitN(ctx, 1)
- WaitN: blocks until the limiter permits n events to occur; it returns an error when:
  - n exceeds the limiter's burst size (the bucket depth)
  - the context's deadline expires
  - the context is cancelled
A short sketch of Wait failing under a tight deadline follows.
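For illustration only, a minimal sketch (the function name and durations are assumptions) of Wait returning an error when the context deadline is shorter than the time needed to obtain a token:
func waitWithDeadline() {
    // One token per second, bucket depth 1; the first call drains the bucket.
    lim := rate.NewLimiter(rate.Limit(1), 1)
    _ = lim.Wait(context.Background()) // consumes the only token immediately
    // The next token arrives in about one second, but we only allow 100ms.
    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    if err := lim.Wait(ctx); err != nil {
        fmt.Println("rate limited:", err) // the deadline expires before a token becomes available
    }
}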
Modify the earlier API to introduce a limiter:
type APIConnection struct {
    limiter *rate.Limiter
}
func Open() *APIConnection {
    return &APIConnection{
        limiter: rate.NewLimiter(rate.Limit(1), 1),
    }
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
    if err := a.limiter.Wait(ctx); err != nil {
        return err
    }
    return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    if err := a.limiter.Wait(ctx); err != nil {
        return err
    }
    return nil
}
func req() {
    defer log.Println("Done.")
    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)
    ctx := context.Background()
    var wg sync.WaitGroup
    conn := Open()
    wg.Add(20)
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            conn.ReadFile(ctx)
            log.Println("Read file")
        }()
        go func() {
            defer wg.Done()
            conn.ResolveAddress(ctx)
            log.Println("Resolve address")
        }()
    }
    wg.Wait()
}
// output
16:27:50 Read file
16:27:51 Resolve address
16:27:52 Read file
16:27:53 Resolve address
16:27:54 Read file
16:27:55 Resolve address
16:27:56 Resolve address
16:27:57 Read file
16:27:58 Resolve address
16:27:59 Read file
16:28:00 Resolve address
16:28:01 Resolve address
16:28:02 Read file
16:28:03 Resolve address
16:28:04 Read file
16:28:05 Read file
16:28:06 Resolve address
16:28:07 Read file
16:28:08 Read file
16:28:09 Resolve address
16:28:09 Done.
As the output shows, the API can now only be called once per second.
Combining rate limiters
When a rate needs to be limited along more than one dimension, multiple limiters can be combined.
When combining limiters, the one with the longest wait time (the smallest rate) should be placed at the front of the list of limiters.
For example, combine two limits:
- Limit A: 2 per second
- Limit B: 10 per minute
The tokens of the different limiters should be consumed together: first A's token, then B's. Once B's bucket is empty it takes 6 seconds to regain a token, but during those 6 seconds A's tokens keep being replenished; if B were ignored, the program could keep running at A's rate and perform up to 12 calls in those 6 seconds, exceeding B's limit and effectively defeating it.
type RateLimiter interface {
    Wait(context.Context) error
    Limit() rate.Limit
}
type MultiLimiter struct {
    limiters []RateLimiter
}
func NewMultiLimiter(lims ...RateLimiter) *MultiLimiter {
    // Sort by limit in ascending order, i.e. by wait time in descending order.
    sort.Slice(lims, func(i, j int) bool {
        return lims[i].Limit() < lims[j].Limit()
    })
    return &MultiLimiter{limiters: lims}
}
func (m *MultiLimiter) Wait(ctx context.Context) error {
    for i := range m.limiters {
        if err := m.limiters[i].Wait(ctx); err != nil {
            return err
        }
    }
    return nil
}
func (m *MultiLimiter) Limit() rate.Limit {
    return m.limiters[0].Limit()
}
- NewMultiLimiter: sorts the limiters by rate in ascending order (by wait time in descending order)
- Wait: takes a token from every limiter's bucket
- Limit(): the overall rate is the smallest of the individual limits
func mullim() {
    defer log.Println("Done.")
    log.SetOutput(os.Stdout)
    log.SetFlags(log.LUTC | log.Ltime)
    conn := OpenConn()
    var wg sync.WaitGroup
    ctx := context.Background()
    wg.Add(20)
    for i := 0; i < 10; i++ {
        go func(id int) {
            defer wg.Done()
            if err := conn.ReadFile(ctx); err != nil {
                log.Println(err)
            }
            log.Printf("#%d Read file\n", id)
        }(i)
        go func(id int) {
            defer wg.Done()
            if err := conn.ResolveAddress(ctx); err != nil {
                log.Println(err)
            }
            log.Printf("#%d Resolve address\n", id)
        }(i)
    }
    wg.Wait()
}
type APIConn struct {
    limiter RateLimiter
}
func OpenConn() *APIConn {
    // 2 per sec
    secondLimiter := rate.NewLimiter(Per(2, time.Second), 2)
    // 10 per min
    minuteLimiter := rate.NewLimiter(Per(10, time.Minute), 10)
    return &APIConn{
        limiter: NewMultiLimiter(secondLimiter, minuteLimiter),
    }
}
func (a *APIConn) ReadFile(ctx context.Context) error {
    if err := a.limiter.Wait(ctx); err != nil {
        return err
    }
    return nil
}
func (a *APIConn) ResolveAddress(ctx context.Context) error {
    if err := a.limiter.Wait(ctx); err != nil {
        return err
    }
    return nil
}
func Per(n int, d time.Duration) rate.Limit {
    return rate.Every(d / time.Duration(n))
}
// output
17:44:47 #0 Read file
17:44:47 #3 Resolve address
17:44:48 #1 Read file
17:44:48 #0 Resolve address
17:44:49 #9 Resolve address
17:44:49 #1 Resolve address
17:44:50 #2 Resolve address
17:44:50 #3 Read file
17:44:51 #2 Read file
17:44:51 #6 Resolve address
17:44:53 #4 Read file
17:44:59 #5 Read file
17:45:05 #4 Resolve address
17:45:11 #5 Resolve address
17:45:17 #6 Read file
17:45:23 #8 Read file
17:45:29 #7 Read file
17:45:35 #8 Resolve address
17:45:41 #9 Read file
17:45:47 #7 Resolve address
17:45:47 Done.
Only two seconds pass between 44:51 and 44:53 because the per-minute limiter had already regained a token during the earlier interval.
The limiters above are combined along the time dimension; they can also be combined along the resource dimension:
type APIConn1 struct {
    diskLimiter,
    netLimiter,
    apiLimiter RateLimiter
}
func OpenAPIConn1() *APIConn1 {
    // api limiter
    // 2 per sec
    secondLimiter := rate.NewLimiter(Per(2, time.Second), 2)
    // 10 per min
    minuteLimiter := rate.NewLimiter(Per(10, time.Minute), 10)
    // disk limiter
    diskLimiter := rate.NewLimiter(rate.Limit(1), 1)
    // net limiter
    netLimiter := rate.NewLimiter(Per(3, time.Second), 3)
    return &APIConn1{
        diskLimiter: diskLimiter,
        netLimiter:  netLimiter,
        apiLimiter:  NewMultiLimiter(secondLimiter, minuteLimiter),
    }
}
func (a *APIConn1) ReadFile1(ctx context.Context) error {
    if err := NewMultiLimiter(a.apiLimiter, a.diskLimiter).Wait(ctx); err != nil {
        return err
    }
    return nil
}
func (a *APIConn1) ResolveAddress1(ctx context.Context) error {
    if err := NewMultiLimiter(a.apiLimiter, a.netLimiter).Wait(ctx); err != nil {
        return err
    }
    return nil
}
func mullim1() {
    defer log.Println("Done.")
    log.SetOutput(os.Stdout)
    log.SetFlags(log.LUTC | log.Ltime)
    conn := OpenAPIConn1()
    var wg sync.WaitGroup
    ctx := context.Background()
    wg.Add(20)
    for i := 0; i < 10; i++ {
        go func(id int) {
            defer wg.Done()
            if err := conn.ReadFile1(ctx); err != nil {
                log.Println(err)
            }
            log.Printf("#%d Read file\n", id)
        }(i)
        go func(id int) {
            defer wg.Done()
            if err := conn.ResolveAddress1(ctx); err != nil {
                log.Println(err)
            }
            log.Printf("#%d Resolve address\n", id)
        }(i)
    }
    wg.Wait()
}
// output
17:57:14 #0 Resolve address
17:57:14 #0 Read file
17:57:15 #2 Read file
17:57:15 #9 Resolve address
17:57:16 #4 Read file
17:57:16 #4 Resolve address
17:57:17 #5 Read file
17:57:17 #5 Resolve address
17:57:18 #6 Read file
17:57:18 #6 Resolve address
17:57:20 #7 Read file
17:57:26 #7 Resolve address
17:57:32 #8 Read file
17:57:38 #8 Resolve address
17:57:44 #3 Resolve address
17:57:50 #9 Read file
17:57:56 #1 Read file
17:58:02 #2 Resolve address
17:58:08 #1 Resolve address
17:58:14 #3 Read file
17:58:14 Done.
5.6 Healing Unhealthy Goroutines
Long-running systems often contain long-lived goroutines, and some of them may fall into an unhealthy state from which they cannot recover on their own. A monitoring mechanism is needed to watch these goroutines and restart the ones that misbehave. The restart process is called healing.
type StartGoroutineFn func(done <-chan struct{}, pulseInterval time.Duration) <-chan struct{}
func NewSteward(timeout time.Duration, startGoroutine StartGoroutineFn) StartGoroutineFn {
    return func(done <-chan struct{}, pulseInterval time.Duration) <-chan struct{} {
        hb := make(chan struct{})
        go func() {
            defer close(hb)
            var wardDone chan struct{}
            var wardHb <-chan struct{}
            startWard := func() { // starts the ward goroutine
                wardDone = make(chan struct{})
                wardHb = startGoroutine(or(wardDone, done), timeout/2) // when the steward stops, the ward stops too
            }
            startWard() // start the ward
            pulse := time.Tick(pulseInterval)
        monitorLoop:
            for {
                timeoutSignal := time.After(timeout) // watchdog timeout
                for {
                    select {
                    case <-pulse: // time for the steward's own heartbeat
                        select {
                        case hb <- struct{}{}: // send the steward's heartbeat
                        default:
                        }
                    case <-wardHb: // received a heartbeat from the ward
                        continue monitorLoop
                    case <-timeoutSignal: // no ward heartbeat in time: restart the ward
                        log.Println("steward: ward unhealthy; restarting")
                        close(wardDone) // shut down the ward goroutine
                        startWard()     // start a new one
                        continue monitorLoop
                    case <-done: // the steward itself was asked to stop
                        return
                    }
                }
            }
        }()
        return hb
    }
}
NewSteward creates a new steward; it returns a StartGoroutineFn, which means the steward itself can be monitored in the same way.
The function works as follows:
- Start the ward goroutine
- Loop:
  - send the steward's own heartbeat to the outside world
  - listen for the ward's heartbeat
  - restart the ward if no heartbeat arrives before the timeout
  - watch for the steward itself being shut down
Test:
// or returns a channel that closes as soon as any of the input channels closes.
func or(dones ...<-chan struct{}) <-chan struct{} {
    switch len(dones) {
    case 0:
        return nil
    case 1:
        return dones[0]
    }
    orDone := make(chan struct{})
    go func() {
        defer close(orDone)
        select {
        case <-dones[0]:
        case <-dones[1]:
        case <-or(append(dones[2:], orDone)...):
        }
    }()
    return orDone
}
func monitor() {
    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)
    // The ward goroutine: it does nothing and never sends a heartbeat,
    // it just waits to be shut down.
    doWork := func(done <-chan struct{}, _ time.Duration) <-chan struct{} {
        log.Println("ward: hello, I'm irresponsible...")
        go func() {
            <-done
            log.Println("ward: I'm halting")
        }()
        return nil
    }
    // The steward, with a 4-second timeout.
    doWorkWithSteward := NewSteward(4*time.Second, doWork)
    // Shut down the steward and its ward after 9 seconds.
    done := make(chan struct{})
    time.AfterFunc(9*time.Second, func() {
        log.Println("Halting steward and ward")
        close(done)
    })
    // Consume the steward's heartbeat.
    for range doWorkWithSteward(done, 4*time.Second) {
    }
    log.Println("Done")
}
- doWork: creates a ward that does nothing and simply waits to be restarted
- The steward and its ward are shut down after 9 seconds
- The main loop ranges over the steward's heartbeat channel
Output:
18:36:59 ward: hello, I'm irresponsible...
18:37:03 steward: ward unhealthy; restarting
18:37:03 ward: hello, I'm irresponsible...
18:37:03 ward: I'm halting
18:37:07 steward: ward unhealthy; restarting
18:37:07 ward: hello, I'm irresponsible...
18:37:07 ward: I'm halting
18:37:08 Halting steward and ward
18:37:08 Done