7. 服务发现与注册中心
...大约 4 分钟
1. 注册中心
注册中心的好处在于,客户端和服务端都只需要感知注册中心的存在,而无需感知对方的存在。更具体一些:
- 服务端启动后,向注册中心发送注册消息,注册中心得知该服务已经启动,处于可用状态。一般来说,服务端还需要定期向注册中心发送心跳,证明自己还活着
- 客户端向注册中心询问,当前哪些服务是可用的,注册中心将可用的服务列表返回客户端
- 客户端根据注册中心得到的服务列表,选择其中一个发起调用
比较常用的注册中心有 etcd、zookeeper、consul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。
2. 实现简单的注册中心
实现一个支持心跳探测的注册中心:
registry/registry.go
// GeeRegistry is a simple registry center. It provides the following functions:
// add a server and receive heartbeats to keep it alive;
// return all alive servers and delete dead servers at the same time.
type GeeRegistry struct {
// timeout is how long after its last registration/heartbeat a server still
// counts as alive; aliveServers treats 0 as "never expires".
timeout time.Duration
// mu is held by putServer and aliveServers while they access servers.
mu sync.Mutex
// servers maps a server address to its registration record.
servers map[string]*ServerItem
}
// ServerItem is the registry's record for one registered server.
type ServerItem struct {
// Addr is the server address exactly as it was passed to putServer.
Addr string
// start is the time of the most recent registration or heartbeat; putServer
// resets it on every call for an already-known address.
start time.Time
}
const (
// defaultPath is the HTTP path at which the package-level HandleHTTP mounts
// the default registry.
defaultPath = "/geerpc/registry"
// defaultTimeout is the liveness timeout of the default registry; Heartbeat
// also derives its default sending interval from it.
defaultTimeout = 5 * time.Minute
)
// New returns a GeeRegistry with an empty server table whose entries expire
// after timeout. aliveServers treats a timeout of 0 as "never expire".
func New(timeout time.Duration) *GeeRegistry {
	reg := new(GeeRegistry)
	reg.timeout = timeout
	reg.servers = make(map[string]*ServerItem)
	return reg
}
- GeeRegistry
  - timeout:超时时间
  - servers:已注册的服务
- ServerItem
  - Addr:服务地址
  - start:启动时间(每次收到心跳时会被刷新)
