grpc-go之负载均衡(七)

2022-10-12 17:16:23 浏览数 (1)

介绍

gRPC 中的负载平衡基于每个调用而不是每个连接发生。即使所有请求都来自单个客户端,我们仍然希望它们在所有服务器之间进行负载平衡。

gRPC 负载均衡包括客户端负载均衡和服务端负载均衡两种方向, gRPC 的客户端负载均衡的流程如下

  • 域名解析 启动时,gRPC 客户端会针对服务器名称发出名称解析请求。该名称将解析为 IP 地址列表、指示使用哪个客户端负载平衡策略 关于gRPC-go域名解析的内容可以参考《grpc-go之NameResolver(六)》
  • 实例化负载平衡策略 客户端实例化负载平衡策略, 负载平衡策略为服务器的 IP 地址创建一组子通道。并监视子通道的连接状态并决定每个子通道何时应尝试连接。对于每个发送的 RPC,负载平衡策略决定 RPC 应该发送到哪个子通道。

负载平衡策略

gRPC-Go 中内置了pick_first和round_robin两种算法。

  • pick_first 尝试连接到第一个地址,如果连接成功,则将其用于所有RPC,如果连接失败,则尝试下一个地址(并继续这样做,直到一个连接成功)。
  • round_robin 连接到所有地址,并依次向每个后端发送一个RPC。例如,第一个RPC将发送到backend-1,第二个RPC将发送到backend-2,第三个RPC将再次发送到backend-1。

案例说明

name_reslover/grpc_reslover.go

代码语言:go复制
package grpc_resolver

import (
	"context"
	"fmt"
	"google.golang.org/grpc/resolver"
	"sync"
	"time"
)

const (
	ExampleScheme      = "ns"
	ExampleServiceName = "resolver.example.grpc.io"
	backendAddr1       = "localhost:50051"
	backendAddr2       = "localhost:50052"
	minNSResRate       = 10 * time.Second
)

type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	r := &exampleResolver{
		target: target,
		cc:     cc,
		addrsStore: map[string][]string{
			ExampleServiceName: { backendAddr2},
		},
		rn:     make(chan struct{}, 1),
		ctx:    ctx,
		cancel: cancel,
	}
	r.wg.Add(1)
	// 开启监听, 一般注册中心有变化, 则及时更新地址信息
	go r.watcher()
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return ExampleScheme }

// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
	rn         chan struct{}
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup
}

