2. 高性能客户端

Kesa...大约 4 分钟golang

day2-clientopen in new window

1. Call 结构体设计


  • the method’s type is exported.
  • the method is exported.
  • the method has two arguments, both exported (or builtin) types.
  • the method’s second argument is a pointer.
  • the method has return type error.
func (t *T) MethodName(argType T1, replyType *T2) error


type Call struct {
	Seq           uint64
	ServiceMethod string     // format "<service>.<method>"
	Args          any        // arguments to the func
	Reply         any        // reply from the func
	Error         error      // if error occurs, it will be set
	Done          chan *Call // Strobes when call is complete.

func (call *Call) done() {
	call.Done <- call


2. Client 实现

type Client struct {
	cc       codec.Codec
	opt      *Option
	sending  sync.Mutex // protect following
	header   codec.Header
	mu       sync.Mutex // protect following
	seq      uint64
	pending  map[uint64]*Call
	closing  bool // user has called Close
	shutdown bool // server told to stop

var _ io.Closer = (*Client)(nil)

var ErrShutdown = errors.New("connection is shut down")

// Close the connection
func (client *Client) Close() error {
	defer client.mu.Unlock()
	if client.closing {
		return ErrShutdown
	client.closing = true
	return client.cc.Close()

// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {
	defer client.mu.Unlock()
	return !client.shutdown && !client.closing


  • cc:消息编解码器
  • sending:互斥锁,保证并发时报文不会被干扰
  • header:请求头
  • seq:请求序列号
  • pending:存储未完成的请求,key 为请求的编号,value 为 Call
  • closing:客户端不可用,并且是由客户端关闭
  • shutdown:客户端不可用,并且是由服务端关闭
func (client *Client) registerCall(call *Call) (uint64, error) {
	defer client.mu.Unlock()
	if client.closing || client.shutdown {
		return 0, ErrShutdown
	call.Seq = client.seq
	client.pending[call.Seq] = call
	return call.Seq, nil

func (client *Client) removeCall(seq uint64) *Call {
	defer client.mu.Unlock()
	call := client.pending[seq]
	delete(client.pending, seq)
	return call

func (client *Client) terminateCalls(err error) {
	defer client.sending.Unlock()
	defer client.mu.Unlock()

	client.shutdown = true
	for _, call := range client.pending {
		call.Error = err
  • regiterCall:注册 Call, 将 Call 放入 client.pending中,并更新序列号
  • removeCall:移除并返回被移除的 Call
  • terminatesCalls:发生错误时,终止等待中 Call 的调用,并将错误通知调用方
func (client *Client) receive() {
	var err error
	for err == nil {
		var h codec.Header
		if err = client.cc.ReadHeader(&h); err != nil {

		call := client.removeCall(h.Seq)
		switch {
		case call == nil:
			// write partially failed or call already removed
			err = client.cc.ReadBody(nil)
		case h.Error != "":
			call.Error = fmt.Errorf(h.Error)
			err = client.cc.ReadBody(nil)
			err = client.cc.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())

	// error occurs, terminates all pending calls


  1. Call 不存在,可能请求没有发送完整,或者被取消,服务端依然进行了处理
  2. Call 存在,服务端处理出错,获取h.Error
  3. Call 存在,且服务端正常处理,读取响应数据
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
	f := codec.NewCodecFuncMap[opt.CodecType]
	if f == nil {
		err := fmt.Errorf("invalid codec type %s", opt.CodecType)
		log.Println("rpc client: codec error:", err)
		return nil, err
	// send option to server
	if err := json.NewEncoder(conn).Encode(opt); err != nil {
		log.Println("rpc client: options error:", err)
		_ = conn.Close()
		return nil, err
	return newClientCodec(f(conn), opt), nil

func newClientCodec(cc codec.Codec, opt *Option) *Client {
	client := &Client{
		seq:     1, // seq start with 1, 0 means invalid call
		cc:      cc,
		opt:     opt,
		pending: make(map[uint64]*Call),
	go client.receive()
	return client
  • 向服务端发送Option
  • 启用新协程接收数据
func parseOptions(opts ...*Option) (*Option, error) {
	// if opts is empty or parameter is nil
	if len(opts) == 0 || opts[0] == nil {
		return DefaultOption, nil
	if len(opts) != 1 {
		return nil, errors.New("only one option allowed")

	opt := opts[0]
	opt.MagicNumber = DefaultOption.MagicNumber
	if opt.CodecType == "" {
		opt.CodecType = DefaultOption.CodecType

	return opt, nil

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*Option) (*Client, error) {
	opt, err := parseOptions(opts...)
	if err != nil {
		return nil, err

	conn, err := net.Dial(network, address)
	if err != nil {
		return nil, err

	client, err := NewClient(conn, opt)
	// close the connection if client is nil
	defer func() {
		if client == nil {
			_ = conn.Close()

	return client, err

Dial:建立连接并返回 Client 实例

func (client *Client) Go(serviceMethod string, args, reply any, done chan *Call) *Call {
	if done == nil {
		done = make(chan *Call, 10)
	} else if cap(done) == 0 {
		log.Panic("rpc client: done channel is unbuffered")

	call := &Call{
		ServiceMethod: serviceMethod,
		Args:          args,
		Reply:         reply,
		Done:          done,
	return call

func (client *Client) Call(serviceMethod string, args, reply any) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error

  • Go:异步接口,返回 Call 实例
  • Call:同步接口,调用 Go后阻塞等待处理完毕并返回

3. Demo

package main

import (

func startServer(addr chan string) {
	// pick free port
	lis, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatal("network error:", err)
	log.Println("start rpc server on", lis.Addr())
	addr <- lis.Addr().String()

func main() {
	addr := make(chan string)
	go startServer(addr)

	client, _ := geerpc.Dial("tcp", <-addr)
	defer func() {
		_ = client.Close()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		go func(i int) {
			defer wg.Done()
			args := fmt.Sprintf("geerpc req %d", i)
			var reply string
			if err := client.Call("Foo.Sum", args, &reply); err != nil {
				log.Fatal("call Foo.Sum error:", err)
			log.Println("reply:", reply)



  1. https://geektutu.com/post/geerpc-day2.htmlopen in new window
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2