2. 高性能客户端
...大约 4 分钟
1. Call 结构体设计
对于net/rpc
来说,一个函数需要能够被远程调用,需要满足如下五个条件:
- the method’s type is exported.
- the method is exported.
- the method has two arguments, both exported (or builtin) types.
- the method’s second argument is a pointer.
- the method has return type error.
func (t *T) MethodName(argType T1, replyType *T2) error
geerpc/client.go
type Call struct {
Seq uint64
ServiceMethod string // format "<service>.<method>"
Args any // arguments to the func
Reply any // reply from the func
Error error // if error occurs, it will be set
Done chan *Call // Strobes when call is complete.
}
func (call *Call) done() {
call.Done <- call
}
支持异步调用,在调用结束时使用call.Done
通知。
2. Client 实现
type Client struct {
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server told to stop
}
var _ io.Closer = (*Client)(nil)
var ErrShutdown = errors.New("connection is shut down")
// Close the connection
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
if client.closing {
return ErrShutdown
}
client.closing = true
return client.cc.Close()
}
// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}
Client
:
cc
:消息编解码器sending
:互斥锁,保证并发时报文不会被干扰header
:请求头seq
:请求序列号pending
:存储未完成的请求,key 为请求的编号,value 为 Callclosing
:客户端不可用,并且是由客户端关闭shutdown
:客户端不可用,并且是由服务端关闭
func (client *Client) registerCall(call *Call) (uint64, error) {
client.mu.Lock()
defer client.mu.Unlock()
if client.closing || client.shutdown {
return 0, ErrShutdown
}
call.Seq = client.seq
client.pending[call.Seq] = call
client.seq++
return call.Seq, nil
}
func (client *Client) removeCall(seq uint64) *Call {
client.mu.Lock()
defer client.mu.Unlock()
call := client.pending[seq]
delete(client.pending, seq)
return call
}
func (client *Client) terminateCalls(err error) {
client.sending.Lock()
defer client.sending.Unlock()
client.mu.Lock()
defer client.mu.Unlock()
client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
}
}
regiterCall
:注册 Call, 将 Call 放入client.pending
中,并更新序列号removeCall
:移除并返回被移除的 CallterminatesCalls
:发生错误时,终止等待中 Call 的调用,并将错误通知调用方
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil {
break
}
call := client.removeCall(h.Seq)
switch {
case call == nil:
// write partially failed or call already removed
err = client.cc.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default:
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// error occurs, terminates all pending calls
client.terminateCalls(err)
}
客户端接收的响应有三种情况:
- Call 不存在,可能请求没有发送完整,或者被取消,服务端依然进行了处理
- Call 存在,服务端处理出错,获取
h.Error
- Call 存在,且服务端正常处理,读取响应数据
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
// send option to server
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error:", err)
_ = conn.Close()
return nil, err
}
return newClientCodec(f(conn), opt), nil
}
func newClientCodec(cc codec.Codec, opt *Option) *Client {
client := &Client{
seq: 1, // seq start with 1, 0 means invalid call
cc: cc,
opt: opt,
pending: make(map[uint64]*Call),
}
go client.receive()
return client
}
- 向服务端发送
Option
- 启用新协程接收数据
func parseOptions(opts ...*Option) (*Option, error) {
// if opts is empty or parameter is nil
if len(opts) == 0 || opts[0] == nil {
return DefaultOption, nil
}
if len(opts) != 1 {
return nil, errors.New("only one option allowed")
}
opt := opts[0]
opt.MagicNumber = DefaultOption.MagicNumber
if opt.CodecType == "" {
opt.CodecType = DefaultOption.CodecType
}
return opt, nil
}
// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
client, err := NewClient(conn, opt)
// close the connection if client is nil
defer func() {
if client == nil {
_ = conn.Close()
}
}()
return client, err
}
Dial
:建立连接并返回 Client 实例
func (client *Client) Go(serviceMethod string, args, reply any, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10)
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
call := &Call{
ServiceMethod: serviceMethod,
Args: args,
Reply: reply,
Done: done,
}
client.send(call)
return call
}
func (client *Client) Call(serviceMethod string, args, reply any) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
Go
:异步接口,返回 Call 实例Call
:同步接口,调用Go
后阻塞等待处理完毕并返回
3. Demo
package main
import (
"fmt"
"geerpc"
"log"
"net"
"sync"
)
func startServer(addr chan string) {
// pick free port
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", lis.Addr())
addr <- lis.Addr().String()
geerpc.Accept(lis)
}
func main() {
log.SetFlags(0)
addr := make(chan string)
go startServer(addr)
client, _ := geerpc.Dial("tcp", <-addr)
defer func() {
_ = client.Close()
}()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
args := fmt.Sprintf("geerpc req %d", i)
var reply string
if err := client.Call("Foo.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Println("reply:", reply)
}(i)
}
wg.Wait()
}
Reference
Powered by Waline v2.15.2