5. 分布式节点
...大约 4 分钟
DAY5-DISTRIBUTED-NODES
│  go.mod
│  go.work
│  main.go
│  run.sh
│
└─geecache
    │  byteview.go
    │  cache.go
    │  consistenthash.go
    │  consistenthash_test.go
    │  geecache.go
    │  geecache_test.go
    │  go.mod
    │  http.go
    │  peers.go
    │
    ├─consistenthash
    │      consistenthash.go
    │      consistenthash_test.go
    │
    └─lru
            lru.go
            lru_test.go
1. PeerPicker 和 PeerGetter 接口
// PeerPicker is implemented by anything that can locate the peer
// owning a particular key.
type PeerPicker interface {
	// PickPeer returns the PeerGetter for the peer chosen for key.
	// ok reports whether a peer was chosen.
	PickPeer(key string) (peer PeerGetter, ok bool)
}
// PeerGetter is implemented by a peer that can be queried for cached data.
type PeerGetter interface {
	// Get looks up key inside the named group on the peer and returns
	// the raw value bytes.
	Get(group string, key string) (value []byte, err error)
}
- `PickPeer`:根据 key 选取节点
- `Get`:在对应节点的对应 group 中获取 key 对应的值
2. HTTP 客户端
`HTTPPool` 已经实现了服务端功能,现在来实现客户端功能。
// Defaults used by HTTPPool.
const (
	// defaultReplicas is the replica count passed to consistenthash.New
	// when Set builds the hash ring.
	defaultReplicas = 50

	// defaultBasePath is the URL path prefix for inter-node requests.
	defaultBasePath = "/geecache/"
)
// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
self string // this node's own address (the demo passes e.g. "http://localhost:8001")
basePath string // URL path prefix used for inter-node communication
mu sync.Mutex // guards peers and httpGetters
peers *consistenthash.Map // consistent-hash map used to choose a node for a key
httpGetters map[string]*httpGetter // maps each node address to its httpGetter
}
- `peers`:用于根据 key 选择节点
- `httpGetters`:映射节点和对应的 `httpGetter`
// Compile-time assertion that *httpGetter satisfies PeerGetter.
var _ PeerGetter = (*httpGetter)(nil)
// httpGetter is the HTTP client side of a single remote peer.
type httpGetter struct {
baseURL string // URL prefix of the remote peer; HTTPPool.Set builds it as peer address + basePath
}
// Get fetches the value stored under key in group from the remote peer.
//
// The request URL is baseURL followed by "<group>/<key>". Both segments are
// escaped with url.PathEscape because they are path segments, not query
// values: url.QueryEscape would encode a space as "+", which is not decoded
// back to a space when the server reads the request path, so such keys would
// never match.
//
// It returns the raw response body on HTTP 200. It returns an error when the
// request fails, when the peer answers with any other status, or when the
// body cannot be read.
func (h *httpGetter) Get(group, key string) ([]byte, error) {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.PathEscape(group),
		url.PathEscape(key),
	)

	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	body, err := io.ReadAll(res.Body)
	if err != nil {
		// %w keeps the underlying error inspectable via errors.Is/As.
		return nil, fmt.Errorf("reading response body: %w", err)
	}
	return body, nil
}
通过调用 `http.Get` 函数向远程节点发起请求,并读取响应体作为返回值。
// Set replaces the pool's peer list. It builds a fresh consistent-hash map
// holding peers (with defaultReplicas replicas) and a fresh httpGetter for
// each peer whose baseURL is the peer address followed by the pool's basePath.
func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	ring := consistenthash.New(defaultReplicas, nil)
	ring.Add(peers...)

	getters := make(map[string]*httpGetter, len(peers))
	for _, addr := range peers {
		getters[addr] = &httpGetter{baseURL: addr + p.basePath}
	}

	p.peers = ring
	p.httpGetters = getters
}
// PickPeer looks up the node that the consistent-hash map assigns to key.
// It returns that node's httpGetter and true when the node is a peer other
// than this pool itself; otherwise it returns nil and false.
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	owner := p.peers.Get(key)
	if owner == "" || owner == p.self {
		return nil, false
	}

	p.Log("Pick peer %s", owner)
	return p.httpGetters[owner], true
}
- `Set`:
  - 使用默认 `replicas` 创建 `consistenthash.Map` 实例
  - 添加传入的节点
  - 添加节点/`httpGetter` 映射