func (r *exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {
	select {
	case r.rn <- struct{}{}:
	default:
	}
}
func (r *exampleResolver) Close() {
	r.cancel()
	r.wg.Wait()
}

// 全局 resolverBuild 都存放一个 map 中,key 为 scheme,value 为对应的 resolverBuilder。
// init的时候会把自定义的exampleResolverBuilder注册进去
func init() {
	// Register the example ResolverBuilder. This is usually done in a package's
	// init() function.
	resolver.Register(&exampleResolverBuilder{})
}

// ServiceInstance 定义服务注册中心返回的服务实例
type ServiceInstance struct {
	// 服务名称
	ServiceName string
	// 服务地址(包括了ip和端口)
	Addr string
}

// GetAllInstance 从服务注册中心获取可用的服务实例列表
// 这里为了演示, 简单的直接返回了一个列表,真实场景是需要调服务注册中心的API的
func GetAllInstance() ([]ServiceInstance, error) {
	return []ServiceInstance{
		{ExampleServiceName, backendAddr1},
		{ExampleServiceName, backendAddr2},
	}, nil
}

func (r *exampleResolver) lookup() (*resolver.State, error) {
	instances, err := GetAllInstance()
	addresses := make([]resolver.Address, len(instances))
	for i, instance := range instances {
		addresses[i] = resolver.Address{Addr: instance.Addr, ServerName: instance.ServiceName}
	}
	state := &resolver.State{Addresses: addresses}
	return state, err
}

//watcher backend svr change
func (r *exampleResolver) watcher() {
	defer r.wg.Done()
	for {
		select {
		case <-r.ctx.Done():
			fmt.Println("关闭了")
			return
		case <-r.rn:
		}
		state, err := r.lookup()
		if err != nil {
			r.cc.ReportError(err)
		} else {
			r.cc.UpdateState(*state)
		}

		// 第二个select 用一个 timer 来限制dns更新频率
		t := time.NewTimer(minNSResRate)
		select {
		case <-t.C:
			r.rn <- struct{}{}
		case <-r.ctx.Done():
			t.Stop()
			return
		}
	}
}

client/main.go

代码语言:go复制
package main

import (
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/oauth"
	"google.golang.org/grpc/examples/data"
	"grpc-demo/helloworld/auth"
	"grpc-demo/helloworld/grpc_validator"
	grpc_resolver "grpc-demo/helloworld/name_reslover"
	"grpc-demo/helloworld/pb"
	"log"
)

func main() {

	// 更多配置信息查看官方文档: https://github.com/grpc/grpc/blob/master/doc/service_config.md
	// service这里语法为<package>.<service> package就是proto文件中指定的package,service也是proto文件中指定的 Service Name。
	// method 可以不指定 即当前service下的所以方法都使用该配置。
	serverPolicy1 := `{
		"methodConfig": [{
		  "name": [{"service": "pb.Greeter","method":"SayHello"}],
		  "retryPolicy": {
			  "MaxAttempts": 5,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE", "DEADLINE_EXCEEDED" ]
		  }
		}]}`

	// 构建一个 PerRPCCredentials。
	// 使用内置的Oauth2
	oauthAuth := oauth.NewOauthAccess(auth.FetchToken())

	// 使用自定一的的身份验证
	userPwdAuth := auth.NewUserPwdAuth()

	// 使用自定一的的身份验证
	jwtAuth := auth.NewJWTAuthToken()

	cred, err := credentials.NewClientTLSFromFile(data.Path("/Users/guirong/go/src/grpc-demo/helloworld/client/ca.crt"),
		"www.ggr.com")
	if err != nil {
		log.Fatalf("failed to load credentials: %v", err)
	}

	fmt.Printf("call BattleService.Battle %s:///%s, use loadBalancingPolicy=pickFirst n", grpc_resolver.ExampleScheme,
		grpc_resolver.ExampleServiceName)
	pickFirstConn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", grpc_resolver.ExampleScheme, grpc_resolver.ExampleServiceName),
		grpc.WithDefaultServiceConfig(serverPolicy1),
		grpc.WithTransportCredentials(cred),
		grpc.WithPerRPCCredentials(userPwdAuth),
		grpc.WithPerRPCCredentials(oauthAuth),
		grpc.WithPerRPCCredentials(jwtAuth),
		grpc.WithChainUnaryInterceptor(grpc_validator.UnaryClientInterceptor()),
		grpc.WithStreamInterceptor(grpc_validator.ClientStreamInterceptor()),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer pickFirstConn.Close()

	client1 := pb.NewBattleServiceClient(pickFirstConn)
	for i := 0; i < 10; i   {
		bidirectionalStreamBattle(client1)
	}

	serverPolicy2 := `{
		"loadBalancingConfig": [ { "round_robin": {} } ],
		"methodConfig": [{
		  "name": [{"service": "pb.Greeter","method":"SayHello"}],
		  "retryPolicy": {
			  "MaxAttempts": 5,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE", "DEADLINE_EXCEEDED" ]
		  }
		}]}`
	fmt.Printf("call BattleService.Battle %s:///%s, use loadBalancingPolicy=round_robinn",
		grpc_resolver.ExampleScheme,
		grpc_resolver.ExampleServiceName)
	//ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
	//defer cancel()
	roundRobinConn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", grpc_resolver.ExampleScheme, grpc_resolver.ExampleServiceName),
		// 配置负载均衡策略, 默认是pickFirst
		grpc.WithDefaultServiceConfig(serverPolicy2),
		grpc.WithTransportCredentials(cred),
		grpc.WithPerRPCCredentials(userPwdAuth),
		grpc.WithPerRPCCredentials(oauthAuth),
		grpc.WithPerRPCCredentials(jwtAuth),
		grpc.WithChainUnaryInterceptor(grpc_validator.UnaryClientInterceptor()),
		grpc.WithStreamInterceptor(grpc_validator.ClientStreamInterceptor()),
	)

	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer roundRobinConn.Close()

	client2 := pb.NewBattleServiceClient(roundRobinConn)
	for i := 0; i < 10; i   {
		bidirectionalStreamBattle(client2)
	}
}

其中loadBalancingConfig": [ { "round_robin": {} } ]配置就是用来执行负载均衡策略的.

image.pngimage.png

输出结果

代码语言:txt复制
拦截器
拦截器
拦截器
拦截器
2022/10/12 16:50:35 Serving gRPC on 0.0.0.0:50052
2022/10/12 16:50:35 Serving gRPC on 0.0.0.0:50051
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50052
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50052
<nil>
进入IsValidOauth2验证Oauth2...
离开IsValidOauth2, Oauth2 验证OK...
进入IsValidJWToken验证jwt token...
离开IsValidJWToken,jwt token 验证 OK...
HeroId:"hero_1" SkillId:"Skill_1" 
2022/10/12 16:50:37 BattleServer地址 :50051
HeroId:"hero_2" SkillId:"Skill_2" 
2022/10/12 16:50:37 BattleServer地址 :50051
<nil>

参考

https://github.com/grpc/grpc/blob/master/doc/load-balancing.md

https://www.lixueduan.com/posts/grpc/12-client-side-loadbalance/

0 人点赞