grpc reslover源码分析

2022-08-02 19:16:54 浏览数 (1)

如何将我们选择的服务解析方式应用到grpc的连接建立中去?

grpc的resolver,就是帮我们解决这个问题的

1.程序启动时,客户端是如何从一个域名/服务名,获取到其对应的实例ip,然后与之建立连接的呢?

2.运行过程中,如果后端的实例挂了,grpc如何感知到,并重新建立连接呢?

使用grpc的时候,首先要做的就是调用Dial或DialContext函数来初始化一个clientConn对象,而resolver是这个连接对象的一个重要的成员,所以我们首先看一看clientConn对象创建过程中,resolver是怎么设置进去的。

代码语言:javascript复制
cc := &ClientConn{
    target:            target,
    csMgr:             &connectivityStateManager{},
    conns:             make(map[*addrConn]struct{}),
    dopts:             defaultDialOptions(),
    blockingpicker:    newPickerWrapper(),
    czData:            new(channelzData),
    firstResolveEvent: grpcsync.NewEvent(),
  }

cc.parsedTarget = grpcutil.ParseTarget(cc.target)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
代码语言:javascript复制
func ParseTarget(target string) (ret resolver.Target) {
  var ok bool
  ret.Scheme, ret.Endpoint, ok = split2(target, "://")
  if !ok {
    return resolver.Target{Endpoint: target}
  }
  ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
  if !ok {
    return resolver.Target{Endpoint: target}
  }
  return ret
}
代码语言:javascript复制
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
  for _, rb := range cc.dopts.resolvers {
    if scheme == rb.Scheme() {
      return rb
    }
  }
  return resolver.Get(scheme)
}
代码语言:javascript复制
func Get(scheme string) Builder {
  if b, ok := m[scheme]; ok {
    return b
  }
  return nil
}

Get函数是通过m这个map,去查找有没有scheme对应的resolver的builder,那么m这个map是什么时候插入的值呢?这个在resolver的Register函数里:

代码语言:javascript复制
func Register(b Builder) {
m[b.Scheme()] = b
}

那么谁会去调用这个Register函数,向map中写入resolver呢?

grpc实现了一个默认的解析器,也就是"passthrough",这个看名字就理解了,就是透传,所谓透传就是,什么都不做,那么什么时候需要透传呢?当你调用DialContext的时候,如果传入的target本身就是一个ip port,这个时候,自然就不需要再解析什么了。那么"passthrough"对应的这个默认的解析器是什么时候注册到m这个map中的呢?这个调用在passthrough包的init函数里

代码语言:javascript复制
func init() {
resolver.Register(&passthroughBuilder{})
}

具体路径是internal/resolver/passthrough/passthrough.go

在clientconn.go里面引入

代码语言:javascript复制
  _ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
  _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
  _ "google.golang.org/grpc/internal/resolver/unix"        // To register unix resolver.

这三个包对应3个默认实现的resolver

代码语言:javascript复制
% cd ~/go/pkg/mod/google.golang.org/grpc@v1.37.0/internal/resolver
代码语言:javascript复制
 % tree
.
|____passthrough
| |____passthrough.go
|____config_selector.go
|____unix
| |____unix.go
|____dns
| |____dns_resolver.go
| |____go113.go
| |____dns_resolver_test.go
|____config_selector_test.go

gRPC client LB 配合 Headless Service

创立 Headless Service 后,k8s 会生成 DNS 记录,拜访 Service 会返回后端多个 pod IP 的 A 记录,这样利用就能够基于 DNS 自定义负载平衡。在 grpc-client 指定 headless service 地址为 dns:/// 协定,DNS resolver 会通过 DNS 查问后端多个 pod IP,而后通过 client LB 算法来实现负载平衡。这些 grpc-go 这个库都帮你做了。

可以自己实现一个dnsresolver

代码语言:javascript复制
package resolver

import (
  "context"
  "fmt"
  "net"
  "sync"

  "google.golang.org/grpc/resolver"
)

func init() {
  resolver.Register(NewBuilder())
}

