在《golang源码分析:grpc 链接池(3)resolver、balancer和picker》一文中分析完源码后,我们尝试自定义实现相应的插件。gRPC 通过服务发现或者直连的形式获取到 gRPC server 实例的 endpoints,然后通知负载均衡器进行 SubConn 更新:对于新加入的 endpoint 创建实例,移除废弃的 endpoint,最后通过状态更新对状态为 Idle 的 SubConn 进行管理。gRPC 在调用 Invoke 时,则会通过负载均衡器中的 Picker 按照某一种负载均衡算法选择一个 SubConn 创建链接,如果创建成功则不再尝试其他 SubConn,否则会按照一定的退避算法进行重试,直到退避失败或者创建链接成功为止。上述三个组件的功能分别如下:
- resolver:通过直连、本地配置,或者从服务发现后台(比如 k8s、nacos、etcd、consul 等存储介质)获取 target 对应的 endpoint 列表。
- balancer:管理连接池的SubConn,创建对应的picker
- picker:从 SubConn 列表中按照负载均衡算法选择一个 SubConn 创建链接
下面我们通过这样一个实例来分别实现上述组件,并测试正确性。
后台服务有三个 endpoint:127.0.0.1:9080、127.0.0.1:9081、127.0.0.1:9082,客户端可以随机从三个 endpoint 中选择一个来发送请求。
代码语言:go
package myresolver
import (
"fmt"
"strconv"
// "github.com/golang-leetcode/grpc-go/resolver"
//引用包不对,导致注册失败
"google.golang.org/grpc/resolver"
)
// MockResolverScheme is the URI scheme under which the mock resolver is
// registered; a dial target such as "mock:///myMock" selects it.
const MockResolverScheme = "mock"

// mockResolverBuilder builds mockResolver instances. It carries no state.
type mockResolverBuilder struct{}

// mockResolver keeps the dial target together with the ClientConn that
// resolved addresses are reported to.
type mockResolver struct {
	target resolver.Target
	cc     resolver.ClientConn
}
// Build prints the target, creates a mockResolver for it, immediately pushes
// the address list to cc via start, and returns the resolver. The returned
// error is always nil.
func (*mockResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	fmt.Println("target:", target, target.Endpoint, target.URL)
	// Only the "myMock" endpoint is recognised, and only with a trace line;
	// every endpoint gets the same address list.
	if target.Endpoint == "myMock" {
		fmt.Println("myMock resolver")
	}

	res := &mockResolver{target: target, cc: cc}
	res.start()
	return res, nil
}
// Scheme reports the URI scheme handled by this builder and prints a trace
// line on every call.
func (*mockResolverBuilder) Scheme() string {
	const trace = "get mock scheme"
	fmt.Println(trace)
	return MockResolverScheme
}
func (r mockResolver) start() {
addrs := make([]resolver.Address, 0)
for i := 0; i < 3; i {
addrs = append(addrs, resolver.Address{Addr: "127.0.0.1:908" strconv.FormatInt(int64(i), 10)})
}
fmt.Println("addrs", addrs)
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (r *mockResolver) ResolveNow(o resolver.ResolveNowOptions) {
addrs := make([]resolver.Address, 0)
for i := 0; i < 3; i {
addrs = append(addrs, resolver.Address{
ServerName: "resolver.mock.grpc.io" strconv.FormatInt(int64(i), 10),
Addr: "127.0.0.1:908" strconv.FormatInt(int64(i), 10)})
}
fmt.Println("addrs", addrs)
//2023/02/05 16:55:53 could not greet rpc error: code = Unavailable desc = last connection error: connection error: desc = "transport: Error while dialing dial tcp: lookup tcp///default/resolver.mock.grpc.io: nodename nor servname provided, or not known"
r.start()
}
// Close does nothing.
func (*mockResolver) Close() {}
func init() {
resolver.Register(&mockResolverBuilder{})
fmt.Println(resolver.Get(MockResolverScheme))
}
代码语言:go
package mybalancer
import (
"learn/grpc/picker"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
)
// Name is the name under which the random balancer is registered.
const Name = "random"
func init() {
balancer.Register(newBuilder())
}
// newBuilder returns a base balancer builder called Name that builds its
// pickers with the random picker builder and sets HealthCheck in its config.
func newBuilder() balancer.Builder {
	pickerBuilder := picker.NewRandomPickerBuilder()
	cfg := base.Config{HealthCheck: true}
	return base.NewBalancerBuilder(Name, pickerBuilder, cfg)
}
代码语言:go
package picker
import (
"fmt"
"math/rand"
"sync"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
)
// NewRandomPickerBuilder returns a new builder for random pickers.
func NewRandomPickerBuilder() *randomPickerBuilder {
	return new(randomPickerBuilder)
}
// randomPickerBuilder builds randomPicker values. It carries no state.
type randomPickerBuilder struct{}

// Conn pairs a SubConn with the SubConnInfo that was reported for it.
type Conn struct {
	SubConn     balancer.SubConn
	SubConnInfo base.SubConnInfo
}
// Build copies the ready SubConns in info into a randomPicker whose random
// source is seeded from the current time. When info has no ready SubConn it
// returns an error picker carrying balancer.ErrNoSubConnAvailable.
func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	ready := info.ReadySCs
	if len(ready) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}

	conns := make([]Conn, 0, len(ready))
	for subConn, scInfo := range ready {
		conns = append(conns, Conn{SubConn: subConn, SubConnInfo: scInfo})
	}

	seed := time.Now().UnixNano()
	return &randomPicker{
		subConns: conns,
		r:        rand.New(rand.NewSource(seed)),
	}
}
// randomPicker picks at random among a fixed snapshot of SubConns.
type randomPicker struct {
	subConns []Conn
	mu       sync.Mutex // guards r
	r        *rand.Rand
}