默认超时时间为 5min。
// putServer registers addr with the registry. If addr is already known, only
// its start time is refreshed (this is how a heartbeat keeps a server alive);
// otherwise a new ServerItem stamped with the current time is stored.
func (r *GeeRegistry) putServer(addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if item := r.servers[addr]; item != nil {
		// Known server: treat the call as a heartbeat.
		item.start = time.Now()
		return
	}
	r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}
}
// aliveServers returns the addresses of all servers that have not timed out,
// sorted in ascending order, and removes timed-out servers from the registry
// as a side effect. A registry timeout of 0 disables expiry. The result is nil
// when no server is alive.
func (r *GeeRegistry) aliveServers() []string {
	r.mu.Lock()
	defer r.mu.Unlock()

	var alive []string
	for addr, item := range r.servers {
		expired := r.timeout != 0 && !item.start.Add(r.timeout).After(time.Now())
		if expired {
			// Deleting the current key while ranging over a map is safe in Go.
			delete(r.servers, addr)
			continue
		}
		alive = append(alive, addr)
	}
	sort.Strings(alive)
	return alive
}
- putServer:添加服务实例,若已存在则更新其启动时间
- aliveServers:获取可用服务列表,并删除超时服务
// Compile-time check that *GeeRegistry implements http.Handler.
var _ http.Handler = (*GeeRegistry)(nil)
// DefualtGeeRegister is the package-level registry, created with defaultTimeout.
// NOTE(review): the identifier is misspelled ("Defualt"); it is exported, so
// renaming it would break existing callers.
var DefualtGeeRegister = New(defaultTimeout)
// ServeHTTP implements http.Handler. All data travels in headers:
//
//   - GET returns the alive servers, comma-separated, in the X-Geerpc-Servers
//     response header.
//   - POST registers (or refreshes) the server named by the X-Geerpc-Server
//     request header; a missing or empty header is answered with 500.
//   - Any other method is answered with 405.
//
// NOTE(review): a missing header is a client error, so 400 would be the more
// conventional status; 500 is kept here to preserve existing behavior.
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method == http.MethodGet {
		alive := r.aliveServers()
		w.Header().Set("X-Geerpc-Servers", strings.Join(alive, ","))
		return
	}
	if req.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	addr := req.Header.Get("X-Geerpc-Server")
	if addr == "" {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	r.putServer(addr)
}
// HandleHTTP registers r as the handler for path via http.Handle and logs the
// path it was mounted on.
func (r *GeeRegistry) HandleHTTP(path string) {
	var handler http.Handler = r
	http.Handle(path, handler)
	log.Println("rpc registry path:", path)
}
// HandleHTTP mounts the package-level DefualtGeeRegister at defaultPath.
func HandleHTTP() {
DefualtGeeRegister.HandleHTTP(defaultPath)
}
为了实现简单,使用 HTTP 协议并将数据放入 Header 中。
- Get:返回所有可用的服务列表,通过自定义字段 X-Geerpc-Servers 承载。
- Post:添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server 承载。
// Heartbeat registers addr at the registry URL and then keeps it alive by
// re-sending a heartbeat every duration from a background goroutine.
//
// The first heartbeat is sent synchronously before Heartbeat returns. If any
// heartbeat fails (sendHeartbeat logs the error), no further heartbeats are
// sent. A duration that is zero or negative selects the default interval.
func Heartbeat(registry, addr string, duration time.Duration) {
	if duration <= 0 {
		// Make sure there is enough time to send a heartbeat before the
		// server is removed from the registry.
		duration = defaultTimeout - time.Minute
	}
	if err := sendHeartbeat(registry, addr); err != nil {
		return
	}
	go func() {
		// NewTicker + Stop releases the ticker once the loop ends, which
		// time.Tick cannot do.
		ticker := time.NewTicker(duration)
		defer ticker.Stop()
		for range ticker.C {
			if err := sendHeartbeat(registry, addr); err != nil {
				return
			}
		}
	}()
}
// sendHeartbeat POSTs a single heartbeat for addr to the registry URL. The
// address is carried in the X-Geerpc-Server request header; the request has no
// body. It returns (and logs) any error from building or sending the request.
// The response status code is not inspected.
func sendHeartbeat(registry, addr string) error {
	log.Println(addr, "send heart beat to registry", registry)
	req, err := http.NewRequest(http.MethodPost, registry, nil)
	if err != nil {
		// A malformed registry URL yields a nil request; bail out instead of
		// dereferencing it.
		log.Println("rpc server: heart beat err:", err)
		return err
	}
	req.Header.Set("X-Geerpc-Server", addr)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Println("rpc server: heart beat err:", err)
		return err
	}
	// The body is unused, but it must be closed so the connection is released.
	_ = resp.Body.Close()
	return nil
}
定期向注册中心发送心跳。
3. GeeRegistryDiscovery
xclient/discovery_gee.go
// GeeRegistryDiscovery is a discovery implementation that obtains its server
// list from a registry over HTTP and re-fetches it once it has gone stale.
type GeeRegistryDiscovery struct {
// Embedded discovery that Get and GetAll delegate to. The mu and servers
// fields used by Update and Refresh are presumably declared on it (its
// definition is not shown here).
*MultiServerDiscovery
// registry is the URL Refresh sends its GET request to.
registry string
// timeout is how long a fetched server list is considered fresh.
timeout time.Duration
// lastUpdate is when the server list was last replaced by Update or Refresh.
lastUpdate time.Time
}
// defaultUpdateTimeout is the freshness window NewGeeRegistryDiscovery uses
// when it is called with a zero timeout.
const defaultUpdateTimeout = time.Second * 10
// NewGeeRegistryDiscovery creates a discovery that reads its server list from
// the registry at addr. timeout is how long a fetched list stays fresh; a value
// of 0 selects defaultUpdateTimeout. The discovery starts with an empty server
// list.
func NewGeeRegistryDiscovery(addr string, timeout time.Duration) *GeeRegistryDiscovery {
	d := &GeeRegistryDiscovery{
		MultiServerDiscovery: NewMultiServerDiscovery(make([]string, 0)),
		registry:             addr,
		timeout:              defaultUpdateTimeout,
	}
	if timeout != 0 {
		d.timeout = timeout
	}
	return d
}
GeeRegistryDiscovery:
- *MultiServerDiscovery:嵌套(内嵌)的 MultiServerDiscovery
- registry:注册中心地址
- timeout:服务列表过期时间
- lastUpdate:上次更新时间
// Update replaces the server list with servers and records the current time as
// the last update. The slice is stored as given, without copying. It always
// returns nil.
func (d *GeeRegistryDiscovery) Update(servers []string) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.servers, d.lastUpdate = servers, time.Now()
	return nil
}
// Refresh re-fetches the server list from the registry if the current list is
// older than d.timeout; otherwise it does nothing.
//
// The list is read from the comma-separated X-Geerpc-Servers response header;
// entries are trimmed of surrounding whitespace and empty entries are dropped.
// On success the list and lastUpdate are replaced. If the HTTP request fails,
// the error is logged and returned and the previous list is left untouched.
func (d *GeeRegistryDiscovery) Refresh() error {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.lastUpdate.Add(d.timeout).After(time.Now()) {
		// The cached list is still fresh.
		return nil
	}
	log.Println("rpc registry: refresh servers from registry", d.registry)
	resp, err := http.Get(d.registry)
	if err != nil {
		log.Println("rpc registry refresh err:", err)
		return err
	}
	// Only the header is used, but the body must still be closed so the
	// connection is released.
	defer func() { _ = resp.Body.Close() }()

	fields := strings.Split(resp.Header.Get("X-Geerpc-Servers"), ",")
	servers := make([]string, 0, len(fields))
	for _, field := range fields {
		if s := strings.TrimSpace(field); s != "" {
			servers = append(servers, s)
		}
	}
	d.servers = servers
	d.lastUpdate = time.Now()
	return nil
}
实现 Update 和 Refresh。
// Get refreshes the server list if it is stale and then delegates to the
// embedded MultiServerDiscovery to pick one server according to mode. A
// refresh error is returned as-is with an empty address.
func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {
	err := d.Refresh()
	if err != nil {
		return "", err
	}
	return d.MultiServerDiscovery.Get(mode)
}
// GetAll refreshes the server list if it is stale and then delegates to the
// embedded MultiServerDiscovery to return all servers. A refresh error is
// returned as-is with a nil slice.
func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {
	err := d.Refresh()
	if err != nil {
		return nil, err
	}
	return d.MultiServerDiscovery.GetAll()
}
在获取服务之前,会先调用 Refresh 更新服务列表。
4. Demo
package main
import (
"context"
"geerpc"
"geerpc/registry"
"geerpc/xclient"
"log"
"net"
"net/http"
"sync"
"time"
)
// Foo is the demo service type; its methods Sum and Sleep are the ones invoked
// below as "Foo.Sum" and "Foo.Sleep".
type Foo int
// Args carries the two integer operands passed to Foo.Sum and Foo.Sleep.
type Args struct {
Num1, Num2 int
}
// Sum stores args.Num1 + args.Num2 in *reply. It always returns nil.
func (f Foo) Sum(args Args, reply *int) error {
	total := args.Num1 + args.Num2
	*reply = total
	return nil
}
// Sleep blocks for args.Num1 seconds and then stores args.Num1 + args.Num2 in
// *reply. It always returns nil. The demo uses it to provoke call timeouts.
func (f Foo) Sleep(args Args, reply *int) error {
	pause := time.Duration(args.Num1) * time.Second
	time.Sleep(pause)
	*reply = args.Num1 + args.Num2
	return nil
}
// startServer starts one demo RPC server on a free TCP port, registers the Foo
// service on it and announces the server to the registry at regAddr with the
// default heartbeat interval. wg.Done is called once the server has been
// announced, just before the blocking Accept loop starts. Any setup failure
// terminates the program.
func startServer(regAddr string, wg *sync.WaitGroup) {
	var f Foo
	// Port 0 lets the OS pick a free port.
	lis, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatal("server listen tcp failed:", err)
	}
	server := geerpc.NewServer()
	if err = server.Register(&f); err != nil {
		log.Fatal("server register failed:", err)
	}
	registry.Heartbeat(regAddr, "tcp@"+lis.Addr().String(), 0)
	log.Println("server runs at:", lis.Addr().String())
	wg.Done()
	server.Accept(lis)
}
// startRegistry listens on TCP port 9999, mounts the default registry handler
// and serves HTTP on that listener. wg.Done is called once the handler has been
// mounted, just before the blocking Serve call. A failure to listen terminates
// the program instead of handing a nil listener to http.Serve.
func startRegistry(wg *sync.WaitGroup) {
	lis, err := net.Listen("tcp", ":9999")
	if err != nil {
		log.Fatal("registry listen tcp failed:", err)
	}
	registry.HandleHTTP()
	wg.Done()
	// Serve blocks for the lifetime of the demo; its error is deliberately
	// ignored.
	_ = http.Serve(lis, nil)
}
// foo performs one RPC through xc and logs the outcome. typ selects the
// operation: "call" uses xc.Call, "broadcast" uses xc.Broadcast. Any other typ
// performs no RPC and is logged as a success with a zero reply.
func foo(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, args *Args) {
	var reply int
	var err error

	if typ == "call" {
		err = xc.Call(ctx, serviceMethod, args, &reply)
	} else if typ == "broadcast" {
		err = xc.Broadcast(ctx, serviceMethod, args, &reply)
	}

	if err == nil {
		log.Printf("%s %s success: %d + %d = %d", typ, serviceMethod, args.Num1, args.Num2, reply)
		return
	}
	log.Printf("%s %s error: %v", typ, serviceMethod, err)
}
// call issues five concurrent "Foo.Sum" calls through a client that discovers
// servers via the registry at regAddr, and waits for all of them to finish
// before closing the client.
func call(regAddr string) {
	discovery := xclient.NewGeeRegistryDiscovery(regAddr, 0)
	xc := xclient.NewXClient(discovery, xclient.RandomSelect, nil)
	defer func() { _ = xc.Close() }()

	// Send the requests and wait for the responses.
	const requests = 5
	var wg sync.WaitGroup
	wg.Add(requests)
	for i := 0; i < requests; i++ {
		go func(n int) {
			defer wg.Done()
			foo(xc, context.Background(), "call", "Foo.Sum", &Args{Num1: n, Num2: n * n})
		}(i)
	}
	wg.Wait()
}
// broadcast runs five concurrent workers against the servers discovered via
// the registry at regAddr. Each worker broadcasts "Foo.Sum" without a deadline
// and then "Foo.Sleep" with a 2-second deadline. It waits for all workers to
// finish before closing the client.
func broadcast(regAddr string) {
	d := xclient.NewGeeRegistryDiscovery(regAddr, 0)
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer func() { _ = xc.Close() }()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			foo(xc, context.Background(), "broadcast", "Foo.Sum", &Args{Num1: i, Num2: i * i})
			// Foo.Sleep sleeps Num1 seconds, so the workers whose sleep reaches
			// the 2-second deadline are expected to time out.
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
			// Always release the context's timer, even when the call returns
			// before the deadline.
			defer cancel()
			foo(xc, ctx, "broadcast", "Foo.Sleep", &Args{Num1: i, Num2: i * i})
		}(i)
	}
	wg.Wait()
}
// main wires the demo together: it starts the registry, then two RPC servers
// that announce themselves to it, and finally runs the call and broadcast
// scenarios against whatever servers the registry reports.
func main() {
	log.SetFlags(0)
	const regAddr = "http://localhost:9999/geerpc/registry"

	// Wait until the registry is set up before any server tries to reach it.
	var ready sync.WaitGroup
	ready.Add(1)
	go startRegistry(&ready)
	ready.Wait()

	// Start the servers and wait until both have announced themselves.
	const servers = 2
	ready.Add(servers)
	for i := 0; i < servers; i++ {
		go startServer(regAddr, &ready)
	}
	ready.Wait()

	call(regAddr)
	broadcast(regAddr)
}
Reference
Powered by Waline v2.15.2