- `PickPeer`:根据 key 获取节点对应的 `httpGetter`
3. 主流程
// Group is a cache namespace and the associated data loaded for it.
type Group struct {
name string // sent to peers as the group argument of PeerGetter.Get (see getFromPeer)
getter Getter // NOTE(review): presumably the source-data callback; its use is outside this excerpt
mainCache cache // NOTE(review): presumably the local cache; its use is outside this excerpt
peers PeerPicker // selects the remote peer for a key; set once by RegisterPeers
}
`peers PeerPicker`:调用 `PickPeer` 获取 key 对应的节点,再从该节点获取 key 对应的 value。
// RegisterPeers installs the PeerPicker the group uses to choose a remote
// peer. It may be called only once per group; calling it again when a picker
// is already set panics.
func (g *Group) RegisterPeers(peers PeerPicker) {
	if g.peers == nil {
		g.peers = peers
		return
	}
	panic("RegisterPeerPicker called more than once")
}
注册节点。
// load obtains the value for key. When a PeerPicker is registered and it
// selects a peer for key, the value is requested from that peer first. If no
// picker is registered, no peer is selected, or the peer request returns an
// error (which is logged), load falls back to getLocally.
func (g *Group) load(key string) (ByteView, error) {
	if g.peers == nil {
		return g.getLocally(key)
	}

	peer, ok := g.peers.PickPeer(key)
	if !ok {
		return g.getLocally(key)
	}

	val, err := g.getFromPeer(peer, key)
	if err != nil {
		log.Println("[GeeCache] Failed to get from peer", err)
		return g.getLocally(key)
	}
	return val, nil
}
// getFromPeer asks the given peer for key within this group (g.name) and
// wraps the returned bytes in a ByteView.
//
// A failure reported by the peer is returned to the caller unchanged, so that
// load can detect it and fall back to the local lookup rather than treating
// an empty ByteView as a successful result.
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	data, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: data}, nil
}
- `load`:
  - 先通过 `PickPeer` 选择节点,并从该节点查询键
  - 若没有可用的远程节点或查询失败,则在本地查询源数据
- `getFromPeer`:向已选定的节点(`PeerGetter`)查询 key 对应的值,并封装为 `ByteView`
4. Demo 测试
package main
import (
"flag"
"fmt"
"geecache"
"log"
"net/http"
)
// db is the in-memory stand-in for a slow data source; createGroup's getter
// reads from it.
var db = map[string]string{
	"Sam":  "567",
	"Jack": "589",
	"Tom":  "630",
}

