golang源码分析:grpc 链接池(4)自定义resolver 、balancer和picker

2023-03-01 16:19:14 浏览数 (2)

在上一篇《golang源码分析:grpc 链接池(3)resolver、balancer和picker》分析完源码后,我们尝试自定义实现相应的插件。grpc 通过服务发现或者直连形式获取到 gRPC server 实例的 endpoints,然后通知负载均衡器进行 SubConn 更新:对于新加入的 endpoint 进行实例创建,移除废弃的 endpoint,最后通过状态更新将状态为 Idle 的 SubConn 进行管理。gRPC 在调用 Invoke 时,则会通过负载均衡器中的 Picker 按照某一种负载均衡算法选择一个 SubConn 创建链接,如果创建成功则不再进行其他 SubConn 的尝试,否则会按照一定的退避算法进行重试,直到退避失败或者创建链接成功为止。上述三个组件的功能分别如下:

  • resolver:通过直连、本地配置,或者从服务发现后台,比如k8s、nacos、etcd,consul等存储介质,获取target对应的endpoint列表。
  • balancer:管理连接池的SubConn,创建对应的picker
  • picker:从 SubConn 列表中按照负载均衡算法选择一个 SubConn 创建链接

下面我们通过这样一个实例来分别实现上述组件,并测试正确性。

后台服务有三个 endpoint:127.0.0.1:9080、127.0.0.1:9081、127.0.0.1:9082,客户端可以随机从三个 endpoint 中选择一个来发送请求。

代码语言:go
package myresolver

import (
  "fmt"
  "strconv"

  // "github.com/golang-leetcode/grpc-go/resolver"
  //引用包不对,导致注册失败
  "google.golang.org/grpc/resolver"
)

// MockResolverScheme is the URL scheme reported by the mock resolver builder.
// Dial targets of the form "mock:///<endpoint>" are routed to it.
const MockResolverScheme = "mock"

// mockResolverBuilder is the builder registered with the resolver package in
// init. It carries no state.
type mockResolverBuilder struct {
}

// mockResolver reports a fixed, hard-coded list of backend addresses to the
// ClientConn it was built for.
type mockResolver struct {
  // target is the dial target handed to Build.
  target resolver.Target
  // cc receives the address list through UpdateState.
  cc     resolver.ClientConn
}

// Build creates a mockResolver for target, prints the target for debugging,
// and immediately pushes the static address list to cc. It always returns a
// nil error; opts is not used.
func (*mockResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	fmt.Println("target:", target, target.Endpoint, target.URL)
	if target.Endpoint == "myMock" {
		fmt.Println("myMock resolver")
	}

	res := &mockResolver{target: target, cc: cc}
	res.start()
	return res, nil
}

// Scheme reports the scheme handled by this builder (MockResolverScheme) and
// prints a trace line on every call.
func (*mockResolverBuilder) Scheme() string {
	const scheme = MockResolverScheme
	fmt.Println("get mock scheme")
	return scheme
}

func (r mockResolver) start() {
  addrs := make([]resolver.Address, 0)
  for i := 0; i < 3; i   {
    addrs = append(addrs, resolver.Address{Addr: "127.0.0.1:908"   strconv.FormatInt(int64(i), 10)})
  }
  fmt.Println("addrs", addrs)
  r.cc.UpdateState(resolver.State{Addresses: addrs})
}

func (r *mockResolver) ResolveNow(o resolver.ResolveNowOptions) {
  addrs := make([]resolver.Address, 0)
  for i := 0; i < 3; i   {
    addrs = append(addrs, resolver.Address{
      ServerName: "resolver.mock.grpc.io"   strconv.FormatInt(int64(i), 10),
      Addr:       "127.0.0.1:908"   strconv.FormatInt(int64(i), 10)})
  }
  fmt.Println("addrs", addrs)
  //2023/02/05 16:55:53 could not greet rpc error: code = Unavailable desc = last connection error: connection error: desc = "transport: Error while dialing dial tcp: lookup tcp///default/resolver.mock.grpc.io: nodename nor servname provided, or not known"
  r.start()
}
// Close is a no-op.
func (*mockResolver) Close() {}

func init() {
  resolver.Register(&mockResolverBuilder{})
  fmt.Println(resolver.Get(MockResolverScheme))
}
代码语言:go
package mybalancer

import (
  "learn/grpc/picker"

  "google.golang.org/grpc/balancer"
  "google.golang.org/grpc/balancer/base"
)

// Name is the balancer name passed to base.NewBalancerBuilder in newBuilder.
const Name = "random"

func init() {
  balancer.Register(newBuilder())
}

// newBuilder returns a base balancer builder named Name that uses the random
// picker builder, with HealthCheck set to true in its config.
func newBuilder() balancer.Builder {
	cfg := base.Config{HealthCheck: true}
	pickerBuilder := picker.NewRandomPickerBuilder()
	return base.NewBalancerBuilder(Name, pickerBuilder, cfg)
}
代码语言:go
package picker

import (
  "fmt"
  "math/rand"
  "sync"
  "time"

  "google.golang.org/grpc/balancer"
  "google.golang.org/grpc/balancer/base"
)

// NewRandomPickerBuilder returns a new, stateless randomPickerBuilder.
func NewRandomPickerBuilder() *randomPickerBuilder {
	return new(randomPickerBuilder)
}

// randomPickerBuilder builds randomPicker values from the ready SubConns it
// is given. It carries no state.
type randomPickerBuilder struct {
}

// Conn pairs a SubConn with the SubConnInfo it was reported with.
type Conn struct {
  // SubConn is the connection returned from Pick.
  SubConn     balancer.SubConn
  // SubConnInfo is the info reported for SubConn; Pick prints its Address.Addr.
  SubConnInfo base.SubConnInfo
}

