在云原生的时代,服务发现已经是必不可少的功能,我借着最近迁移 gRPC 服务的机会尝试了一下如何用 etcd 实现服务发现,期间遇到诸多问题,本文逐一记之。
虽然 gRPC 并没有内置 etcd 的服务发现功能,但是它提供了相关接口让我们扩展:
代码语言:javascript复制// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}
在实际动手之前,有必要了解一下「gRPC Name Resolution」,它定义了 gRPC 的 URI 格式,举个例子:「dns://1.1.1.1/huoding.com」,其中:
- Scheme:dns
- Authority:1.1.1.1
- Endpoint:huoding.com
表示通过 dns 服务器 1.1.1.1 查询 huoding.com 有哪些节点。
既然我们要支持 etcd,那么我们首先要想好 etcd 对应的 URI 应该是什么样的,尤其是 Authority 填什么好呢?按上面例子的意思,填 etcd 服务器的地址似乎就可以,不过实际情况中,一般会有多台 etcd 服务器,还牵扯到用户名密码,虽然我们可以构造一个复杂的 DSN 字符串全写到 Authority 里,但是那样的话显得太臃肿了,还不如直接从配置文件里获取来的实在,所以建议 Authority 留空。假设我们要通过 etcd 查询一个名为 foo 的服务对应的节点,那么 URI 可以定义为:「etcd:///foo」。
了解了基础知识之后,在编码前让我们在头脑里过一遍 gRPC 的服务流程:
- 服务端启动,在 etcd 里通过租约注册键为「/foo/<ip>:<port>」并且值为「<ip>:<port>」的数据,同时定期发送心跳包,一旦节点退出会注销相关数据。
- 客户端启动,gRPC 从 etcd:///foo 解析出 Scheme、Authority、Endpoint,并根据 Scheme 找到对应 Builder,调用其 Build 方法,返回对应的 Resolver,在 etcd 中查询前缀是「/foo/」的数据,就是目前可用的节点。
- 最后,负载均衡会挑选出一个节点来提供服务。
etcd
下面可以粘代码了,我主要是参考 gRPC 内置的 dns_resolver.go 来实现的。
先是 builder.go,实现了 Builder 接口:
代码语言:javascript复制package etcd
import (
"fmt"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/resolver"
)
type Builder struct {
Client *clientv3.Client
}
func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
prefix := fmt.Sprintf("/%s/", target.Endpoint)
r := &Resolver{
Client: b.Client,
cc: cc,
prefix: prefix,
}
go r.watcher()
r.ResolveNow(resolver.ResolveNowOptions{})
return r, nil
}
func (b *Builder) Scheme() string {
return "etcd"
}
再是 resolver.go,实现了 Resolver 接口:
代码语言:javascript复制package etcd
import (
"context"
"sync"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"google.golang.org/grpc/resolver"
)
type Resolver struct {
sync.RWMutex
Client *clientv3.Client
cc resolver.ClientConn
prefix string
addresses map[string]resolver.Address
}
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
// todo
}
func (r *Resolver) Close() {
// todo
}
func (r *Resolver) watcher() {
r.addresses = make(map[string]resolver.Address)
response, err := r.Client.Get(context.Background(), r.prefix, clientv3.WithPrefix())
if err == nil {
for _, kv := range response.Kvs {
r.setAddress(string(kv.Key), string(kv.Value))
}
r.cc.UpdateState(resolver.State{
Addresses: r.getAddresses(),
})
}
watch := r.Client.Watch(context.Background(), r.prefix, clientv3.WithPrefix())
for response := range watch {
for _, event := range response.Events {
switch event.Type {
case mvccpb.PUT:
r.setAddress(string(event.Kv.Key), string(event.Kv.Value))
case mvccpb.DELETE:
r.delAddress(string(event.Kv.Key))
}
}
r.cc.UpdateState(resolver.State{
Addresses: r.getAddresses(),
})
}
}
func (r *Resolver) setAddress(key, address string) {
r.Lock()
defer r.Unlock()
r.addresses[key] = resolver.Address{Addr: string(address)}
}
func (r *Resolver) delAddress(key string) {
r.Lock()
defer r.Unlock()
delete(r.addresses, key)
}
func (r *Resolver) getAddresses() []resolver.Address {
addresses := make([]resolver.Address, 0, len(r.addresses))
for _, address := range r.addresses {
addresses = append(addresses, address)
}
return addresses
}
接着是服务端代码:
代码语言:javascript复制func main() {
host := viper.GetString("server.host")
port := viper.GetString("server.port")
listener, err := net.Listen("tcp", net.JoinHostPort(host, port))
if err != nil {
log.Fatalln(err)
}
server := grpc.NewServer()
reflection.Register(server)
pb.RegisterFooServer(server, &foo.Server{})
close, err := register("foo", 10)
if err != nil {
log.Fatalln(err)
}
go func() {
if err := server.Serve(listener); err != nil {
log.Fatalln(err)
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
close()
}
func register(service string, ttl int64) (func(), error) {
port := viper.GetString("server.port")
client, err := etcdClient()
if err != nil {
return nil, err
}
ctx := context.Background()
lease, err := client.Grant(ctx, ttl)
if err != nil {
return nil, err
}
ip, err := localIP()
if err != nil {
return nil, err
}
key := fmt.Sprintf("/%s/%s:%s", service, ip, port)
value := fmt.Sprintf("%s:%s", ip, port)
_, err = client.Put(ctx, key, value, clientv3.WithLease(lease.ID))
if err != nil {
return nil, err
}
keepAlive, err := client.KeepAlive(ctx, lease.ID)
if err != nil {
return nil, err
}
go func() {
for {
<-keepAlive
}
}()
close := func() {
_, _ = client.Revoke(ctx, lease.ID)
}
return close, nil
}
func localIP() (string, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", err
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String(), nil
}
}
}
return "", errors.New("unable to determine local ip")
}
最后是客户端代码:
代码语言:javascript复制func main() {
client, err := etcdClient()
if err != nil {
log.Fatalln(err)
}
builder := &etcd.Builder{
Client: client,
}
resolver.Register(builder)
ctx := context.Background()
target := "etcd:///foo"
cc, err := grpc.DialContext(
ctx,
target,
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
grpc.WithInsecure(),
)
if err != nil {
log.Fatalln(err)
}
defer cc.Close()
// pb.NewFooClient(cc) ...
}
说明:在获取 ip 的时候,没有考虑内外网 ip 的情况,需要的可以参考相关资料。
说明:etcd 3.5 以下版本 和 gRPC 的最新版本不兼容,需要在 go.mod 里指定版本:
代码语言:javascript复制replace (
github.com/golang/protobuf => github.com/golang/protobuf v1.3.2
go.etcd.io/etcd => go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
google.golang.org/grpc => google.golang.org/grpc v1.26.0
)
结尾我要说的是 Go Kit 是个好东西啊, 内置支持各种服务发现,包括 etcd。