type mydnsResolver struct {
  domain  string
  port    string
  address map[resolver.Address]struct{}
  ctx     context.Context
  cancel  context.CancelFunc
  cc      resolver.ClientConn
  wg      sync.WaitGroup
  rn      chan struct{}
}

// ResolveNow resolves immediately
func (mr *mydnsResolver) ResolveNow(resolver.ResolveNowOptions) {
  select {
  case mr.rn <- struct{}{}:
  default:
  }
}

// Close stops resolving
func (mr *mydnsResolver) Close() {
  mr.cancel()
  mr.wg.Wait()
}
func (mr *mydnsResolver) watcher() {
  defer func() {
    if err := recover(); err != nil {
      fmt.Println(err)
    }
  }()
  defer mr.wg.Done()

  for {
    select {
    case <-mr.ctx.Done():
      return
    case <-mr.rn:
    }
    result, err := mr.resolveByHttpDNS()
    if err != nil || len(result) == 0 {
      continue
    }
    mr.cc.UpdateState(resolver.State{Addresses: result})
  }
}

func (mr *mydnsResolver) resolveByHttpDNS() ([]resolver.Address, error) {
  var items []string = make([]string, 0, 4)

  //这里实现通过向http://myself.dns.xyz发送get请求获取实例ip列表,并存入items中

  var addresses = make([]resolver.Address, 0, len(items))
  for _, v := range items {
    addr := net.JoinHostPort(v, mr.port)
    a := resolver.Address{
      Addr:       addr,
      ServerName: addr, // same as addr
      Type:       resolver.Backend,
    }
    addresses = append(addresses, a)
  }

  return addresses, nil

}

type mydnsBuilder struct {
}

func NewBuilder() resolver.Builder {
  return &mydnsBuilder{}
}

// Scheme for mydns
func (mb *mydnsBuilder) Scheme() string {
  return "mydns"
}

// Build
func (mb *mydnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
  host, port, err := net.SplitHostPort(target.Endpoint)
  if err != nil {
    host = target.Endpoint
    port = "80"
  }

  ctx, cancel := context.WithCancel(context.Background())
  mr := &mydnsResolver{
    domain:  host,
    port:    port,
    cc:      cc,
    rn:      make(chan struct{}, 1),
    address: make(map[resolver.Address]struct{}),
  }
  mr.ctx, mr.cancel = ctx, cancel

  mr.wg.Add(1)
  go mr.watcher()

  mr.ResolveNow(resolver.ResolveNowOptions{})
  return mr, nil
}

当然也可以基于etcd来实现

代码语言:javascript复制
package balancer

import (
  "context"
  "log"
  "strings"
  "time"

  "github.com/coreos/etcd/clientv3"
  "github.com/coreos/etcd/mvcc/mvccpb"
  "google.golang.org/grpc/resolver"
)

const schema = "wonamingv3"

var cli *clientv3.Client

type etcdResolver struct {
  rawAddr string
  cc      resolver.ClientConn
}

// NewResolver initialize an etcd client
func NewResolver(etcdAddr string) resolver.Builder {
  return &etcdResolver{rawAddr: etcdAddr}
}

func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
  var err error

  if cli == nil {
    cli, err = clientv3.New(clientv3.Config{
      Endpoints:   strings.Split(r.rawAddr, ";"),
      DialTimeout: 15 * time.Second,
    })
    if err != nil {
      return nil, err
    }
  }

  r.cc = cc

  go r.watch("/"   target.Scheme   "/"   target.Endpoint   "/")

  return r, nil
}

func (r etcdResolver) Scheme() string {
  return schema
}

func (r etcdResolver) ResolveNow(rn resolver.ResolveNowOption) {
  log.Println("ResolveNow") // TODO check
}

// Close closes the resolver.
func (r etcdResolver) Close() {
  log.Println("Close")
}