// Build snapshots the ready SubConns in info into a randomPicker seeded from
// the current time. When there are no ready SubConns it returns an error
// picker carrying balancer.ErrNoSubConnAvailable.
func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	ready := info.ReadySCs
	if len(ready) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}

	conns := make([]Conn, 0, len(ready))
	for subConn, scInfo := range ready {
		conns = append(conns, Conn{SubConn: subConn, SubConnInfo: scInfo})
	}

	seed := time.Now().UnixNano()
	return &randomPicker{
		subConns: conns,
		r:        rand.New(rand.NewSource(seed)),
	}
}

// randomPicker selects one of a fixed set of ready SubConns at random.
type randomPicker struct {
	// subConns is the snapshot of ready connections; it is only read after
	// construction.
	subConns []Conn
	// mu guards r: a *rand.Rand is not safe for concurrent use, and Pick may
	// be invoked from several RPC goroutines at once.
	mu sync.Mutex
	r  *rand.Rand
}

// Pick returns a randomly chosen SubConn and prints the address it selected.
// It returns balancer.ErrNoSubConnAvailable when the picker holds no
// SubConns, instead of panicking on a modulo by zero.
func (r *randomPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
	if len(r.subConns) == 0 {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	// Hold the lock only around the random draw; subConns is immutable.
	r.mu.Lock()
	next := r.r.Intn(len(r.subConns))
	r.mu.Unlock()

	sc := r.subConns[next]
	fmt.Printf("picked: %+v\n", sc.SubConnInfo.Address.Addr)
	return balancer.PickResult{SubConn: sc.SubConn}, nil
}

这里实现得比较简单;实际使用中,比如我们可以通过 target.Endpoint 来实现不同的 endpoint 获取策略。然后我们启动服务:

代码语言:go
package hello

import (
  context "context"

  codes "google.golang.org/grpc/codes"
  status "google.golang.org/grpc/status"
)

// ImplementedGreeterServer is a Greeter server whose methods always return an
// Unimplemented status that includes Port, so a client can tell which
// instance answered.
type ImplementedGreeterServer struct {
  // Port is appended to every error message returned by this server.
  Port string
}

func (i *ImplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error) {
  return nil, status.Errorf(codes.Unimplemented, "method SayHello  implemented" i.Port)
}
func (i *ImplementedGreeterServer) SayHello1(context.Context, *HelloRequest) (*HelloReply, error) {
  return nil, status.Errorf(codes.Unimplemented, "method SayHello1  implemented" i.Port)
}

// mustEmbedUnimplementedGreeterServer is an empty marker method.
// NOTE(review): presumably required by the generated GreeterServer interface,
// which is not visible here; confirm against the generated code.
func (i *ImplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}
代码语言:go
package main

import (
  helloworld "learn/grpc/conn/hello"
  "log"
  "net"
  "strconv"

  "google.golang.org/grpc"
)

func main() {
  for i := 0; i < 3; i   {
    go func(i int) {
      addr := ":908"   strconv.FormatInt(int64(i), 10)

      srv := grpc.NewServer()
      helloworld.RegisterGreeterServer(srv, &helloworld.ImplementedGreeterServer{
        Port: addr,
      })

      listener, err := net.Listen("tcp", addr)
      if err != nil {
        log.Fatalf("failed to listen: %v", err)
      }

      err = srv.Serve(listener)
      if err != nil {
        log.Fatalf("failed to serve: %v", err)
      }
    }(i)
  }
  select {}
}

然后通过客户端发起请求

代码语言:go
package main

import (
  "context"
  "fmt"
  "log"
  "time"

  helloworld "learn/grpc/conn/hello"
  "learn/grpc/mybalancer"
  "learn/grpc/myresolver"

  "google.golang.org/grpc"
)

func main() {
  conn, err := grpc.Dial(myresolver.MockResolverScheme ":///myMock", grpc.WithInsecure(), grpc.WithTimeout(10*time.Second), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name)), grpc.WithBlock()) // grpc.WithBalancerName(builder.Name) //, grpc.WithBlock(),grpc.WithResolvers(r),
  defer conn.Close()
  if err != nil {
    log.Fatalf("did not1 connect: %v", err)
  }
  client := helloworld.NewGreeterClient(conn)
  for i := 0; i < 10; i   {
    resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i 1, "个", resp, err)
    resp, err = client.SayHello1(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i 1, "个", resp, err)
    if err != nil {
      log.Println("could not greet", err)
    }
  }
}

这里需要注意的是,我们在创建连接的时候是通过 scheme 来选择我们自定义的 mock resolver;指定 balancer 的时候,使用的是 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name)) 来指定的;对于 picker 来说,它是 balancer 里面直接引用的,所以不需要在连接的时候指定。

代码语言:text
1 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9082
picked: 127.0.0.1:9081
1 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9081
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9081
picked: 127.0.0.1:9081
2 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9081
picked: 127.0.0.1:9080
2 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9080
3 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9080
3 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9080
4 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9080
4 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9081
5 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9081
picked: 127.0.0.1:9082
5 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9082
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9082
picked: 127.0.0.1:9080
6 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9080
6 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9081
7 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9081
picked: 127.0.0.1:9080
7 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9080
8 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9081
8 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9081
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9081
picked: 127.0.0.1:9081
9 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9081
picked: 127.0.0.1:9080
9 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9080
picked: 127.0.0.1:9080
10 个 <nil> rpc error: code = Unimplemented desc = method SayHello  implemented:9080
picked: 127.0.0.1:9082
10 个 <nil> rpc error: code = Unimplemented desc = method SayHello1  implemented:9082
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1  implemented:9082

看下结果,我们的连接端口是随机的,符合我们的预期。

0 人点赞