介绍
在大多数场景下, 我们的服务都不是单节点部署而是多节点部署, 通过域名访问服务是目前大部分网络应用的真实情况.
NameResolver
可以看作是一个map[service-name][backend-ip]
. 它接受一个服务名称,并返回一个地址列表(IP 端口号)。常用的NameResolver是DNS
, 不了解域名解析的可以看我这篇文章《域名解析的全过程》。
gRPC 中的默认使用DNS域名解析
,同时在客户端以插件形式提供了自定义NameResolver
的支持。
自定义NameResolver
name_reslover/grpc_reslover.go
代码语言:txt复制package grpc_resolver
import (
"context"
"fmt"
"google.golang.org/grpc/resolver"
"sync"
"time"
)
const (
ExampleScheme = "ns"
ExampleServiceName = "resolver.example.grpc.io"
backendAddr = "localhost:50051"
minNSResRate = 10 * time.Second
)
type exampleResolverBuilder struct{}
func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
r := &exampleResolver{
target: target,
cc: cc,
addrsStore: map[string][]string{
ExampleServiceName: {backendAddr},
},
rn: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
}
r.wg.Add(1)
// 开启监听, 一般注册中心有变化, 则及时更新地址信息
go r.watcher()
r.ResolveNow(resolver.ResolveNowOptions{})
return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return ExampleScheme }
// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type exampleResolver struct {
target resolver.Target
cc resolver.ClientConn
addrsStore map[string][]string
rn chan struct{}
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func (r *exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {
select {
case r.rn <- struct{}{}:
default:
}
}
func (r *exampleResolver) Close() {
r.cancel()
r.wg.Wait()
}
// 全局 resolverBuild 都存放一个 map 中,key 为 scheme,value 为对应的 resolverBuilder。
// init的时候会把自定义的exampleResolverBuilder注册进去
func init() {
// Register the example ResolverBuilder. This is usually done in a package's
// init() function.
resolver.Register(&exampleResolverBuilder{})
}
// ServiceInstance 定义服务注册中心返回的服务实例
type ServiceInstance struct {
// 服务名称
ServiceName string
// 服务地址(包括了ip和端口)
Addr string
}
// GetAllInstance 从服务注册中心获取可用的服务实例列表
// 这里为了演示, 简单的直接返回了一个列表,真实场景是需要调服务注册中心的API的
func GetAllInstance() ([]ServiceInstance, error) {
return []ServiceInstance{
{ExampleServiceName, backendAddr},
}, nil
}
func (r *exampleResolver) lookup() (*resolver.State, error) {
instances, err := GetAllInstance()
addresses := make([]resolver.Address, len(instances))
for i, instance := range instances {
addresses[i] = resolver.Address{Addr: instance.Addr, ServerName: instance.ServiceName}
}
state := &resolver.State{Addresses: addresses}
return state, err
}
//watcher backend svr change
func (r *exampleResolver) watcher() {
defer r.wg.Done()
for {
select {
case <-r.ctx.Done():
fmt.Println("关闭了")
return
case <-r.rn:
}
state, err := r.lookup()
if err != nil {
r.cc.ReportError(err)
} else {
r.cc.UpdateState(*state)
}
// 第二个select 用一个 timer 来限制dns更新频率
t := time.NewTimer(minNSResRate)
select {
case <-t.C:
r.rn <- struct{}{}
case <-r.ctx.Done():
t.Stop()
return
}
}
}
一般地自定义NameResolver需要实现如下两个接口
代码语言:txt复制type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}
type Resolver interface {
ResolveNow(ResolveNowOptions)
Close()
}
Resolver 用于将服务名解析成对应服务实例。
Builder 则采用 Builder 模式在包初始化时创建并注册构造自定义 Resolver 实例。
基本流程
- 客户端启动时,注册自定义的 resolver 启动时会自动执行 init() 方法,它会构造自定义的 resolveBuilder,并将其注册到 grpc 内部的 resolveBuilder 表中。
- 客户端拨号 客户端启动时通过自定义 Dail() 方法构造 grpc.ClientConn 单例, 它会找到URL上Scheme对应的resolveBuilder, 然后调用resolveBuilder的Build方法构建自定义 resolver,同时开启watch更新协程,通过此 resolver 更新可用的服务实例列表。
所谓的服务实例列表其实就是域名对应的IP 端口信息, 一个域名可能对应多个IP 端口实例。
- LB策略 grpc 底层 LB 库对每个实例均创建一个 subConnection,最终根据相应的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
客户端使用
client/main.go
代码语言:txt复制ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
exampleConn, err := grpc.DialContext(
ctx,
fmt.Sprintf("%s:///%s", grpc_resolver.ExampleScheme, grpc_resolver.ExampleServiceName),
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithTransportCredentials(cred),
grpc.WithPerRPCCredentials(userPwdAuth),
grpc.WithPerRPCCredentials(oauthAuth),
grpc.WithPerRPCCredentials(jwtAuth),
grpc.WithChainUnaryInterceptor(grpc_validator.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(grpc_validator.ClientStreamInterceptor()),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer exampleConn.Close()
client3 := pb.NewBattleServiceClient(exampleConn)
bidirectionalStreamBattle(client3)