1. 服务端和消息编码
...大约 5 分钟
1. 消息的序列化和反序列化
典型的 RPC 调用如下:
err = client.Call("Arith.Multiply", args, &reply)
Arith
:服务名Multiply
:方法名args
:参数reply
,err
:返回值
可以将消息抽象为两个部分:
- Body:请求参数和返回值
- Header:请求的相关信息,返回的错误等
1.1 Header 结构
codec/codec.go
:
type Header struct {
ServiceMethod string // format "Service.Method"
Seq uint64 // sequence number of request
Error string
}
ServiceMethod
:请求的服务名和方法名Seq
:序列号,标识某个请求Error
:错误信息
1.2 Codec 接口
codec/codec.go
:
type Codec interface {
io.Closer
ReadHeader(*Header) error
ReadBody(any) error
Write(*Header, any) error
}
type NewCodecFunc func(closer io.ReadWriteCloser) Codec
type Type string
const (
GobType Type = "application/gob"
JsonType Type = "application/json"
)
var NewCodecFuncMap map[Type]NewCodecFunc
func init() {
NewCodecFuncMap = make(map[Type]NewCodecFunc)
NewCodecFuncMap[GobType] = NewGobCodec
}
NewCodecFuncMap
通过不同的数据类型返回不同Codec
构造函数。
1.3 GobCodec
codec/gob.go
type GobCodec struct {
conn io.ReadWriteCloser
buf *bufio.Writer
dec *gob.Decoder
enc *gob.Encoder
}
conn
:连接实例dec
,enc
:gob 的 decoder/encoderbuf
:带缓冲的Writer
var _ Codec = (*GobCodec)(nil)
func NewGobCodec(conn io.ReadWriteCloser) Codec {
buf := bufio.NewWriter(conn)
return &GobCodec{
conn: conn,
buf: buf,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
}
}
func (c *GobCodec) ReadHeader(h *Header) error {
return c.dec.Decode(h)
}
func (c *GobCodec) ReadBody(body any) error {
return c.dec.Decode(body)
}
func (c *GobCodec) Write(h *Header, body any) error {
var err error
defer func() {
_ = c.buf.Flush()
if err != nil {
_ = c.Close()
}
}()
if err = c.enc.Encode(h); err != nil {
log.Println("rpc codec: gob error encoding header:", err)
return err
}
if err = c.enc.Encode(body); err != nil {
log.Println("rpc codec: gob error encoding body:", err)
return err
}
return nil
}
func (c *GobCodec) Close() error {
return c.conn.Close()
}
2. 通信过程
客户/服务端通信时,需要协商使用的协议,数据格式和内容等。
例如 HTTP 报文,分为 header 和 body 2 部分,body 的格式和长度通过 header 中的 Content-Type
和 Content-Length
指定,服务端通过解析 header 就能够知道如何从 body 中读取需要的信息。
对于 RPC 协议来说,这部分协商是需要自主设计的。为了提升性能,一般在报文的最开始会规划固定的字节,来协商相关的信息。比如第1个字节用来表示序列化方式,第2个字节表示压缩方式,第3-6字节表示 header 的长度,7-10 字节表示 body 的长度。
现阶段只需要消息的编码方式,将其放在Option
结构中。
geerpc/server.go
const MagicNumber = 0x3bef5c
type Option struct {
MagicNumber int // MagicNumber marks this is a geerpc request
CodecType codec.Type // CodecType
}
var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
}
一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
|<------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
在一次连接中,Option 固定在报文的最开始,Header 和 Body 可以有多个:
| Option | Header1 | Body1 | Header2 | Body2 | ...
2.1 服务端
geerpc/server.go
// Server represents an RPC server
type Server struct{}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{}
}
// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Println("rpc server: accept error:", err)
return
}
go server.ServeConn(conn)
}
}
func Accept(lis net.Listener) {
DefaultServer.Accept(lis)
}
s.Accept
:等待连接建立,并启用新协程进行处理
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() {
_ = conn.Close()
}()
var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpc server: options error:", err)
return
}
if opt.MagicNumber != MagicNumber {
log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
return
}
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
log.Printf("rpc server: invalid codec type: %s", opt.CodecType)
return
}
server.serveCodec(f(conn))
}
// invalidRequest is a placeholder for response argv when error occurs
var invalidRequest = struct{}{}
func (server *Server) serveCodec(cc codec.Codec) {
sending := new(sync.Mutex) // make sure to send a complete response
wg := new(sync.WaitGroup) // wait until all request are handled
for {
req, err := server.readRequest(cc)
if err != nil {
if req == nil {
break // it's not possible to recover, so close the connection
}
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
continue
}
wg.Add(1)
go server.handleRequest(cc, req, sending, wg)
}
wg.Wait()
_ = cc.Close()
}
serveCodec
流程:
- 创建互斥锁和 Wait Group
- 互斥锁:保证并发时写入的数据不会被其他协程影响,否则返回的报文可能时乱序的,导致客户端无法解析
- Wait Group:保证所有的请求均被解析,因为一次连接可能接收到多个请求
- 在无限循环中处理请求,直到出现错误或连接关闭:
- 读取请求
- 启用新协程处理
- 等待所有请求处理完毕,关闭连接
type request struct {
h *codec.Header // header of request
argv, replyv reflect.Value // argv and replyv of request
}
func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
var h codec.Header
if err := cc.ReadHeader(&h); err != nil {
if errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
log.Println("rpc server: read header error:", err)
}
return nil, err
}
return &h, nil
}
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}
req := &request{h: h}
// day1: suppose the argv type is string
req.argv = reflect.New(reflect.TypeOf(""))
if err = cc.ReadBody(req.argv.Interface()); err != nil {
log.Println("rpc server: read argv err:", err)
}
return req, nil
}
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body any, sending *sync.Mutex) {
sending.Lock()
defer sending.Unlock()
if err := cc.Write(h, body); err != nil {
log.Println("rpc server: write response error:", err)
}
}
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()
log.Println(req.h, req.argv.Elem())
req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
3. 简易客户端
func startServer(addr chan string) {
// pick free port
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", lis.Addr())
addr <- lis.Addr().String()
geerpc.Accept(lis)
}
func main() {
addr := make(chan string)
go startServer(addr)
conn, _ := net.Dial("tcp", <-addr)
defer func() {
_ = conn.Close()
}()
// send options
_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
cc := codec.NewGobCodec(conn)
// send request and receive response
for i := 0; i < 5; i++ {
h := &codec.Header{
ServiceMethod: "Foo.Sum",
Seq: uint64(i),
}
_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
var (
respH codec.Header
reply string
)
_ = cc.ReadHeader(&respH)
_ = cc.ReadBody(&reply)
log.Printf("Resp header: %v, reply: %v", respH, reply)
}
}
net.Listen("tcp", ":0")
:tcp 端口号为 0 表示由系统分配
2023/10/12 13:26:25 start rpc server on [::]:5735
2023/10/12 13:26:25 &{Foo.Sum 0 } geerpc req 0
2023/10/12 13:26:25 Resp header: {Foo.Sum 0 }, reply: geerpc resp 0
2023/10/12 13:26:25 &{Foo.Sum 1 } geerpc req 1
2023/10/12 13:26:25 Resp header: {Foo.Sum 1 }, reply: geerpc resp 1
2023/10/12 13:26:25 &{Foo.Sum 2 } geerpc req 2
2023/10/12 13:26:25 Resp header: {Foo.Sum 2 }, reply: geerpc resp 2
2023/10/12 13:26:25 &{Foo.Sum 3 } geerpc req 3
2023/10/12 13:26:25 Resp header: {Foo.Sum 3 }, reply: geerpc resp 3
2023/10/12 13:26:25 &{Foo.Sum 4 } geerpc req 4
2023/10/12 13:26:25 Resp header: {Foo.Sum 4 }, reply: geerpc resp 4
Reference
Powered by Waline v2.15.2