GRPC整体介绍
grpc是google开源的一个高性能,通用的rpc框架,基于http2标准协议设计的,多语言支持。
grpc的优点包括:
- 自由,开放:让所有人,所有平台都能使用,其实就是开源,跨平台,跨语言
- 协议可插拔:不同的服务可能需要使用不同的消息通信类型和编码机制,例如,JSON、XML 和 Thirft, 所以协议应允许可插拔机制,还有负载均衡,服务发现,日志,监控等都支持可插拔机制
- 阻塞和非阻塞:支持客户端和服务器交换的消息序列的异步和同步处理。
- 取消和超时:一次 RPC 操作可能是持久并且昂贵的,应该允许客户端设置取消 RPC 通信和对这次通信加上一个超时时间
- 拒绝:必须允许服务器通过在继续处理请求的同时拒绝新请求的到来并优雅地关闭。
- 流处理:存储系统依靠流和流控制来表达大型数据集,其他服务,如语音到文本或股票行情,依赖于流来表示与时间相关的消息序列
- 流控制:计算能力和网络容量在客户端和服务器之间通常是不平衡的。流控制允许更好的缓冲区管理,以及过度活跃的对等体提供对 DOS 的保护。
- 元数据交换 : 认证或跟踪等常见的跨领域问题依赖于不属于服务声明接口的数据交换。 依赖于他们将这些特性演进到服务,暴露 API 来提供能力。
- 标准化状态码 :客户端通常以有限的方式响应 API 调用返回的错误。应约束状态码名称空间,以使这些错误处理决策更加清晰。如果需要更丰富的特定领域的状态,则可以使用元数据交换机制来提供该状态。
- 互通性:报文协议 (Wire Protocol) 必须遵循普通互联网基础框架
grpc主要有4中请求/相应模式:
1) 单项rpc (simple rpc)
也就是发送一个请求,返回一个数据包,是属于比较通用的rpc请求模式. 比如:
代码语言:javascript复制rpc Echo(EchoRequest) returns (EchoResponse){
}
2) 服务侧流式rpc (server-side rpc)
客户端发送一个请求,服务端返回连续的流式数据.
代码语言:javascript复制rpc Echo(EchoRequest) returns (stream EchoResponse){
}
3) 客户端侧流式rpc (client-side rpc)
客户端发流式请求,服务端返回单个相应.
代码语言:javascript复制rpc Echo(stream EchoRequest) returns (EchoResponse){
}
4) 双向流式rpc (Bidirectional streaming RPC)
代码语言:javascript复制rpc Echo(stream EchoRequest) returns (stream EchoResponse){
}
GRPC负载均衡
- 源码分析
我们先根据代码来梳理一下grpc客户端的建立连接流程。
首先,我们是通过grpc.DialContext 方法来拿到一个连接的, 函数的签名如下:
代码语言:javascript复制func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error)
第一个参数是ctx, 可以传入一些上下文信息(如trace_id, header信息等), target目标服务器的名字串(需通过名字解析),或者是ip port的方式(名字解析采用grpc默认的passthrough), opts是建立连接的一些参数,如拦截器(grpc.WithUnaryInterceptor), 是否是阻塞建立连接(grpc.WithBlock, 默认是非阻塞的) 等。
我们再看下DialContext函数的执行逻辑, 这里提取负载均衡相关逻辑出来, 详细见源码(grpc v1.38.):
代码语言:javascript复制func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
// …
// Determine the resolver to use.
// 根据target字符串,解析出结构体出来(Scheme, Authority, Endpoint), 如果不是按照标准的target格式,则当做target为Endpoint, 如host:port的格式.
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
// 根据scheme拿到对应的解析器创建者(resolverBuilder), 这些resolver 是在定义具体的resolver的时候注册到grpc里的。
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
// 如果没有拿到对应的resolverBuilder, 尝试用默认的,也就是passthrough的方式
// 这里用到创建者模式,将创建resolver交给resolverBuilder, 让resolver专注业务逻辑,创建的参数交给resolverBuiler
if resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
}
}
// …..
// 负载均衡创建者的一些参数
cc.balancerBuildOpts = balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
// Build the resolver.
// 这里创建一个resolverWraper, 里面包了一个resolver, 这里用了策略模式,将一些流程上的逻辑统一放到wraper里,这样让resolver的职责更单一。
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.mu.Unlock()
//…
return cc, nil
}
这里主要流程是根据target获取对应的解析器类型, 然后创建解析器, 具体的说明在注释里有说明。在这段逻辑里,我们知道了怎么拿到解析器,以及创建一个解析器wrapper。 我们接着往下梳理, newCCResolverWrapper的逻辑。
代码语言:javascript复制func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
rb := cc.dopts.resolverBuilder
if rb == nil {
return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
}
ccr := &ccResolverWrapper{
cc: cc,
addrCh: make(chan []resolver.Address, 1),
scCh: make(chan string, 1),
}
var err error
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})
if err != nil {
return nil, err
}
return ccr, nil
}
newCCResolverWrapper 是调用resolverBuiler,创建一个resolver做ccResolverWrapper的一个参数, 这里ccResolverWrapper其实是实现了resolver.ClientConn的interface.
代码语言:javascript复制type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately.
UpdateState(State) error
// ReportError notifies the ClientConn that the Resolver encountered an
// error. The ClientConn will notify the load balancer and begin calling
// ResolveNow on the Resolver with exponential backoff.
ReportError(error)
// NewAddress is called by resolver to notify ClientConn a new list
// of resolved addresses.
// The address list should be the complete list of resolved addresses.
//
// Deprecated: Use UpdateState instead.
NewAddress(addresses []Address)
// NewServiceConfig is called by resolver to notify ClientConn a new
// service config. The service config should be provided as a json string.
//
// Deprecated: Use UpdateState instead.
NewServiceConfig(serviceConfig string)
// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}
其中UpdateState方法当resolve解析到新的地址的时候,会触发调用这个方法。
创建resolver是在resolverBuilder的Build方法里, 这里我们看下一个项目中使用到的resolver。
代码语言:javascript复制func (r *TLResolveBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
grpclog.Infof("build tl resolver %s", target.Endpoint)
// 这里解析拿到对应的service名
service, err := parseTLAddress(target.Endpoint)
if err != nil {
return nil, fmt.Errorf("bulid tl resolver failed %v", err)
}
// 初始化路由表
_, err = tl.ApiGetRoute(service)
if err != nil {
return nil, fmt.Errorf("get route(service:%s) failed %v", service, err)
}
tlResolver := TLResolver{
service: service,
cc: cc,
}
// 起一个协程,定期从名字服务(或者本地agent)获取最新目标服务地址列表, 并更新到链接池里。
go tlResolver.watcher()
return &tlResolver, nil
}
这里主要是创建一个resolver, 并初始化连接信息,然后起一个协程异步定期去拉取最新的列表。
获取连接的过程简单就是这些,这里大家可能要问了,解析器拿到新地址之后,怎么告诉grpc连接的?
我们继续跟着tlResolver.watcher方法梳理
代码语言:javascript复制func (r *TLResolver) watcher() {
for {
r.updateAddress()
// 每秒钟获取一次tl服务地址
time.Sleep(time.Second)
}
}
继续看r.updateAddress方法
代码语言:javascript复制func (r *TLResolver) updateAddress() {
// 获取最新的地址列表
destList, err := tl.ApiGetRouteTable(r.service)
if err != nil {
grpclog.Errorf("get tl failed %s", err)
return
}
// 这里不用担心重复地址问题, 在clientconn.go的handleResolvedAddrs中会判断, 如果地址完全一致则会略过
if len(destList) > 0 {
r.cc.NewAddress(genAddrStr(destList))
}
}
最终新地址会通知给连接, 其中NewAddress方法是ccResolverWrapper实现了resolver.ClientConn interface的一个接口。 具体实现的时候,会调用grpc.ClientConn 的updateResolverState 方法, 我们跟进UpdateSubConnState方法里看。
代码语言:javascript复制func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
// …
b.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}
oldAggrState := b.state
b.state = b.csEvltr.RecordTransition(oldS, s)
// 重新生成picker
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
b.regeneratePicker()
}
// 更新balancer的状态
b.cc.UpdateBalancerState(b.state, b.picker)
}
我们接着先看regeneratePicker, 重新生成选择器。
代码语言:javascript复制// 重新根据新地址生成选择器
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = NewErrPicker(balancer.ErrTransientFailure)
return
}
readySCs := make(map[resolver.Address]balancer.SubConn)
// 选出所有处于ready状态的subConn
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[addr] = sc
}
}
b.picker = b.pickerBuilder.Build(readySCs)
}
UpdateBalancerState 更新负载均衡器的状态, 也就是将新的地址信息告诉给balancer。
picker的连接就可以让每次请求的时候选择可用的连接, 调用链为:
call.go::invoke ->stream.go::newClientStream->ClientConn::getTransport - pickerWrapper.pick::Picker.pick
其中call.go::invoke proto文件自动生成的代码中具体接口会调用。
这里流程比较复杂,因为grpc的设计用到了比较多的设计模式做抽象,将resolver, balancer提供接口让具体实现者去实现,这样可以对接具体业务自己的名字服务。画一个整体的流程图如下:
这里再总结一下Grpc 负载均衡的特点:
- 每个请求都进行负载均衡
- 解析器和负载均衡器让业务侧自行根据项目情况实现
- 客户端连接不用维护负载均衡器,交给单独的组件去实现,实现解耦,使用起来更简单
如何实现一个新的resolver和balancer? 这里直接贴一个例子代码
resolver:
代码语言:javascript复制package TL_resolver
import (
"google.golang.org/grpc/resolver"
"fmt"
"time"
"strings"
"strconv"
"google.golang.org/grpc/grpclog"
)
type TLResolveBuilder struct {
}
type TLResolver struct {
service string
cc resolver.ClientConn
}
func NewBuilder() resolver.Builder {
return &TLResolveBuilder {
}
}
func (r *TLResolver) watcher() {
for {
r.updateAddress()
// 每秒钟获取一次TL服务地址
time.Sleep(time.Second)
}
}
func (r *TLResolver) updateAddress() {
destList, err := TL.ApiGetRouteTable(r.modId, r.cmdId)
if err != nil {
grpclog.Errorf("get TL failed %s", err)
return
}
// 当刚装上TL_agent时,第一次会返回空,所以重试一次
if destList == nil || len(destList) == 0 {
destList, err = TL.ApiGetRouteTable(r.modId, r.cmdId)
if err != nil {
grpclog.Errorf("get TL failed %s", err)
return
}
if destList == nil || len(destList) == 0 {
grpclog.Infof("retry get-route-table failed! destList is empty, modid:%d, cmdid:%d", r.modId, r.cmdId)
}
}
// 这里不用担心重复地址问题, 在clientconn.go的handleResolvedAddrs中会判断, 如果地址完全一致则会略过
if len(destList) > 0 {
r.cc.NewAddress(genAddrStr(destList))
}
}
func genAddrStr(destList []TL.Dest) []resolver.Address {
addrList := make([]resolver.Address, 0)
for _, item := range destList {
addrStr := fmt.Sprintf("%s:%d", item.Ip, item.Port)
addrList = append(addrList, resolver.Address{Addr: addrStr})
}
return addrList
}
func (r *TLResolver) ResolveNow(opts resolver.ResolveNowOption) {
r.updateAddress()
}
func (r *TLResolver) Close() {
}
func (r *TLResolveBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
grpclog.Infof("build TL resolver %s", target.Endpoint)
service := target.Endpoint
if err != nil {
return nil, fmt.Errorf("bulid TL resolver failed %v", err)
}
// init route table
_, err = TL.ApiGetRoute(service)
if err != nil {
return nil, fmt.Errorf("get route failed %v", err)
}
TLResolver := TLResolver{
service: service,
cc: cc,
}
go TLResolver.watcher()
return &TLResolver, nil
}
func (r *TLResolveBuilder) Scheme() string {
return "TL"
}
balancer:
代码语言:javascript复制package TL_balancer
import (
"fmt"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
"strconv"
"sync"
"time"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/balancer/base"
)
func NewBalancerBuilder(service string) balancer.Builder {
return base.NewBalancerBuilder(getBalancerName(modId, cmdId), &TLPickerBuilder{
service: service
})
}
func getBalancerName(service string) string {
return “TL”
}
type TLPickerBuilder struct {
service string
}
func (b *TLPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
grpclog.Infof("TLPickerBuilder: newPicker called with readySCs: %v", readySCs)
connMap := make(map[string]balancer.SubConn)
connSlice := make([]balancer.SubConn, 0)
for addr, sc := range readySCs {
connMap[addr.Addr] = sc
connSlice = append(connSlice, sc)
}
return &TLPicker{
connMap: connMap,
connSlice: connSlice,
service: b.service,
mu: sync.Mutex{},
next: 0,
}
}
type TLPicker struct {
connMap map[string]balancer.SubConn
connSlice []balancer.SubConn
Service string
mu sync.Mutex
next int
}
func (p *TLPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
if len(p.connSlice) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
// 先通过TL选址, 这样就可以复用TL的负载均衡权重
res, err := TL.ApiGetRoute(p.service)
if err == nil {
if conn, ok := p.connMap[res.Ip() ":" strconv.Itoa(res.Port())]; ok {
// TL 调用结果上报TL
beginTime := time.Now().UnixNano()
doneFunc := func(dinfo balancer.DoneInfo) {
timeSpan := (time.Now().UnixNano() - beginTime) / 1e6
if dinfo.Err == nil {
TL.ApiRouteResultUpdate(res,1, uint64(timeSpan))
} else {
TL.ApiRouteResultUpdate(res,-1, uint64(timeSpan))
}
}
return conn, doneFunc, nil
} else {
grpclog.Warningf("Can not find addr in pool %s %d", res.Ip(), res.Port())
}
} else {
grpclog.Errorf("get TL failed %s", err)
}
// 当通过TL选址的地址 在连接池中不存在, 则round robin返回一个连接池中的连接
p.mu.Lock()
sc := p.connSlice[p.next]
p.next = (p.next 1) % len(p.connSlice)
p.mu.Unlock()
return sc, nil, nil
}
使用:
代码语言:javascript复制resolver.Register(tl_resolver.NewBuilder())
var service = “grpc.testservice”
balancerBuilder := tl_balancer.NewBalancerBuilder(service)
balancer.Register(balancerBuilder)