func (r *etcdResolver) watch(keyPrefix string) {
  var addrList []resolver.Address

  getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
  if err != nil {
    log.Println(err)
  } else {
    for i := range getResp.Kvs {
      addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)})
    }
  }

  r.cc.NewAddress(addrList)

  rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
  for n := range rch {
    for _, ev := range n.Events {
      addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
      switch ev.Type {
      case mvccpb.PUT:
        if !exist(addrList, addr) {
          addrList = append(addrList, resolver.Address{Addr: addr})
          r.cc.NewAddress(addrList)
        }
      case mvccpb.DELETE:
        if s, ok := remove(addrList, addr); ok {
          addrList = s
          r.cc.NewAddress(addrList)
        }
      }
      //log.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
  }
}

func exist(l []resolver.Address, addr string) bool {
  for i := range l {
    if l[i].Addr == addr {
      return true
    }
  }
  return false
}

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
  for i := range s {
    if s[i].Addr == addr {
      s[i] = s[len(s)-1]
      return s[:len(s)-1], true
    }
  }
  return nil, false
}

基于consul实现

代码语言:javascript复制
package consul

import (
  "errors"
  "fmt"
  "regexp"
  "sync"

  "github.com/hashicorp/consul/api"
  "google.golang.org/grpc/resolver"
)

const (
  defaultPort = "8500"
)

var (
  errMissingAddr = errors.New("consul resolver: missing address")

  errAddrMisMatch = errors.New("consul resolver: invalied uri")

  errEndsWithColon = errors.New("consul resolver: missing port after port-separator colon")

  regexConsul, _ = regexp.Compile("^([A-z0-9.] )(:[0-9]{1,5})?/([A-z_] )$")
)

func Init() {
  fmt.Printf("calling consul initn")
  resolver.Register(NewBuilder())
}

type consulBuilder struct {
}

type consulResolver struct {
  address              string
  wg                   sync.WaitGroup
  cc                   resolver.ClientConn
  name                 string
  disableServiceConfig bool
  lastIndex            uint64
}

func NewBuilder() resolver.Builder {
  return &consulBuilder{}
}

func (cb *consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {

  fmt.Printf("calling consul buildn")
  fmt.Printf("target: %vn", target)
  host, port, name, err := parseTarget(fmt.Sprintf("%s/%s", target.Authority, target.Endpoint))
  if err != nil {
    return nil, err
  }

  cr := &consulResolver{
    address:              fmt.Sprintf("%s%s", host, port),
    name:                 name,
    cc:                   cc,
    disableServiceConfig: opts.DisableServiceConfig,
    lastIndex:            0,
  }

  cr.wg.Add(1)
  go cr.watcher()
  return cr, nil

}

func (cr *consulResolver) watcher() {
  fmt.Printf("calling consul watchern")
  config := api.DefaultConfig()
  config.Address = cr.address
  client, err := api.NewClient(config)
  if err != nil {
    fmt.Printf("error create consul client: %vn", err)
    return
  }

  for {
    services, metainfo, err := client.Health().Service(cr.name, cr.name, true, &api.QueryOptions{WaitIndex: cr.lastIndex})
    if err != nil {
      fmt.Printf("error retrieving instances from Consul: %v", err)
    }

    cr.lastIndex = metainfo.LastIndex
    var newAddrs []resolver.Address
    for _, service := range services {
      addr := fmt.Sprintf("%v:%v", service.Service.Address, service.Service.Port)
      newAddrs = append(newAddrs, resolver.Address{Addr: addr})
    }
    fmt.Printf("adding service addrsn")
    fmt.Printf("newAddrs: %vn", newAddrs)
    cr.cc.NewAddress(newAddrs)
    cr.cc.NewServiceConfig(cr.name)
  }

}

func (cb *consulBuilder) Scheme() string {
  return "consul"
}

func (cr *consulResolver) ResolveNow(opt resolver.ResolveNowOption) {
}

func (cr *consulResolver) Close() {
}

func parseTarget(target string) (host, port, name string, err error) {

  fmt.Printf("target uri: %vn", target)
  if target == "" {
    return "", "", "", errMissingAddr
  }

  if !regexConsul.MatchString(target) {
    return "", "", "", errAddrMisMatch
  }

  groups := regexConsul.FindStringSubmatch(target)
  host = groups[1]
  port = groups[2]
  name = groups[3]
  if port == "" {
    port = defaultPort
  }
  return host, port, name, nil
}

0 人点赞