4. 超时处理
...大约 4 分钟
1. 超时
超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,需要在 RPC 框架中加入超时处理的能力。
纵观整个远程调用的过程,需要客户端处理超时的地方有:
- 与服务端建立连接,导致的超时
- 发送请求到服务端,写报文导致的超时
- 等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
- 从服务端接收响应时,读报文导致的超时
需要服务端处理超时的地方有:
- 读取客户端请求报文时,读报文导致的超时
- 发送响应报文时,写报文导致的超时
- 调用映射服务的方法时,处理报文导致的超时
GeeRPC 在 3 个地方添加了超时处理机制。分别是:
- 客户端创建连接时
- 客户端
Client.Call()
整个过程导致的超时(包含发送报文,等待处理,接收报文所有阶段) - 服务端处理报文,即
Server.handleRequest
超时
1.1 创建连接超时
为了实现简单,将超时设置放在Option
结构体中。
geerpc/server.go
type Option struct {
MagicNumber int // MagicNumber marks this is a geerpc request
CodecType codec.Type // CodecType
ConnectTimeout time.Duration // 0 means no limit
HandleTimeout time.Duration
}
var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
ConnectTimeout: 10 * time.Second,
}
ConnectTimeout
:默认10s,0s表示没有限制
geerpc/client.go
type clientResult struct {
client *Client
err error
}
type newClientFunc func(conn net.Conn, opt *Option) (*Client, error)
func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (*Client, error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
_ = conn.Close()
}
}()
ch := make(chan clientResult)
go func() {
client, err := f(conn, opt)
ch <- clientResult{client: client, err: err}
}()
if opt.ConnectTimeout == 0 {
res := <-ch
return res.client, res.err
}
select {
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
case res := <-ch:
return res.client, res.err
}
}
// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {
return dialTimeout(NewClient, network, address, opts...)
}
两个位置引入超时:
net.Dialimeout
:建立连接超时,则返回错误- 启用子协程执行
NewClient
,若超时则返回错误
1.2 Client.Call 超时
使用context
包处理,将超时控制给用户
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply any) error {
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
select {
case <-ctx.Done():
client.removeCall(call.Seq)
return errors.New("rpc client: call failed:" + ctx.Err().Error())
case c := <-call.Done:
return c.Error
}
}
1.3 服务端处理超时
geerpc/server.go
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
defer wg.Done()
called := make(chan struct{})
sent := make(chan struct{})
go func() {
err := req.svc.call(req.mtype, req.argv, req.replyv)
called <- struct{}{}
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
sent <- struct{}{}
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
sent <- struct{}{}
}()
if timeout == 0 {
<-called
<-sent
return
}
select {
case <-time.After(timeout):
req.h.Error = fmt.Sprintf("rpc server: request handle timeout: exepct within %s", timeout)
server.sendResponse(cc, req.h, invalidRequest, sending)
case <-called:
<-sent
}
}
服务端处理分为两个阶段:
- 方法调用,若调用完成则向
called
通道发送信号 - 写入响应,若写入完成则向
sent
发送信号
若处理等两个阶段超时,则向客户端告知处理失败。
2. 单元测试
2.1 连接超时
func TestClientDialtimeout(t *testing.T) {
t.Parallel()
lis, _ := net.Listen("tcp", ":0")
f := func(conn net.Conn, opt *Option) (*Client, error) {
_ = conn.Close()
time.Sleep(2 * time.Second)
return nil, nil
}
t.Run("timeout", func(t *testing.T) {
_, err := dialTimeout(f, "tcp", lis.Addr().String(), &Option{ConnectTimeout: time.Second})
_assert(err != nil && strings.Contains(err.Error(), "connect timeout"), "expect a timeout error")
})
t.Run("0", func(t *testing.T) {
_, err := dialTimeout(f, "tcp", lis.Addr().String(), &Option{ConnectTimeout: 0})
_assert(err == nil, "0 means no limit")
})
}
t.Parallel
:并行测试,同时测试多个测试用例
2.2 服务端处理超时
type Bar int
func (b Bar) Timeout(argv int, reply *int) error {
time.Sleep(2 * time.Second)
return nil
}
func startServer(addr chan string) {
var b Bar
_ = Register(&b)
lis, _ := net.Listen("tcp", ":0")
addr <- lis.Addr().String()
Accept(lis)
}
func TestClient_Call(t *testing.T) {
t.Parallel()
addrCh := make(chan string)
go startServer(addrCh)
addr := <-addrCh
t.Run("client timeout", func(t *testing.T) {
client, _ := Dial("tcp", addr)
ctx, _ := context.WithTimeout(context.Background(), time.Second)
var reply int
err := client.Call(ctx, "Bar.Timeout", 1, reply)
_assert(err != nil && strings.Contains(err.Error(), ctx.Err().Error()), "expect a timeout error")
})
t.Run("server handle timeout", func(t *testing.T) {
client, _ := Dial("tcp", addr, &Option{
HandleTimeout: time.Second,
})
var reply int
err := client.Call(context.Background(), "Bar.Timeout", 1, &reply)
_assert(err != nil && strings.Contains(err.Error(), "handle timeout"), "expect a timeout error")
})
}
两种场景:
- 客户端通过
Context
设置调用超时 - 客户端通过发送
Option
通知服务端处理超时时间
Reference
Powered by Waline v2.15.2