1. 服务端和消息编码

Kesa...大约 5 分钟golang

day1-codecopen in new window

1. 消息的序列化和反序列化

典型的 RPC 调用如下:

err = client.Call("Arith.Multiply", args, &reply)
  • Arith:服务名
  • Multiply:方法名
  • args:参数
  • reply,err:返回值

可以将消息抽象为两个部分:

  1. Body:请求参数和返回值
  2. Header:请求的相关信息,返回的错误等

1.1 Header 结构

codec/codec.go

type Header struct {
	ServiceMethod string // format "Service.Method"
	Seq           uint64 // sequence number of request
	Error         string
}
  • ServiceMethod:请求的服务名和方法名
  • Seq:序列号,标识某个请求
  • Error:错误信息

1.2 Codec 接口

codec/codec.go

type Codec interface {
	io.Closer
	ReadHeader(*Header) error
	ReadBody(any) error
	Write(*Header, any) error
}


type NewCodecFunc func(closer io.ReadWriteCloser) Codec

type Type string

const (
	GobType  Type = "application/gob"
	JsonType Type = "application/json"
)

var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
	NewCodecFuncMap = make(map[Type]NewCodecFunc)
	NewCodecFuncMap[GobType] = NewGobCodec
}

NewCodecFuncMap通过不同的数据类型返回不同Codec构造函数。

1.3 GobCodec

codec/gob.go

type GobCodec struct {
	conn io.ReadWriteCloser
	buf  *bufio.Writer
	dec  *gob.Decoder
	enc  *gob.Encoder
}
  • conn:连接实例
  • dec,enc:gob 的 decoder/encoder
  • buf:带缓冲的Writer
var _ Codec = (*GobCodec)(nil)

func NewGobCodec(conn io.ReadWriteCloser) Codec {
	buf := bufio.NewWriter(conn)
	return &GobCodec{
		conn: conn,
		buf:  buf,
		dec:  gob.NewDecoder(conn),
		enc:  gob.NewEncoder(buf),
	}
}

func (c *GobCodec) ReadHeader(h *Header) error {
	return c.dec.Decode(h)
}

func (c *GobCodec) ReadBody(body any) error {
	return c.dec.Decode(body)
}

func (c *GobCodec) Write(h *Header, body any) error {
	var err error
	defer func() {
		_ = c.buf.Flush()
		if err != nil {
			_ = c.Close()
		}
	}()

	if err = c.enc.Encode(h); err != nil {
		log.Println("rpc codec: gob error encoding header:", err)
		return err
	}
	if err = c.enc.Encode(body); err != nil {
		log.Println("rpc codec: gob error encoding body:", err)
		return err
	}

	return nil
}

func (c *GobCodec) Close() error {
	return c.conn.Close()
}

2. 通信过程

客户/服务端通信时,需要协商使用的协议,数据格式和内容等。

例如 HTTP 报文,分为 header 和 body 2 部分,body 的格式和长度通过 header 中的 Content-TypeContent-Length 指定,服务端通过解析 header 就能够知道如何从 body 中读取需要的信息。

对于 RPC 协议来说,这部分协商是需要自主设计的。为了提升性能,一般在报文的最开始会规划固定的字节,来协商相关的信息。比如第1个字节用来表示序列化方式,第2个字节表示压缩方式,第3-6字节表示 header 的长度,7-10 字节表示 body 的长度。

现阶段只需要消息的编码方式,将其放在Option结构中。

geerpc/server.go

const MagicNumber = 0x3bef5c

type Option struct {
	MagicNumber int        // MagicNumber marks this is a geerpc request
	CodecType   codec.Type // CodecType
}

var DefaultOption = &Option{
	MagicNumber: MagicNumber,
	CodecType:   codec.GobType,
}

一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:

| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
|<------      固定 JSON 编码      ------>  | <-------   编码方式由 CodeType 决定   ------->|

在一次连接中,Option 固定在报文的最开始,Header 和 Body 可以有多个:

| Option | Header1 | Body1 | Header2 | Body2 | ...

2.1 服务端

geerpc/server.go

// Server represents an RPC server
type Server struct{}

// NewServer returns a new Server.
func NewServer() *Server {
	return &Server{}
}

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()

// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Println("rpc server: accept error:", err)
			return
		}
		go server.ServeConn(conn)
	}
}

func Accept(lis net.Listener) {
	DefaultServer.Accept(lis)
}

s.Accept:等待连接建立,并启用新协程进行处理

// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
	defer func() {
		_ = conn.Close()
	}()
	var opt Option
	if err := json.NewDecoder(conn).Decode(&opt); err != nil {
		log.Println("rpc server: options error:", err)
		return
	}
	if opt.MagicNumber != MagicNumber {
		log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
		return
	}
	f := codec.NewCodecFuncMap[opt.CodecType]
	if f == nil {
		log.Printf("rpc server: invalid codec type: %s", opt.CodecType)
		return
	}
	server.serveCodec(f(conn))
}

// invalidRequest is a placeholder for response argv when error occurs
var invalidRequest = struct{}{}

func (server *Server) serveCodec(cc codec.Codec) {
	sending := new(sync.Mutex) // make sure to send a complete response
	wg := new(sync.WaitGroup)  // wait until all request are handled
	for {
		req, err := server.readRequest(cc)
		if err != nil {
			if req == nil {
				break // it's not possible to recover, so close the connection
			}
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			continue
		}
		wg.Add(1)
		go server.handleRequest(cc, req, sending, wg)
	}
	wg.Wait()
	_ = cc.Close()
}

serveCodec流程:

  1. 创建互斥锁和 Wait Group
    • 互斥锁:保证并发时写入的数据不会被其他协程影响,否则返回的报文可能时乱序的,导致客户端无法解析
    • Wait Group:保证所有的请求均被解析,因为一次连接可能接收到多个请求
  2. 在无限循环中处理请求,直到出现错误或连接关闭:
    1. 读取请求
    2. 启用新协程处理
  3. 等待所有请求处理完毕,关闭连接

type request struct {
	h            *codec.Header // header of request
	argv, replyv reflect.Value // argv and replyv of request
}

func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
	var h codec.Header
	if err := cc.ReadHeader(&h); err != nil {
		if errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
			log.Println("rpc server: read header error:", err)
		}
		return nil, err
	}
	return &h, nil
}

func (server *Server) readRequest(cc codec.Codec) (*request, error) {
	h, err := server.readRequestHeader(cc)
	if err != nil {
		return nil, err
	}
	req := &request{h: h}
	// day1: suppose the argv type is string
	req.argv = reflect.New(reflect.TypeOf(""))
	if err = cc.ReadBody(req.argv.Interface()); err != nil {
		log.Println("rpc server: read argv err:", err)
	}
	return req, nil
}

func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body any, sending *sync.Mutex) {
	sending.Lock()
	defer sending.Unlock()
	if err := cc.Write(h, body); err != nil {
		log.Println("rpc server: write response error:", err)
	}
}

func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
	defer wg.Done()
	log.Println(req.h, req.argv.Elem())
	req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
	server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}

3. 简易客户端

func startServer(addr chan string) {
	// pick free port
	lis, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatal("network error:", err)
	}
	log.Println("start rpc server on", lis.Addr())
	addr <- lis.Addr().String()
	geerpc.Accept(lis)
}

func main() {
	addr := make(chan string)
	go startServer(addr)

	conn, _ := net.Dial("tcp", <-addr)
	defer func() {
		_ = conn.Close()
	}()

	// send options
	_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
	cc := codec.NewGobCodec(conn)

	// send request and receive response
	for i := 0; i < 5; i++ {
		h := &codec.Header{
			ServiceMethod: "Foo.Sum",
			Seq:           uint64(i),
		}
		_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
		var (
			respH codec.Header
			reply string
		)
		_ = cc.ReadHeader(&respH)
		_ = cc.ReadBody(&reply)
		log.Printf("Resp header: %v, reply: %v", respH, reply)
	}
}
  • net.Listen("tcp", ":0"):tcp 端口号为 0 表示由系统分配
2023/10/12 13:26:25 start rpc server on [::]:5735
2023/10/12 13:26:25 &{Foo.Sum 0 } geerpc req 0
2023/10/12 13:26:25 Resp header: {Foo.Sum 0 }, reply: geerpc resp 0
2023/10/12 13:26:25 &{Foo.Sum 1 } geerpc req 1
2023/10/12 13:26:25 Resp header: {Foo.Sum 1 }, reply: geerpc resp 1
2023/10/12 13:26:25 &{Foo.Sum 2 } geerpc req 2
2023/10/12 13:26:25 Resp header: {Foo.Sum 2 }, reply: geerpc resp 2
2023/10/12 13:26:25 &{Foo.Sum 3 } geerpc req 3
2023/10/12 13:26:25 Resp header: {Foo.Sum 3 }, reply: geerpc resp 3
2023/10/12 13:26:25 &{Foo.Sum 4 } geerpc req 4
2023/10/12 13:26:25 Resp header: {Foo.Sum 4 }, reply: geerpc resp 4

Reference

  1. https://geektutu.com/post/geerpc-day1.htmlopen in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2