5. 分布式节点

Kesa...大约 4 分钟golang

https://github.com/dreamjz/golang-notes/tree/main/books/7-days-golang/GeeCache/day5-distributed-nodesopen in new window

DAY5-DISTRIBUTED-NODES
│  go.mod
│  go.work
│  main.go
│  run.sh
│  
└─geecache
    │  byteview.go
    │  cache.go
    │  consistenthash.go
    │  consistenthash_test.go
    │  geecache.go
    │  geecache_test.go
    │  go.mod
    │  http.go
    │  peers.go
    │
    ├─consistenthash
    │      consistenthash.go
    │      consistenthash_test.go
    │
    └─lru
            lru.go
            lru_test.go

1. PeerPicker 和 PeerGetter 接口

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key
type PeerPicker interface {
	PickPeer(key string) (PeerGetter, bool)
}

// PeerGetter is the interface that must be implemented by a peer
type PeerGetter interface {
	Get(group, key string) ([]byte, error)
}
  • PickPeer:根据 key 选取节点
  • Get:在对应的节点的对应的 group 中获取key对应的值

2. HTTP 客户端

HTTPPool已经实现了服务端功能,现在来实现客户端功能。

const (
	defaultBasePath = "/geecache/"
	defaultReplicas = 50
)

// HTTPPool implements PeerPicker for a pool of HTTP peers
type HTTPPool struct {
	self        string     // 自己的地址,主机+端口
	basePath    string     // 节点间通讯地址的前缀
	mu          sync.Mutex // guards peers and httpGetters
	peers       *consistenthash.Map
	httpGetters map[string]*httpGetter
}
  • peers:用于根据 key 选择节点
  • httpGetters:映射节点和对应的httpGetter
var _ PeerGetter = (*httpGetter)(nil)

type httpGetter struct {
	baseURL string
}

func (h *httpGetter) Get(group, key string) ([]byte, error) {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key),
	)

	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	bytes, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %v", err)
	}

	return bytes, nil
}

通过调用http.Get函数获取返回值。

// Set updates the pool's list of peers
func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.peers = consistenthash.New(defaultReplicas, nil)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
	}
}

// PickPeer picks a peer according to key
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if peer := p.peers.Get(key); peer != "" && peer != p.self {
		p.Log("Pick peer %s", peer)
		return p.httpGetters[peer], true
	}
	return nil, false
}

  • Set
    1. 使用默认replicas创建consistenthash.Map实例
    2. 添加传入的节点
    3. 添加节点/httpGetter映射
  • PickPeer:根据 key 获取节点对应的 httpGetter

3. 主流程

// Group is a cache namespace and associated data loaded spread over
type Group struct {
	name      string
	getter    Getter
	mainCache cache
	peers     PeerPicker
}
  • peers PeerPicker:调用PickPeer获取key对应的节点,并获取key对应的value
// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) {
	if g.peers != nil {
		panic("RegisterPeerPicker called more than once")
	}

	g.peers = peers
}

注册节点。

func (g *Group) load(key string) (ByteView, error) {
	if g.peers != nil {
		if peer, ok := g.peers.PickPeer(key); ok {
			val, err := g.getFromPeer(peer, key)
			if err == nil {
				return val, nil
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)
}

func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, nil
	}
	return ByteView{b: bytes}, nil
}
  • load
    1. 从其他节点查询键
    2. 若查询失败,则在本地查询源数据
  • getFromPeer:通过key选择节点并查询

4. Demo 测试

package main

import (
	"flag"
	"fmt"
	"geecache"
	"log"
	"net/http"
)

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

const (
	protocol = "http://"
)

func createGroup() *geecache.Group {
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(func(key string) ([]byte, error) {
		log.Println("[SlowDB] search key", key)
		if v, ok := db[key]; ok {
			return []byte(v), nil
		}
		return nil, fmt.Errorf("%s not exist", key)
	}))
}

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	peers := geecache.NewHTTPPool(addr)
	peers.Set(addrs...)
	gee.RegisterPeers(peers)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(addr[len(protocol):], peers))
}

func startAPIServer(apiAddr string, gee *geecache.Group) {
	http.Handle("/api", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		key := req.URL.Query().Get("key")
		view, err := gee.Get(key)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/octet-stream")
		w.Write(view.ByteSlice())
	}))

	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(apiAddr[len(protocol):], nil))
}

func main() {
	var (
		port int
		api  bool
	)
	flag.IntVar(&port, "port", 8001, "GeeCache server port")
	flag.BoolVar(&api, "api", false, "Start a api server")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], addrs, gee)
}

  • startCacheServer:启动缓存服务
    1. 创建 HTTPPool
    2. 添加节点信息
    3. 注册到 Group
    4. 启动 HTTP 服务
  • startAPIServer:启动 HTTP 服务
#!/usr/bin/env sh
# #!/bin/sh
# 通过/usr/bin/env运行命令的好处是可以查找当前 environment 中程序的默认版本

# trap 收到 EXIT 信号时,执行命令
# rm server 移除临时文件
# kill 0 关闭当前进程组的所有进程
trap "rm server; kill 0" EXIT

go build -ldflags "-s -w" -o ./server
./server -port 8001  &
./server -port 8002 &
./server -port 8003 -api  &

sleep 1

echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &

wait
2023/10/11 05:57:59 geecache is running at http://localhost:8003
2023/10/11 05:57:59 geecache is running at http://localhost:8001
2023/10/11 05:57:59 geecache is running at http://localhost:8002
2023/10/11 05:57:59 fontend server is running at http://localhost:9999
>>> start test
2023/10/11 05:58:00 [Server http://localhost:8003] Pick peer http://localhost:8001
2023/10/11 05:58:00 [Server http://localhost:8003] Pick peer http://localhost:8001
2023/10/11 05:58:00 [Server http://localhost:8003] Pick peer http://localhost:8001
2023/10/11 05:58:00 [Server http://localhost:8001] GET /geecache/scores/Tom
2023/10/11 05:58:00 [SlowDB] search key Tom
2023/10/11 05:58:00 [Server http://localhost:8001] GET /geecache/scores/Tom
2023/10/11 05:58:00 [GeeCache] hit
2023/10/11 05:58:00 [Server http://localhost:8001] GET /geecache/scores/Tom
2023/10/11 05:58:00 [GeeCache] hit
630630630

Reference

  1. https://geektutu.com/post/geecache-day5.htmlopen in new window
上次编辑于:
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.2