04.3 玩转 RPC

Kesa...大约 6 分钟golangrpc

4.3 玩转 RPC

不同的场景中 RPC 有着不同的需求, 因此开源社区中诞生了各种 RPC 框架,本节将介绍 Go 内置的 RPC 框架在一些特殊场景的用法。

4.3.1 客户端 RPC 实现原理

Go 语言的 RPC 库最简单的使用方式是通过 Client.Call 方法进行同步阻塞调用,方法实现如下:

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

首先通过 Client.Go 方法进行一次异步调用,返回一个表示这次调用的Call 结构体,然后等待Call结构体的 Done 返回调用结果

所以我们直接可以使用Client.Go 方法直接调用 HelloService :

func doClientWork(client *rpc.Client) {
    helloCall := client.Go("HelloService.Hello","hello",new(string),nil)
    
    // do something 
    
    helloCall = <- helloCall.done
    if err := helloCall.Error; err != nil {
        log.Fatal(err)
    }
    
    args := helloCall.Args.(string)
    reply := hellCall.Reply.(string)
    fmt.Println(args,reply)
}

在发起异步调用之后,会执行其他任务,之后可以通过 channel 返回的 *Call 类型变量中获取

client.Go方法如下:

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel. If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}

首先构造 Call 类型的对象,之后通过client.send发送 RPC 请求,并且client.send是线程安全的。

当调用完成或者发生错误时,将调用call.done 方法通知完成:

func (call *Call) done() {
	select {
	case call.Done <- call:
		// ok
	default:
		// We don't want to block here. It is the caller's responsibility to make
		// sure the channel has enough buffer space. See comment in Go().
		if debugLog {
			log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
		}
	}
}

4.3.2 基于 RPC 实现 Watch 功能

在很多系统中都提供了 Watch 功能的接口,当系统满足某种条件时 Watch 方法返回监控的结果。在此我们可以尝试通过 RPC 框架实现一个基本的 Watch 功能。

因为 client.send 是线程安全的,我们可以通过在不同的 Goroutine 中同时并发阻塞调用 RPC 方法,在一个独立的 Goroutine 中调用 Watch 函数进行监控。

下面将通过 RPC 构造一个简单的 KV 数据库,首先定义服务如下:

const (
	KVStoreName = "KVStoreService"
)

var (
	ErrNotFound = errors.New("not found")
	ErrTimeout  = errors.New("timed out")
)

type KVStoreInterface interface {
	Get(key string, value *string) error
	Set(kv []string, reply *struct{}) error
	Watch(timeoutSecond int, keyChanged *string) error
}

type KVStoreService struct {
	m         map[string]string
	filter    map[string]func(key string)
	watchChan chan string
	mu        sync.Mutex
}

var _ KVStoreInterface = (*KVStoreService)(nil)

func NewKVStoreService() *KVStoreService {
	kvs := &KVStoreService{
		m:         make(map[string]string),
		filter:    make(map[string]func(key string)),
		watchChan: make(chan string, 10),
	}
	kvs.filter["watcher"] = func(key string) {
		kvs.watchChan <- key
	}
	return kvs
}

func (kvs *KVStoreService) Get(key string, value *string) error {
	kvs.mu.Lock()
	defer kvs.mu.Unlock()

	if v, ok := kvs.m[key]; ok {
		*value = v
		return nil
	}

	return ErrNotFound
}

func (kvs *KVStoreService) Set(kv []string, reply *struct{}) error {
	kvs.mu.Lock()
	defer kvs.mu.Unlock()

	key, value := kv[0], kv[1]

	oldValue := kvs.m[key]
	log.Printf("old: %v, new: %v", oldValue, value)
	for _, fn := range kvs.filter {
		fn(key)
	}

	kvs.m[key] = value
	return nil
}

func (kvs *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
	select {
	case <-time.After(time.Duration(timeoutSecond) * time.Second):
		return ErrTimeout
	case key := <-kvs.watchChan:
		*keyChanged = key
		return nil
	}
}
  • KVStoreService: m 用于存储 KV 数据,filter 为过滤器函数列表, mu 为互斥锁, watchChan : 监视通道,用记录改变的 key
  • NewKVStoreService 中初始化 m , filterwatchChan 并注册了 watcher 函数用于监控 key 的变化
  • 当调用 Set 方法时,会调用所有的过滤器,此时 watcher 会被调用并将 key 写只通道中
  • Watch: 从 watch 通道中获取变化的 key ,若超时则返回错误

客户端如下:

const (
	timeout = 10
)

type KVStoreClient struct {
	client *rpc.Client
}

func (kvs *KVStoreClient) Get(key string, value *string) error {
	return kvs.client.Call(service.KVStoreName+".Get", key, value)
}

func (kvs *KVStoreClient) Set(kv []string, reply *struct{}) error {
	return kvs.client.Call(service.KVStoreName+".Set", kv, reply)
}

func (kvs *KVStoreClient) Watch(timeoutSecond int, keyChanged *string) error {
	return kvs.client.Call(service.KVStoreName+".Watch", timeoutSecond, keyChanged)
}

var _ service.KVStoreInterface = (*KVStoreClient)(nil)

func DialKVStoreService(network string, addr string) (*KVStoreClient, error) {
	client, err := rpc.Dial(network, addr)
	if err != nil {
		return nil, err
	}
	kvClient := &KVStoreClient{
		client: client,
	}
	return kvClient, nil
}

var (
	cmd    string
	params []string
)

