04.3 玩转 RPC
4.3 玩转 RPC
不同的场景中 RPC 有着不同的需求, 因此开源社区中诞生了各种 RPC 框架,本节将介绍 Go 内置的 RPC 框架在一些特殊场景的用法。
4.3.1 客户端 RPC 实现原理
Go 语言的 RPC 库最简单的使用方式是通过 Client.Call
方法进行同步阻塞调用,方法实现如下:
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
首先通过 Client.Go
方法进行一次异步调用,返回一个表示这次调用的Call
结构体,然后等待Call
结构体的 Done 返回调用结果
所以我们直接可以使用Client.Go
方法直接调用 HelloService
:
func doClientWork(client *rpc.Client) {
helloCall := client.Go("HelloService.Hello","hello",new(string),nil)
// do something
helloCall = <- helloCall.done
if err := helloCall.Error; err != nil {
log.Fatal(err)
}
args := helloCall.Args.(string)
reply := hellCall.Reply.(string)
fmt.Println(args,reply)
}
在发起异步调用之后,会执行其他任务,之后可以通过 channel 返回的 *Call
类型变量中获取
client.Go
方法如下:
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
首先构造 Call
类型的对象,之后通过client.send
发送 RPC 请求,并且client.send
是线程安全的。
当调用完成或者发生错误时,将调用call.done
方法通知完成:
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
}
4.3.2 基于 RPC 实现 Watch 功能
在很多系统中都提供了 Watch 功能的接口,当系统满足某种条件时 Watch 方法返回监控的结果。在此我们可以尝试通过 RPC 框架实现一个基本的 Watch 功能。
因为 client.send
是线程安全的,我们可以通过在不同的 Goroutine 中同时并发阻塞调用 RPC 方法,在一个独立的 Goroutine 中调用 Watch 函数进行监控。
下面将通过 RPC 构造一个简单的 KV 数据库,首先定义服务如下:
const (
KVStoreName = "KVStoreService"
)
var (
ErrNotFound = errors.New("not found")
ErrTimeout = errors.New("timed out")
)
type KVStoreInterface interface {
Get(key string, value *string) error
Set(kv []string, reply *struct{}) error
Watch(timeoutSecond int, keyChanged *string) error
}
type KVStoreService struct {
m map[string]string
filter map[string]func(key string)
watchChan chan string
mu sync.Mutex
}
var _ KVStoreInterface = (*KVStoreService)(nil)
func NewKVStoreService() *KVStoreService {
kvs := &KVStoreService{
m: make(map[string]string),
filter: make(map[string]func(key string)),
watchChan: make(chan string, 10),
}
kvs.filter["watcher"] = func(key string) {
kvs.watchChan <- key
}
return kvs
}
func (kvs *KVStoreService) Get(key string, value *string) error {
kvs.mu.Lock()
defer kvs.mu.Unlock()
if v, ok := kvs.m[key]; ok {
*value = v
return nil
}
return ErrNotFound
}
func (kvs *KVStoreService) Set(kv []string, reply *struct{}) error {
kvs.mu.Lock()
defer kvs.mu.Unlock()
key, value := kv[0], kv[1]
oldValue := kvs.m[key]
log.Printf("old: %v, new: %v", oldValue, value)
for _, fn := range kvs.filter {
fn(key)
}
kvs.m[key] = value
return nil
}
func (kvs *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
select {
case <-time.After(time.Duration(timeoutSecond) * time.Second):
return ErrTimeout
case key := <-kvs.watchChan:
*keyChanged = key
return nil
}
}
KVStoreService
:m
用于存储 KV 数据,filter
为过滤器函数列表,mu
为互斥锁,watchChan
: 监视通道,用记录改变的 key- 在
NewKVStoreService
中初始化m
,filter
和watchChan
并注册了watcher
函数用于监控 key 的变化 - 当调用
Set
方法时,会调用所有的过滤器,此时watcher
会被调用并将 key 写只通道中 Watch
: 从watch
通道中获取变化的 key ,若超时则返回错误
客户端如下:
const (
timeout = 10
)
type KVStoreClient struct {
client *rpc.Client
}
func (kvs *KVStoreClient) Get(key string, value *string) error {
return kvs.client.Call(service.KVStoreName+".Get", key, value)
}
func (kvs *KVStoreClient) Set(kv []string, reply *struct{}) error {
return kvs.client.Call(service.KVStoreName+".Set", kv, reply)
}
func (kvs *KVStoreClient) Watch(timeoutSecond int, keyChanged *string) error {
return kvs.client.Call(service.KVStoreName+".Watch", timeoutSecond, keyChanged)
}
var _ service.KVStoreInterface = (*KVStoreClient)(nil)
func DialKVStoreService(network string, addr string) (*KVStoreClient, error) {
client, err := rpc.Dial(network, addr)
if err != nil {
return nil, err
}
kvClient := &KVStoreClient{
client: client,
}
return kvClient, nil
}
var (
cmd string
params []string
)
func main() {
parse()
client, err := DialKVStoreService("tcp", ":9090")
if err != nil {
log.Fatalf("error dialing tcp: %v", err)
}
switch cmd {
case "get":
var val string
err := get(client, params[0], &val)
if err != nil {
log.Fatal(err)
}
fmt.Println(val)
case "set":
err := set(client, params)
if err != nil {
log.Fatal(err)
}
default:
fmt.Println("Unsupported command")
}
}
func parse() {
if len(os.Args) < 3 {
fmt.Print("USAGE: \n\t client [command] [...params]")
os.Exit(1)
}
cmd = os.Args[1]
params = os.Args[2:]
log.Printf("Cmd: %s, params: %v", cmd, params)
}
func doClientWork(client *KVStoreClient, srcMethod func() error, watched bool) error {
if !watched {
return srcMethod()
}
watchChan := make(chan bool)
go watchKeyChanged(client, watchChan)
err := srcMethod()
<-watchChan
return err
}
func watchKeyChanged(client *KVStoreClient, watchChan chan bool) {
var keyChanged string
err := client.Watch(timeout, &keyChanged)
if err != nil {
log.Printf("error calling %s: %v", service.KVStoreName+".Watch", err)
}
log.Print("watch-key-changed: ", keyChanged)
watchChan <- true
}
func get(client *KVStoreClient, key string, val *string) error {
return doClientWork(client, func() error {
err := client.Get(key, val)
if err != nil {
return fmt.Errorf("error calling %s.%s: %w", service.KVStoreName, "Get", err)
}
return nil
}, false)
}
func set(client *KVStoreClient, kv []string) error {
return doClientWork(client, func() error {
err := client.Set(kv, new(struct{}))
if err != nil {
return fmt.Errorf("error calling %s.%s: %v", service.KVStoreName, "Set", err)
}
return nil
}, true)
}
watchKeyChanged
: 调用KVStoreService.Watch
服务获取修改的 KeydoClientWork
:当需要调用watch
时,通过watchChan
等待watchKeyChanged
的返回
这样每次调用 KVStoreService.Set
服务时就可以监控 kv 的变化了
4.3.3 反向 RPC
通常的 RPC 基于 C/S 架构, RPC 的服务端对应网络服务器, RPC 客户端对应网络客户端。但是对于一些特殊场景,例如公司内网提供一个 RPC 服务,但是外网无法连接到内网服务器,此时可以参考类似反向代理技术,首先从内网主动连接到外网的 TCP 服务器,然后基于 TPC 连接向外网提供服务。
示例代码如下,server 端:
func main() {
rpc.Register(service.NewArithmeticService())
for {
conn, err := net.Dial("tcp", ":9090")
if err != nil {
log.Printf("error dialing tcp: %v, retry ...", err.Error())
time.Sleep(1 * time.Second)
continue
}
log.Println("dialing success")
rpc.ServeConn(conn)
conn.Close()
}
}
反向 RPC 的服务端不主动监听 TCP 连接,而是连接到客户端的 TCP 服务器并基于 TCP 为客户端提供 RPC 服务
客户端需要提供 TCP 服务用于接受服务端的连接:
func main() {
listener, err := net.Listen("tcp", ":9090")
if err != nil {
log.Fatalln("error listening tcp:", err)
}
log.Print("listening on tcp :9090")
clientChan := make(chan *rpc.Client)
go func() {
log.Println("waiting for connection ...")
conn, err := listener.Accept()
if err != nil {
log.Fatalln("error accepting conn:", err)
}
log.Printf("%s connected ...", conn.RemoteAddr())
clientChan <- rpc.NewClient(conn)
}()
doClientWork(clientChan)
}
func doClientWork(clientChan chan *rpc.Client) {
client := <-clientChan
defer client.Close()
var reply int
args := service.ArithmeticService{
X: 2,
Y: 3,
}
err := client.Call("ArithmeticService.Multiply", args, &reply)
if err != nil {
log.Println("error calling ArithmeticService.Multiply:", err)
}
fmt.Printf("%d * %d = %d", args.X, args.Y, reply)
}
4.3.4 上下文信息
基于上下文可以针对不同客户端提供定制化的 RPC 服务,我们可以通过为每个链接提供独立的 RPC 服务来实现对上下文的支持。
首先改造HelloService
var (
ErrAuth = errors.New("not login")
ErrLogin = errors.New("username or password incorrect")
)
type HelloService struct {
Conn net.Conn
isLogin bool
}
func (h *HelloService) Login(request string, reply *string) error {
if request != "user:pass" {
return ErrLogin
}
h.isLogin = true
return nil
}
func (h *HelloService) Hello(request string, reply *string) error {
if !h.isLogin {
return ErrAuth
}
*reply = fmt.Sprintf("Hello %s, from %s", request, h.Conn.RemoteAddr())
return nil
}
在 Hello
中加入了简单的登录验证,这样在调用 RPC 服务之前需要进行登录操作。
在服务端为每个链接提供独立的 RPC 服务:
func main() {
listener, err := net.Listen("tcp", ":9090")
if err != nil {
log.Fatalln("error listening tcp:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("error accepting conn:", err)
}
go func() {
defer conn.Close()
p := rpc.NewServer()
p.Register(&service.HelloService{Conn: conn})
p.ServeConn(conn)
}()
}
}