grpc-go之NameResolver(六)

2022-10-12 13:04:10 浏览数 (2)

介绍

在大多数场景下, 我们的服务都不是单节点部署而是多节点部署, 通过域名访问服务是目前大部分网络应用的真实情况.

NameResolver可以看作是一个map[service-name][backend-ip]. 它接受一个服务名称,并返回一个地址列表(IP 端口号)。常用的NameResolver是DNS, 不了解域名解析的可以看我这篇文章《域名解析的全过程》。

gRPC 中的默认使用DNS域名解析,同时在客户端以插件形式提供了自定义NameResolver的支持。

自定义NameResolver

name_reslover/grpc_reslover.go

代码语言:txt复制
package grpc_resolver

import (
	"context"
	"fmt"
	"google.golang.org/grpc/resolver"
	"sync"
	"time"
)

const (
	ExampleScheme      = "ns"
	ExampleServiceName = "resolver.example.grpc.io"
	backendAddr        = "localhost:50051"
	minNSResRate       = 10 * time.Second
)

type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	r := &exampleResolver{
		target: target,
		cc:     cc,
		addrsStore: map[string][]string{
			ExampleServiceName: {backendAddr},
		},
		rn:     make(chan struct{}, 1),
		ctx:    ctx,
		cancel: cancel,
	}
	r.wg.Add(1)
	// 开启监听, 一般注册中心有变化, 则及时更新地址信息
	go r.watcher()
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return ExampleScheme }

// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
	rn         chan struct{}
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup
}

func (r *exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {
	select {
	case r.rn <- struct{}{}:
	default:
	}
}
func (r *exampleResolver) Close() {
	r.cancel()
	r.wg.Wait()
}

// 全局 resolverBuild 都存放一个 map 中,key 为 scheme,value 为对应的 resolverBuilder。
// init的时候会把自定义的exampleResolverBuilder注册进去
func init() {
	// Register the example ResolverBuilder. This is usually done in a package's
	// init() function.
	resolver.Register(&exampleResolverBuilder{})
}

// ServiceInstance 定义服务注册中心返回的服务实例
type ServiceInstance struct {
	// 服务名称
	ServiceName string
	// 服务地址(包括了ip和端口)
	Addr string
}

// GetAllInstance 从服务注册中心获取可用的服务实例列表
// 这里为了演示, 简单的直接返回了一个列表,真实场景是需要调服务注册中心的API的
func GetAllInstance() ([]ServiceInstance, error) {
	return []ServiceInstance{
		{ExampleServiceName, backendAddr},
	}, nil
}

func (r *exampleResolver) lookup() (*resolver.State, error) {
	instances, err := GetAllInstance()
	addresses := make([]resolver.Address, len(instances))
	for i, instance := range instances {
		addresses[i] = resolver.Address{Addr: instance.Addr, ServerName: instance.ServiceName}
	}
	state := &resolver.State{Addresses: addresses}
	return state, err
}

//watcher backend svr change
func (r *exampleResolver) watcher() {
	defer r.wg.Done()
	for {
		select {
		case <-r.ctx.Done():
			fmt.Println("关闭了")
			return
		case <-r.rn:
		}
		state, err := r.lookup()
		if err != nil {
			r.cc.ReportError(err)
		} else {
			r.cc.UpdateState(*state)
		}

		// 第二个select 用一个 timer 来限制dns更新频率
		t := time.NewTimer(minNSResRate)
		select {
		case <-t.C:
			r.rn <- struct{}{}
		case <-r.ctx.Done():
			t.Stop()
			return
		}
	}
}

一般地自定义NameResolver需要实现如下两个接口

代码语言:txt复制
type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}


type Resolver interface {
	ResolveNow(ResolveNowOptions)
	Close()
}

Resolver 用于将服务名解析成对应服务实例。

Builder 则采用 Builder 模式在包初始化时创建并注册构造自定义 Resolver 实例。

基本流程

  • 客户端启动时,注册自定义的 resolver 启动时会自动执行 init() 方法,它会构造自定义的 resolveBuilder,并将其注册到 grpc 内部的 resolveBuilder 表中。
  • 客户端拨号 客户端启动时通过自定义 Dail() 方法构造 grpc.ClientConn 单例, 它会找到URL上Scheme对应的resolveBuilder, 然后调用resolveBuilder的Build方法构建自定义 resolver,同时开启watch更新协程,通过此 resolver 更新可用的服务实例列表。

所谓的服务实例列表其实就是域名对应的IP 端口信息, 一个域名可能对应多个IP 端口实例。

  • LB策略 grpc 底层 LB 库对每个实例均创建一个 subConnection,最终根据相应的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
image.pngimage.png

客户端使用

client/main.go

代码语言:txt复制
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
	defer cancel()
	exampleConn, err := grpc.DialContext(
		ctx,
		fmt.Sprintf("%s:///%s", grpc_resolver.ExampleScheme, grpc_resolver.ExampleServiceName),
		grpc.WithDefaultServiceConfig(retryPolicy),
		grpc.WithTransportCredentials(cred),
		grpc.WithPerRPCCredentials(userPwdAuth),
		grpc.WithPerRPCCredentials(oauthAuth),
		grpc.WithPerRPCCredentials(jwtAuth),
		grpc.WithChainUnaryInterceptor(grpc_validator.UnaryClientInterceptor()),
		grpc.WithStreamInterceptor(grpc_validator.ClientStreamInterceptor()),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer exampleConn.Close()

	client3 := pb.NewBattleServiceClient(exampleConn)
	bidirectionalStreamBattle(client3)

输出效果

image.pngimage.png

0 人点赞