func main() {
	parse()

	client, err := DialKVStoreService("tcp", ":9090")
	if err != nil {
		log.Fatalf("error dialing tcp: %v", err)
	}

	switch cmd {
	case "get":
		var val string
		err := get(client, params[0], &val)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(val)
	case "set":
		err := set(client, params)
		if err != nil {
			log.Fatal(err)
		}
	default:
		fmt.Println("Unsupported command")
	}
}

func parse() {
	if len(os.Args) < 3 {
		fmt.Print("USAGE: \n\t client [command] [...params]")
		os.Exit(1)
	}
	cmd = os.Args[1]
	params = os.Args[2:]
	log.Printf("Cmd: %s, params: %v", cmd, params)
}

func doClientWork(client *KVStoreClient, srcMethod func() error, watched bool) error {
	if !watched {
		return srcMethod()
	}
	watchChan := make(chan bool)
	go watchKeyChanged(client, watchChan)
	err := srcMethod()
	<-watchChan
	return err
}

func watchKeyChanged(client *KVStoreClient, watchChan chan bool) {
	var keyChanged string
	err := client.Watch(timeout, &keyChanged)
	if err != nil {
		log.Printf("error calling %s: %v", service.KVStoreName+".Watch", err)
	}
	log.Print("watch-key-changed: ", keyChanged)
	watchChan <- true
}

func get(client *KVStoreClient, key string, val *string) error {
	return doClientWork(client, func() error {
		err := client.Get(key, val)
		if err != nil {
			return fmt.Errorf("error calling %s.%s: %w", service.KVStoreName, "Get", err)
		}
		return nil
	}, false)
}

func set(client *KVStoreClient, kv []string) error {
	return doClientWork(client, func() error {
		err := client.Set(kv, new(struct{}))
		if err != nil {
			return fmt.Errorf("error calling %s.%s: %v", service.KVStoreName, "Set", err)
		}
		return nil
	}, true)
}
  • watchKeyChanged: 调用 KVStoreService.Watch 服务获取修改的 Key
  • doClientWork :当需要调用 watch 时,通过 watchChan 等待 watchKeyChanged 的返回

这样每次调用 KVStoreService.Set 服务时就可以监控 kv 的变化了

4.3.3 反向 RPC

通常的 RPC 基于 C/S 架构, RPC 的服务端对应网络服务器, RPC 客户端对应网络客户端。但是对于一些特殊场景,例如公司内网提供一个 RPC 服务,但是外网无法连接到内网服务器,此时可以参考类似反向代理技术,首先从内网主动连接到外网的 TCP 服务器,然后基于 TPC 连接向外网提供服务。

示例代码如下,server 端:

func main() {
	rpc.Register(service.NewArithmeticService())

	for {
		conn, err := net.Dial("tcp", ":9090")
		if err != nil {
			log.Printf("error dialing tcp: %v, retry ...", err.Error())
			time.Sleep(1 * time.Second)
			continue
		}
		log.Println("dialing success")

		rpc.ServeConn(conn)
		conn.Close()
	}
}

反向 RPC 的服务端不主动监听 TCP 连接,而是连接到客户端的 TCP 服务器并基于 TCP 为客户端提供 RPC 服务

客户端需要提供 TCP 服务用于接受服务端的连接:

func main() {
	listener, err := net.Listen("tcp", ":9090")
	if err != nil {
		log.Fatalln("error listening tcp:", err)
	}
	log.Print("listening on tcp :9090")
	clientChan := make(chan *rpc.Client)

	go func() {
		log.Println("waiting for connection ...")
		conn, err := listener.Accept()
		if err != nil {
			log.Fatalln("error accepting conn:", err)
		}
		log.Printf("%s connected ...", conn.RemoteAddr())
		clientChan <- rpc.NewClient(conn)
	}()

	doClientWork(clientChan)
}

func doClientWork(clientChan chan *rpc.Client) {
	client := <-clientChan
	defer client.Close()
	var reply int
	args := service.ArithmeticService{
		X: 2,
		Y: 3,
	}
	err := client.Call("ArithmeticService.Multiply", args, &reply)
	if err != nil {
		log.Println("error calling ArithmeticService.Multiply:", err)
	}

	fmt.Printf("%d * %d = %d", args.X, args.Y, reply)
}

4.3.4 上下文信息

基于上下文可以针对不同客户端提供定制化的 RPC 服务,我们可以通过为每个链接提供独立的 RPC 服务来实现对上下文的支持。

首先改造HelloService

var (
	ErrAuth  = errors.New("not login")
	ErrLogin = errors.New("username or password incorrect")
)

type HelloService struct {
	Conn    net.Conn
	isLogin bool
}

func (h *HelloService) Login(request string, reply *string) error {
	if request != "user:pass" {
		return ErrLogin
	}
	h.isLogin = true
	return nil
}

func (h *HelloService) Hello(request string, reply *string) error {
	if !h.isLogin {
		return ErrAuth
	}
	*reply = fmt.Sprintf("Hello %s, from %s", request, h.Conn.RemoteAddr())
	return nil
}

Hello 中加入了简单的登录验证,这样在调用 RPC 服务之前需要进行登录操作。

在服务端为每个链接提供独立的 RPC 服务:

func main() {
	listener, err := net.Listen("tcp", ":9090")
	if err != nil {
		log.Fatalln("error listening tcp:", err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("error accepting conn:", err)
		}
		go func() {
			defer conn.Close()

			p := rpc.NewServer()
			p.Register(&service.HelloService{Conn: conn})
			p.ServeConn(conn)
		}()
	}
}

Referece

  1. 4.3 玩转 RPCopen in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2