// protocol is the URL scheme prefix carried by every node address. It is
// sliced off an address before the remainder is passed to
// http.ListenAndServe.
const protocol = "http://"
// createGroup builds the "scores" cache group. Its getter logs each lookup
// and reads the value from db, returning an error for keys that db does not
// contain.
func createGroup() *geecache.Group {
	lookup := func(key string) ([]byte, error) {
		log.Println("[SlowDB] search key", key)
		v, ok := db[key]
		if !ok {
			return nil, fmt.Errorf("%s not exist", key)
		}
		return []byte(v), nil
	}
	// 2<<10 == 2048; presumably the cache size limit — confirm against NewGroup.
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(lookup))
}
// startCacheServer creates an HTTPPool for addr, loads it with every node
// address in addrs, registers it with gee as the peer picker, and then serves
// the pool over HTTP. addr must start with protocol; the part after that
// prefix is used as the listen address. The call blocks, and log.Fatal exits
// the process when the server stops.
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	pool := geecache.NewHTTPPool(addr)
	pool.Set(addrs...)
	gee.RegisterPeers(pool)
	log.Println("geecache is running at", addr)

	listenOn := addr[len(protocol):]
	log.Fatal(http.ListenAndServe(listenOn, pool))
}
// startAPIServer registers an "/api" handler on the default mux and serves it
// at apiAddr (which must start with protocol). The handler reads the "key"
// query parameter, fetches it from gee, and writes the value as
// application/octet-stream; a lookup error produces a 500 response carrying
// the error text. The call blocks, and log.Fatal exits the process when the
// server stops.
func startAPIServer(apiAddr string, gee *geecache.Group) {
	serveKey := func(w http.ResponseWriter, req *http.Request) {
		view, err := gee.Get(req.URL.Query().Get("key"))
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/octet-stream")
		w.Write(view.ByteSlice())
	}
	http.HandleFunc("/api", serveKey)

	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(apiAddr[len(protocol):], nil))
}
// main parses the -port and -api flags, builds the cache group, optionally
// starts the API server in a goroutine, and then runs the cache server for
// the selected port.
//
// -port must be one of the ports in addrMap. Any other value is rejected up
// front: otherwise the node address would be the empty string and
// startCacheServer would panic when slicing the protocol prefix off it.
func main() {
	var (
		port int
		api  bool
	)
	flag.IntVar(&port, "port", 8001, "GeeCache server port")
	flag.BoolVar(&api, "api", false, "Start a api server")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	self, ok := addrMap[port]
	if !ok {
		log.Fatalf("unsupported port %d: must be one of 8001, 8002, 8003", port)
	}

	// Every node gets the full peer list, itself included.
	addrs := make([]string, 0, len(addrMap))
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(self, addrs, gee)
}
- `startCacheServer`:启动缓存服务
  - 创建 `HTTPPool`
  - 添加节点信息
  - 注册到 `Group` 中
  - 启动 HTTP 服务
- `startAPIServer`:启动面向用户的 API 服务(HTTP),通过 `/api?key=xxx` 查询缓存
#!/usr/bin/env sh
# (alternative shebang: #!/bin/sh)
# Running through /usr/bin/env has the advantage of locating the default version of the program in the current environment.
# trap: run the given command when the shell receives the EXIT signal.
# rm server: remove the temporary binary.
# kill 0: terminate every process in the current process group.
trap "rm server; kill 0" EXIT
# Build the demo binary as ./server.
go build -ldflags "-s -w" -o ./server
# Start three cache nodes in the background; the node on port 8003 also starts the API server.
./server -port 8001 &
./server -port 8002 &
./server -port 8003 -api &
# Wait one second before sending requests.
sleep 1
echo ">>> start test"
# Send three requests for the same key in parallel.
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
# Block until the background jobs exit.
wait
2023/10/11 05:57:59 geecache is running at http://localhost:8003
2023/10/11 05:57:59 geecache is running at http://localhost:8001
2023/10/11 05:57:59 geecache is running at http://localhost:8002
2023/10/11 05:57:59 fontend server is running at http://localhost:9999
>>> start test
2023/10/11 05:58:00 [Server http://localhost:8003] Pick peer http://localhost:8001
2023/10/11 05:58:00 [Server http://localhost:8003] Pick peer http://localhost:8001
2023/10/11 05:58:00 [Server http://localhost:8003] Pick peer http://localhost:8001
2023/10/11 05:58:00 [Server http://localhost:8001] GET /geecache/scores/Tom
2023/10/11 05:58:00 [SlowDB] search key Tom
2023/10/11 05:58:00 [Server http://localhost:8001] GET /geecache/scores/Tom
2023/10/11 05:58:00 [GeeCache] hit
2023/10/11 05:58:00 [Server http://localhost:8001] GET /geecache/scores/Tom
2023/10/11 05:58:00 [GeeCache] hit
630630630
Reference
Powered by Waline v2.15.2