// Pick returns a randomly chosen SubConn from the snapshot and prints the
// address that was picked. The returned error is always nil.
func (r *randomPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
	// A *rand.Rand created with rand.New is not safe for concurrent use, and
	// gRPC may call Pick from several goroutines at once, so access to it is
	// serialized with mu.
	r.mu.Lock()
	next := r.r.Intn(len(r.subConns))
	r.mu.Unlock()

	sc := r.subConns[next]
	fmt.Printf("picked: %+v\n", sc.SubConnInfo.Address.Addr)
	return balancer.PickResult{SubConn: sc.SubConn}, nil
}
这里实现得比较简单,实际使用中,比如我们可以通过 target.Endpoint 来实现不同的 endpoint 获取策略。然后我们启动服务:
代码语言:go
package hello
import (
context "context"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// ImplementedGreeterServer is a Greeter server whose replies carry Port, so
// that a client can tell which instance answered.
type ImplementedGreeterServer struct {
	// Port is appended to the message of every error the server returns.
	Port string
}
func (i *ImplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SayHello implemented" i.Port)
}
func (i *ImplementedGreeterServer) SayHello1(context.Context, *HelloRequest) (*HelloReply, error) {
return nil, status.Errorf(codes.Unimplemented, "method SayHello1 implemented" i.Port)
}
// mustEmbedUnimplementedGreeterServer is an empty marker method.
// NOTE(review): presumably required by the generated GreeterServer interface — confirm against the generated code.
func (i *ImplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}
代码语言:go
package main
import (
helloworld "learn/grpc/conn/hello"
"log"
"net"
"strconv"
"google.golang.org/grpc"
)
func main() {
for i := 0; i < 3; i {
go func(i int) {
addr := ":908" strconv.FormatInt(int64(i), 10)
srv := grpc.NewServer()
helloworld.RegisterGreeterServer(srv, &helloworld.ImplementedGreeterServer{
Port: addr,
})
listener, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
err = srv.Serve(listener)
if err != nil {
log.Fatalf("failed to serve: %v", err)
}
}(i)
}
select {}
}
然后通过客户端发起请求
代码语言:go
package main
import (
"context"
"fmt"
"log"
"time"
helloworld "learn/grpc/conn/hello"
"learn/grpc/mybalancer"
"learn/grpc/myresolver"
"google.golang.org/grpc"
)
func main() {
conn, err := grpc.Dial(myresolver.MockResolverScheme ":///myMock", grpc.WithInsecure(), grpc.WithTimeout(10*time.Second), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name)), grpc.WithBlock()) // grpc.WithBalancerName(builder.Name) //, grpc.WithBlock(),grpc.WithResolvers(r),
defer conn.Close()
if err != nil {
log.Fatalf("did not1 connect: %v", err)
}
client := helloworld.NewGreeterClient(conn)
for i := 0; i < 10; i {
resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
fmt.Println(i 1, "个", resp, err)
resp, err = client.SayHello1(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
fmt.Println(i 1, "个", resp, err)
if err != nil {
log.Println("could not greet", err)
}
}
}
这里需要注意的是,我们在创建连接的时候是通过 scheme 来选择我们自定义的 mock resolver 的;指定 balancer 的时候,使用的是 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name)) 来指定的;对于 picker 来说,它是 balancer 里面直接引用的,所以不需要在连接的时候指定。
代码语言:text
1 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9082
picked: 127.0.0.1:9081
1 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9081
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9081
picked: 127.0.0.1:9081
2 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9081
picked: 127.0.0.1:9080
2 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
picked: 127.0.0.1:9080
3 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9080
picked: 127.0.0.1:9080
3 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
picked: 127.0.0.1:9080
4 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9080
picked: 127.0.0.1:9080
4 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
picked: 127.0.0.1:9081
5 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9081
picked: 127.0.0.1:9082
5 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9082
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9082
picked: 127.0.0.1:9080
6 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9080
picked: 127.0.0.1:9080
6 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
picked: 127.0.0.1:9081
7 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9081
picked: 127.0.0.1:9080
7 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
picked: 127.0.0.1:9080
8 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9080
picked: 127.0.0.1:9081
8 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9081
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9081
picked: 127.0.0.1:9081
9 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9081
picked: 127.0.0.1:9080
9 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9080
picked: 127.0.0.1:9080
10 个 <nil> rpc error: code = Unimplemented desc = method SayHello implemented:9080
picked: 127.0.0.1:9082
10 个 <nil> rpc error: code = Unimplemented desc = method SayHello1 implemented:9082
2023/02/05 18:26:08 could not greet rpc error: code = Unimplemented desc = method SayHello1 implemented:9082
看下结果,我们的连接端口是随机的,符合我们的预期。