Envoy控制面实践

2023-08-19 09:29:04 浏览数 (1)

重新发一次,之前的排版有问题

简介

Envoy 是一款由 Lyft 开源的,使用 C 编写的 L7 代理和通信总线,目前是 CNCF 旗下的开源项目且已经毕业,代码托管在 GitHub 上,它也是 Istio 服务网格中默认的数据平面。关于 Envoy 的详情请阅读 Envoy 中文文档。Envoy 本身无法构成一个完整的 Service Mesh,但是它可以作为 service mesh 中的应用间流量的代理,负责 service mesh 中的数据层。

Envoy架构中的一些重要概念:

  • Downstream:下游主机,指连接到Envoy的主机,这些主机用来发送请求并接受响应。
  • Upstream:上游主机,指接收来自Envoy连接和请求的主机,并返回响应。
  • Listener:服务或程序的监听器, Envoy暴露一个或多个监听器监听下游主机的请求,当监听到请求时,通过Filter Chain把对请求的处理全部抽象为Filter, 例如ReadFilter、WriteFilter、HttpFilter等。
  • Cluster:服务提供集群,指Envoy连接的一组逻辑相同的上游主机。Envoy通过服务发现功能来发现集群内的成员,通过负载均衡功能将流量路由到集群的各个成员。
  • xDS:xDS中的x是一个代词,类似云计算里的XaaS可以指代IaaS、PaaS、SaaS等。DS为Discovery Service,即发现服务的意思。xDS包括CDS(cluster discovery service)、RDS(route discovery service)、EDS(endpoint discovery service)、ADS(aggregated discovery service),其中ADS称为聚合的发现服务,是对CDS、RDS、LDS、EDS服务的统一封装,解决CDS、RDS、LDS、EDS信息更新顺序依赖的问题,从而保证以一定的顺序同步各类配置信息。以上Endpoint、Cluster、Route的概念介绍如下:
  • Endpoint:一个具体的“应用实例”,类似于Kubernetes中的一个Pod;
  • Cluster:可以理解“应用集群”,对应提供相同服务的一个或多个Endpoint, 类似Kubernetes中Service概念,即一个Service提供多个相同服务的Pod;
  • Route:当我们做金丝雀发布部署时,同一个服务会有多个版本,这时需要Route规则规定请求如何路由到其中的某个版本上。

Envoy正常的工作流程为Host A(下游主机)发送请求至上游主机(Host B、Host C、Host D等),Envoy通过Listener监听到有下游主机的请求,收到请求后的Envoy将所有请求流量劫持至Envoy内部,并将请求内容抽象为Filter Chains路由至某个上游主机中从而实现路由转发及负载均衡能力。

xDS

Envoy为了实现流量代理能力通常需要一个统一的配置文件来记录信息以便启动时加载,在Envoy中启动配置文件有静态配置和动态配置两种方式。静态配置是将配置信息写入文件中,启动时直接加载,动态配置通过xDS实现一个Envoy的服务端(可以理解为以API接口对外实现服务发现能力)。

xDS模块的功能是通过Envoy API V1(基于HTTP)或V2(基于gRPC)实现一个服务端将配置信息暴露给上游主机,等待上游主机的拉取。

xDS API 在envoy中被称为 Data plane API 。其代码保存在 https://github.com/envoyproxy/envoy/tree/master/api/envoy/api/v2,用户可以根据proto文件自行生成相对应语言的GRPC代码文件。

Envoy 官方提供了两份 xDS Server 的实现,分别是:

  • go-control-plane 基于Golang 的 xDS Server 实现代码
  • java-control-plane 基于 Java 的 xDS Server 实现代码

另外,官方还把 api 的定义代码从 Envoy 的源码库中提取出来,放在了 https://github.com/envoyproxy/data-plane-api

实践

要实现一个简单的控制面,一般就是基于xDS api来管理动态配置的能力,如下几步

1. 实现callback
代码语言:javascript复制
type callbacks struct {
   test.Callbacks
   simpleCache cache.SnapshotCache
   hash cache.NodeHash
   datasource service.Datasource
   config *Config
}

func (c *callbacks) OnStreamRequest(id int64, request *discovery.DiscoveryRequest) error {
   nodeId := c.hash.ID(request.Node)
   logrus.Debugf("node: %s on stream request version_info: %s resource_names: %s type_url: %s response_nonce: %s error: % v", nodeId, request.VersionInfo, request.ResourceNames, request.TypeUrl, request.ResponseNonce, request.ErrorDetail)
   if !containsString(c.simpleCache.GetStatusKeys(), nodeId) {
      c.SetSnapshot(nodeId)
   }
   return c.Callbacks.OnStreamRequest(id, request)
}

根据业务需要,实现相应的回调方法。

2. 创建grpc服务
代码语言:javascript复制
server := server.NewServer(ctx, simpleCache, cb)

var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions,
   grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
   grpc.KeepaliveParams(keepalive.ServerParameters{
      Time: grpcKeepaliveTime,
      Timeout: grpcKeepaliveTimeout,
   }),
   grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
      MinTime: grpcKeepaliveMinTime,
      PermitWithoutStream: true,
   }),
)
grpcServer := grpc.NewServer(grpcOptions...)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
   logrus.Fatal(err)
}

discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, server)
routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, server)
listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, server)
secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, server)
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, server)

logrus.Infof("management server listening on %dn", port)
  if err = grpcServer.Serve(lis); err != nil {
    logrus.Println(err)
  }

上面的流程大致就是:

  • 构造一个业务server实例,将回调实例传入
  • 构造一个grpc服务器实例
  • 向grpc注册业务服务,也就是各个xDS api的实现服务
  • 启动一个grpc服务器
3. 构造快照
代码语言:javascript复制
snap, _ := cache.NewSnapshot(time.Now().Format(time.RFC3339),
   map[resource.Type][]types.Resource{
      resource.ClusterType: clusters,
      resource.RouteType: {makeRoute(RouteName, routes)},
      resource.ListenerType: {makeHTTPListener(ListenerName, RouteName, listenerPort)},
   },
)

根据xDS 数据格式,构造所需的业务数据,也就是最后存储在envoy中的动态配置

4. 返回快照
代码语言:javascript复制
if err := snapshot.Consistent(); err != nil {
   logrus.Errorf("snapshot inconsistency: % vn% v", snapshot, err)
   return
}

if err := c.simpleCache.SetSnapshot(context.Background(), nodeId, snapshot); err != nil {
   logrus.Errorf("snapshot error %q for % v", err, snapshot)
   return
}

将业务数据动态注入envoy配置中

0 人点赞