Envoy Ingress:Contour基本原理和源码分析

2022-03-07 13:38:52 浏览数 (1)

概述

  • contour 是一个开源的 k8s ingress controller
  • 使用 envoy 提供反向代理
  • 提供动态配置更新
  • 支持多集群网络代理

特性

  • 基于高性能的L7代理和负载均衡Envoy作为数据面
  • 灵活的架构
  • TLS安全代理

为什么选择Contour

  • 动态配置更新,最大限度减少连接中断
  • 安全的支持多集群,多配置
  • 使用 HTTPProxy CRD 增强 ingress 核心配置

安装

kubectl

代码语言:javascript复制
# yaml 安装
kubectl apply -f https://projectcontour.io/quickstart/contour.yaml
# 验证
kubectl get pod -n projectcontour

helm

代码语言:javascript复制
# 添加仓库
helm repo add bitnami https://charts.bitnami.com/bitnami  
helm repo update
helm install contour --namespace projectcontour bitnami/contour

资源

部署完成后查看安装的资源信息,包括:

  • contour deployment:counter controller,作为控制面
  • envoy daemonset:envoy,作为数据面
代码语言:javascript复制
$ kubectl get deploy,svc,ds,job -n projectcontour
NAME                      READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/contour   2/2     2            2           12d

NAME              TYPE           CLUSTER-IP       EXTERNAL-IP 
service/contour   ClusterIP      172.20.126.255   <none>    8001/TCP                     12d
service/envoy     LoadBalancer   172.20.12.164    ...

NAME                   DESIRED   CURRENT   READY   UP-TO-DATE   AVAILABLE   NODE SELECTOR   AGE
daemonset.apps/envoy   10        10        10      10           10          <none>          12d

CRD

contour 安装完成后,查看 crd 有哪些:

代码语言:javascript复制
$ kubectl get crd|grep contour
contourconfigurations.projectcontour.io                      2021-11-04T13:30:06Z
contourdeployments.projectcontour.io                         2021-11-04T13:30:06Z
extensionservices.projectcontour.io                          2021-11-04T13:30:06Z
httpproxies.projectcontour.io                                2021-11-04T13:30:06Z
tlscertificatedelegations.projectcontour.io                  2021-11-04T13:30:06Z

其中最核心的 CRD 是 HTTPProxy,等效于 k8s 提供的原生 Ingress,提供路由配置功能。

看一个简单的 httpproxy 配置示例,当访问 foo1.bar.com 时,将代理到后端 s1 服务的 80 端口

将 foo1.bar.com 域名和任意一个Node节点(daemonset部署)的ip做域名绑定后,即可通过域名访问服务

代码语言:javascript复制
apiVersion: projectcontour.io/v1
kind: HTTPProxy
metadata:
  name: name-example-foo
  namespace: default
spec:
  virtualhost:
    fqdn: foo1.bar.com
  routes:
    - conditions:
      - prefix: /
      services:
        - name: s1
          port: 80

架构

概述

contour 主要包含两个组件:

  • Envoy:提供高性能的反向代理
  • Contour:充当 Envoy 的后台管理服务,提供相关配置

两个组件是分开部署的,Contour 以 Deployment 的方式部署,Envoy 以 DaemonSet 的方式部署

原理

数据面:envoy侧

envoy pod 中包含两个 container

  • initContainer container:执行 contour bootstrap 命令,生成一个配置文件到临时目录,envoy container共享这个目录,并作为启动的配置文件,里面配置了 contour server 地址,作为 envoy 的管理服务
  • envoy container:根据配置文件启动envoy,建立 GRPC连接,并从服务端接收配置请求
控制面:contour 侧
  • contour 是 k8s api 的一个客户端,通过 informer 机制监听 ingress,service,endpoint,secret等k8s原生资源,和httpproxy这个CRD
  • contour 缓存k8s的资源信息,并最终转换为 Envoy 需要的 XDS 信息。
其他

从图中我们还看到其他的一些信息

  • contour job:自动生成证书,使得grpc上的xds安全传输
  • contour 内部还提供了选主模式

HTTPProxy配置

概述

Ingress 资源在 k8s 1.1 版本引入,从那以后,Ingress API 保持相对稳定,需要使用特殊的能力需要借助于 Annotation 实现。

HttpProxy CRD 的目标是扩展 Ingress API 的能力,为用户提供更丰富的功能体验,以及解决 Ingress 在多租户环境下的限制。

对比 Ingress 的优势

  • 安全地支持多 Kubernetes 集群,能够限制哪些命名空间可以配置虚拟主机和 TLS 凭据。
  • 能够包括来自另一个 HTTPProxy 的路径或域的路由配置(这个HTTPProxy可能在另一个命名空间中)
  • 允许单个路由中接受多个服务并负载均衡它们之间的流量
  • 天生支持定义服务权重和负载策略,不需要使用注解
  • 支持创建时校验配置的合法性

配置文件对比

Ingress
代码语言:javascript复制
# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: basic
spec:
  rules:
  - host: foo-basic.bar.com
    http:
      paths:
      - backend:
          service:
            name: s1
            port:
              number: 80
HTTPProxy
代码语言:javascript复制
# httpproxy.yaml
apiVersion: projectcontour.io/v1
kind: HTTPProxy
metadata:
  name: basic
spec:
  virtualhost:
    fqdn: foo-basic.bar.com
  routes:
    - conditions:
      - prefix: /
      services:
        - name: s1
          port: 80

源码分析

原理回顾

通过前面 contour 的基本原理和架构的介绍,总结如下:

数据流转原理
  • contour 通过 informer 机制监听 k8s 资源
  • 将k8s资源转换为 envoy 需要的 xds 资源
  • 通过grpc将xds配置下发到envoy节点
流量流转原理
  • 带有域名的请求经过任意一个Node节点,进入Envoy(以 hostNetWork=true 部署方式为例)
  • envoy将请求路由到 HTTPProxy 中配置的后端 service

数据流转实现

概述

实际上在将 k8s 资源转换为最终 xds 资源的过程中,还经过了其他的数据转换。

高清图

  • k8s资源变化,触发 informer 通知机制,注册的回调函数将事件放入 channel
  • EventHandler协程消费 channel,将channel中的每种资源以一个map形式保存在KubernetesCache中,map的key是该资源的namespace name唯一确定的
  • KubenetesCache中的资源,经过一序列 processor 的处理之后,生成 DAG
  • DAG变更,会通知一序列 Observer,依次调用 OnChange方法,传入DAG,每个 Observer对应一个 XDS协议,将DAG中需要的资源转换为XDS配置,保存在Cache中
  • GRPC读取Cache中的数据,通过Stream将XDS下发到Envoy。下发这一步使用了go-control-plane这个开源框架
源码调用图

源码调用高清图

ResourceEventHandler

k8s 提供了 informer 机制,可以 watch kubernetes资源的变更。contour中 watch 的资源包括两类:

k8s原生资源:

  • ingress
  • service
  • endpoint
  • namespace

crd资源:

  • httpproxy
代码语言:javascript复制
// contour/cmd/contour/serve.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
  // 这里监听 contour 的 crd 资源, 比如:httpproxy
    for _, r := range k8s.DefaultResources() {
        inf, err := clients.InformerForResource(r)
        ......
        inf.AddEventHandler(&dynamicHandler)
    }
  // 监听 ingress 资源
    for _, r := range k8s.IngressV1Resources() {
        if err := informOnResource(clients, r, &dynamicHandler); err != nil {
            log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
        }
    }
  ......
  // 监听 secret 资源
    for _, r := range k8s.SecretsResources() {
    ......
        if err := informOnResource(clients, r, handler); err != nil {
            log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
        }
    }

    // 监听 endpoint 资源
    for _, r := range k8s.EndpointsResources() {
        if err := informOnResource(clients, r, &k8s.DynamicClientHandler{
            Next: &contour.EventRecorder{
                Next:    endpointHandler,
                Counter: contourMetrics.EventHandlerOperations,
            },
            Converter: converter,
            Logger:    log.WithField("context", "endpointstranslator"),
        }); err != nil {
            log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
        }
    }
  ......
}

不管是 原生资源,还是 crd 资源,Informer 监听的原理都是一样的,用户只需要编写事件处理函数,对应的接口是 ResourceEventHandler,该接口提供了增加、删除、更新回调函数。

代码语言:javascript复制
// k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}

Contour 中所有的资源复用了两个 ResourceEventHandler 的实现类:

  • EndpointsTranslator:处理 endpoint 资源
  • EventHandler:处理除了 endpoint 外的其他资源。

EventHandler 将得到的资源信息,放入channel中,由单独的协程去消费以解耦

代码语言:javascript复制
func (e *EventHandler) OnAdd(obj interface{}) {
    e.update <- opAdd{obj: obj}
}

func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
    e.update <- opUpdate{oldObj: oldObj, newObj: newObj}
}

func (e *EventHandler) OnDelete(obj interface{}) {
    e.update <- opDelete{obj: obj}
}

放入channel的信息,由谁去消费呢?

监听 channel 的协程,拿到事件后,取出资源信息,保存到 KubernetesCache 中。

代码语言:javascript复制
// contour/cmd/contour/server.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
    ......
  // 启动协程
  g.Add(eventHandler.Start())
  ......
}

// counter/internal/contour/handler.go
func (e *EventHandler) Start() func(<-chan struct{}) error {
    e.update = make(chan interface{})
    return e.run
}

func (e *EventHandler) run(stop <-chan struct{}) error {
    ......
  // 死循环,一直监听 channel 中的事件
    for {
        select {
        case op := <-e.update:
            if e.onUpdate(op) {
                ......
            }
      ......
 }
  
// 处理事件
// 将资源保存到 e.Builder.Source
func (e *EventHandler) onUpdate(op interface{}) bool {
    switch op := op.(type) {
    case opAdd:
        return e.Builder.Source.Insert(op.obj)
    case opUpdate:
        ......
        remove := e.Builder.Source.Remove(op.oldObj)
        insert := e.Builder.Source.Insert(op.newObj)
        return remove || insert
    case opDelete:
        return e.Builder.Source.Remove(op.obj)
    case bool:
        return op
    default:
        return false
    }
}

EndpointsTranslator 监听的 endpoint 资源,处理比较麻烦一些,以 add 为例分析

代码语言:javascript复制
// contour/internal/xdscache/v3/endpointstranslator.go
func (e *EndpointsTranslator) OnAdd(obj interface{}) {
    switch obj := obj.(type) {
    case *v1.Endpoints:
    // 更新本地缓存
        e.cache.UpdateEndpoint(obj)
    // 合并资源
        e.Merge(e.cache.Recalculate())
        e.Notify()
        if e.Observer != nil {
      // 刷新保存快照
            e.Observer.Refresh()
        }
    default:
        e.Errorf("OnAdd unexpected type %T: %#v", obj, obj)
    }
}
KubernetesCache

类中定义了多个map类型的资源对象,分别存储不同的资源,key是 namespace 和 name 组成的 type.NamespacedName,将原生k8s中不同 namespace 的同一资源都存储到一个map中,多种资源的多个map,共同组成 KubernetesCache

代码语言:javascript复制
// contour/internal/dag/cache.go
type KubernetesCache struct {
    ......
    ingresses                 map[types.NamespacedName]*networking_v1.Ingress
    ingressclass              *networking_v1.IngressClass
    httpproxies               map[types.NamespacedName]*contour_api_v1.HTTPProxy
    secrets                   map[types.NamespacedName]*v1.Secret
    tlscertificatedelegations map[types.NamespacedName]*contour_api_v1.TLSCertificateDelegation
    services                  map[types.NamespacedName]*v1.Service
    namespaces                map[string]*v1.Namespace
    gatewayclass              *gatewayapi_v1alpha1.GatewayClass
    gateway                   *gatewayapi_v1alpha1.Gateway
    httproutes                map[types.NamespacedName]*gatewayapi_v1alpha1.HTTPRoute
    tlsroutes                 map[types.NamespacedName]*gatewayapi_v1alpha1.TLSRoute
    tcproutes                 map[types.NamespacedName]*gatewayapi_v1alpha1.TCPRoute
    udproutes                 map[types.NamespacedName]*gatewayapi_v1alpha1.UDPRoute
    backendpolicies           map[types.NamespacedName]*gatewayapi_v1alpha1.BackendPolicy
    extensions                map[types.NamespacedName]*contour_api_v1alpha1.ExtensionService
  ......
}

// map中key的数据类型:由 namespace 和 name 共同决定
type NamespacedName struct {
    Namespace string
    Name      string
}

KubernetesCache 插入流程:

  • 首先判断资源类型
  • 将不同的资源放入不同的map
  • 有些资源的变更,还需要触发其他资源变更。比如:service变化了,对应的 ingress或者httpproxy也需要联动变化
代码语言:javascript复制
// contour/internal/dag/cache.go
func (kc *KubernetesCache) Insert(obj interface{}) bool {
  // 初始化 map,确保执行一次
    kc.initialize.Do(kc.init)
  // 根据不同的资源类型,放入不同的 map
    switch obj := obj.(type) {
    case *v1.Secret:
        ......
    // 保存 secret 资源
        kc.secrets[k8s.NamespacedNameOf(obj)] = obj
        return kc.secretTriggersRebuild(obj)
    case *v1.Service:
        kc.services[k8s.NamespacedNameOf(obj)] = obj
        return kc.serviceTriggersRebuild(obj)
    case *v1.Namespace:
        kc.namespaces[obj.Name] = obj
        return true
    ......
    }

    return false
}
DAG

构造好 KubernetesCache 之后,contour 通过 一些列的 processor,将 KubernetesCache 变成 DAG,DAG 代表了 k8s 不同资源关系的一个有向无环图。核心代码如下:

代码语言:javascript复制
// contour/internal/contour/handler.go
func (e *EventHandler) rebuildDAG() {
  // 将 KubernetesCache 构建成 DAG
    latestDAG := e.Builder.Build()
  // DAG 变更时,通知所有的 Observer,
  // 每种 Observer 是一个 XDS 对象,取出 DAG 资源转换为 XDS 配置
    e.Observer.OnChange(latestDAG)
  // 更新 snapshot 信息
    for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
        e.StatusUpdater.Send(upd)
    }

}

DAG 数据结构如下,根节点 roots 是一个 Vertex 的列表,Vertex 只定义了Visit接口。即:所有实现了 Visit

代码语言:javascript复制
// contour/internal/dag/dag.go
type DAG struct {
    // StatusCache holds a cache of status updates to send.
    StatusCache status.Cache

    // roots are the root vertices of this DAG.
    roots []Vertex
}

// Vertex is a node in the DAG that can be visited.
type Vertex interface {
    Visit(func(Vertex))
}

build过程分析:

代码语言:javascript复制
// contour/internal/dag/builder.go
func (b *Builder) Build() *DAG {
    // 先构造一个空的 DAG
  dag := DAG{
        StatusCache: status.NewCache(b.Source.ConfiguredGateway),
    }
 
  // 遍历所有的 processor,每个 processor 把不同的 k8s 资源变成 DAG 的一部分信息
  // 所有的 processor 共同构成完整的 DAG
    for _, p := range b.Processors {
        p.Run(&dag, &b.Source)
    }
    return &dag
}

processor 列表包括:

  • IngressProcessor
  • ExtensionServiceProcessor
  • HTTPProxyProcessor
  • GatewayAPIProcessor
  • ListenerProcessor
代码语言:javascript复制
func getDAGBuilder(ctx *serveContext, clients *k8s.Clients, clientCert, fallbackCert *types.NamespacedName,     log logrus.FieldLogger) dag.Builder {

  dagProcessors := []dag.Processor{
      &dag.IngressProcessor{
        ...
      },
      &dag.ExtensionServiceProcessor{
        ...
      },
      &dag.HTTPProxyProcessor{
       ...
      },
    }
    if ctx.Config.GatewayConfig != nil && clients.ResourcesExist(k8s.GatewayAPIResources()...) {
      dagProcessors = append(dagProcessors, &dag.GatewayAPIProcessor{
        FieldLogger: log.WithField("context", "GatewayAPIProcessor"),
      })
    }
    ...
    dagProcessors = append(dagProcessors, &dag.ListenerProcessor{})
  ...
}

每个processor 实现 Run 方法,接收 DAG 和 KubernetesCache 参数,通过 KubernetesCache 修改 DAG,以 HTTPProxyProcessor 为例分析。

代码语言:javascript复制
func (p *HTTPProxyProcessor) Run(dag *DAG, source *KubernetesCache) {
    ...
  // 校验合法性
    for _, proxy := range p.validHTTPProxies() {
    // 计算路由规则
    // 入参为 crd 资源 HttpProxy
    // 内部经过各种计算,构造路由关系,最终保存在DAG中
        p.computeHTTPProxy(proxy)
    }

    for meta := range p.orphaned {
        proxy, ok := p.source.httpproxies[meta]
        if ok {
            pa, commit := p.dag.StatusCache.ProxyAccessor(proxy)
            pa.ConditionFor(status.ValidCondition).AddError(contour_api_v1.ConditionTypeOrphanedError,
                "Orphaned",
                "this HTTPProxy is not part of a delegation chain from a root HTTPProxy")
            commit()
        }
    }
}

DAG 中的节点(Vertex)有:

  • VirtualHost:对应 LDS
  • SecureVirtualHost:对应 LDS 和 CDS
  • ExtensionCluster:对应 CDS
  • ServiceCluster:对应 EDS
  • Cluster:对应 CDS,一部分SDS
  • Listener:对应 RDS

其中某些 Vectex 又包含子 Vectex,因此用图这种数据结构串联起来。

ResourceCache

前面分析 DAG 代码时,在生成 DAG 后,会通知所有的 Observer,即e.Observer.OnChange

代码语言:javascript复制
// contour/internal/contour/handler.go
func (e *EventHandler) rebuildDAG() {
  // 将 KubernetesCache 构建成 DAG
    latestDAG := e.Builder.Build()
  // DAG 变更时,通知所有的 Observer,
  // 每种 Observer 是一个 XDS 对象,取出 DAG 资源转换为 XDS 配置
    e.Observer.OnChange(latestDAG)
  // 更新 snapshot 信息
    for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
        e.StatusUpdater.Send(upd)
    }
}

这里的 e.Observer 是 一堆 Observer的组合

代码语言:javascript复制
// contour/cmd/contour/serve.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
     ...
  // ResoureCache 列表
  resources := []xdscache.ResourceCache{
        xdscache_v3.NewListenerCache(listenerConfig, ctx.statsAddr, ctx.statsPort),
        &xdscache_v3.SecretCache{},
        &xdscache_v3.RouteCache{},
        &xdscache_v3.ClusterCache{},
        endpointHandler,
    }
  ...
    eventHandler := &contour.EventHandler{
        ...
    // 注册一个组合的 Observer
        Observer:        dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...),
        ...
    }
}

// 组合多个Observer
func ComposeObservers(observers ...Observer) Observer {
    return ObserverFunc(func(d *DAG) {
        for _, o := range observers {
            o.OnChange(d)
        }
    })
}

ResourceCache 是一个接口,聚合了两个接口:dag.Observer 和 xds.Resource。该接口定义了如何处理 DAG,以及处理完成后如何返回给调用方。

  • Observer接口:
    • OnChange:参数为DAG,当 DAG 有任何变更,都会触发这个函数,可以拿到最新的DAG做处理
  • Resource接口:
    • Contents:返回xds资源的接口
    • Query:查询xds资源的接口
    • Register:注册xds资源的接口
    • TypeURL:xds资源类型
代码语言:javascript复制
// contour/internal/xdscache/resources.go
type ResourceCache interface {
    dag.Observer
    xds.Resource
}

// contour/internal/dag/dag.go
type Observer interface {
  // 当 DAG变更时,触发相应业务逻辑
    OnChange(*DAG)
}

// contour/internal/xds/resource.go
type Resource interface {
  // 返回全部资源内容
    // Contents returns the contents of this resource.
    Contents() []proto.Message
  
  // 通过给定的名称,返回对应的资源内容
    // Query returns an entry for each resource name supplied.
    Query(names []string) []proto.Message

  // 注册资源
    // Register registers ch to receive a value when Notify is called.
    Register(chan int, int, ...string)

  // 资源类型
    // TypeURL returns the typeURL of messages returned from Values.
    TypeURL() string
}

不同的 xds 资源类型都实现了ResourceCache 接口:

  • RouteCache:对应 RDS
  • ClusterCache:对应 CDS
  • ListenerCache:对应 LDS
  • SecretCache:对应 SDS
  • EndpointsTranslator:对应 EDS

转换后的 xds 资源都保存在 values 字段中。

  • Observer的OnChange接口:将 DAG 转换为 Envoy xds 配置保存在 values 中
  • Resource的接口:提供查询 values 的方法,即可得到 envoy 的xds配置
代码语言:javascript复制
// contour/internal/xdscache/v3/cluster.go
// ClusterCache manages the contents of the gRPC CDS cache.
type ClusterCache struct {
    mu     sync.Mutex
    values map[string]*envoy_cluster_v3.Cluster
    contour.Cond
}

// contour/internal/xdscache/v3/listener.go
// ListenerCache manages the contents of the gRPC LDS cache.
type ListenerCache struct {
    mu           sync.Mutex
    values       map[string]*envoy_listener_v3.Listener
    staticValues map[string]*envoy_listener_v3.Listener

    Config ListenerConfig
    contour.Cond
}

// contour/internal/xdscache/v3/route.go
// RouteCache manages the contents of the gRPC RDS cache.
type RouteCache struct {
    mu     sync.Mutex
    values map[string]*envoy_route_v3.RouteConfiguration
    contour.Cond
}

// contour/internal/xdscache/v3/secret.go
// SecretCache manages the contents of the gRPC SDS cache.
type SecretCache struct {
    mu     sync.Mutex
    values map[string]*envoy_tls_v3.Secret
    contour.Cond
}

// contour/internal/xdscache/v3/endpointstranslator.go
// A EndpointsTranslator translates Kubernetes Endpoints objects into Envoy
// ClusterLoadAssignment resources.
type EndpointsTranslator struct {
    // Observer notifies when the endpoints cache has been updated.
    Observer contour.Observer

    contour.Cond
    logrus.FieldLogger

    cache EndpointsCache

    mu      sync.Mutex // Protects entries.
    entries map[string]*envoy_endpoint_v3.ClusterLoadAssignment
}

以 RouteCache 的 OnChange 为例,查看实现逻辑

代码语言:javascript复制
// contour/internal/xdscache/v3/route.go
func (c *RouteCache) OnChange(root *dag.DAG) {
  // 读取 DAG 中的信息,生成 envoy 的 RDS 资源 envoy_route_v3.RouteConfiguration
    routes := visitRoutes(root)
  // 将配置文件放到 value 字段中,供 Query、Context 等方法调用时查看
    c.Update(routes)
}

DAG 转换为 RDS 的逻辑实现

代码语言:javascript复制
// contour/internal/xdscache/v3/route.go
func visitRoutes(root dag.Vertex) map[string]*envoy_route_v3.RouteConfiguration {
    // Collect the route configurations for all the routes we can
    // find. For HTTP hosts, the routes will all be collected on the
    // well-known ENVOY_HTTP_LISTENER, but for HTTPS hosts, we will
    // generate a per-vhost collection. This lets us keep different
    // SNI names disjoint when we later configure the listener.
  
  // http 和 https 需要做不同的处理
    rv := routeVisitor{
        routes: map[string]*envoy_route_v3.RouteConfiguration{
            ENVOY_HTTP_LISTENER: envoy_v3.RouteConfiguration(ENVOY_HTTP_LISTENER),
        },
    }

    rv.visit(root)

    for _, v := range rv.routes {
        sort.Stable(sorter.For(v.VirtualHosts))
    }

    return rv.routes
}

// contour/internal/xdscache/v3/route.go
// 从根节点开始查找所有的 Route,查找顺序: Listener -> VirtualHost(SecureVirtualHost) -> Route
func (v *routeVisitor) visit(vertex dag.Vertex) {
    switch l := vertex.(type) {
   // 处理 DAG 中的 Listener
    case *dag.Listener:
        l.Visit(func(vertex dag.Vertex) {
            switch vh := vertex.(type) {
      // 处理 VirtualHost
            case *dag.VirtualHost:
                v.onVirtualHost(vh)
      // 处理 SecureVirtualHost
            case *dag.SecureVirtualHost:
                v.onSecureVirtualHost(vh)
            default:
                // recurse
                vertex.Visit(v.visit)
            }
        })
    default:
        // recurse
        vertex.Visit(v.visit)
    }
}

// 查找 Listener.VirtualHost.Route
func (v *routeVisitor) onVirtualHost(vh *dag.VirtualHost) {
    var routes []*dag.Route

    vh.Visit(func(v dag.Vertex) {
        route, ok := v.(*dag.Route)
        if !ok {
            return
        }
    // 将所有的 route 收集到列表中
        routes = append(routes, route)
    })

    if len(routes) == 0 {
        return
    }

  // 定义了把 dag.Route 对象转换为 envoy_route_v3.Route 对象的方法
  // 将两种数据格式进行转换
    toEnvoyRoute := func(route *dag.Route) *envoy_route_v3.Route {
        ...
        rt := &envoy_route_v3.Route{
            Match:  envoy_v3.RouteMatch(route),
            Action: envoy_v3.RouteRoute(route),
        }
        ...
        return rt
    }

  // 对 route 排序
    sortRoutes(routes)
  // 这里生成的配置,存储到 routes 这个map中,key 是 ingress_http
    v.routes[ENVOY_HTTP_LISTENER].VirtualHosts = append(v.routes[ENVOY_HTTP_LISTENER].VirtualHosts, toEnvoyVirtualHost(vh, routes, toEnvoyRoute))
}
GRPC 数据下发

从前面的分析我们知道,数据最终被转换为多个 ResourceCache 的列表,这些数据最终有两个用途:

  • 通过 GRPC 下发给数据面侧的 Envoy,最终达到流量路由的效果
  • 传给 SnapshotHander,用于生成快照
代码语言:javascript复制
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
    ...
  // 用于生成快照,这部分比较简单,不做介绍
  snapshotHandler := xdscache.NewSnapshotHandler(resources, log.WithField("context", "snapshotHandler"))

  ...
  // 注册服务到 GRPC
  // 先通过 NewContourServer 生成一个 GRPC 服务,再注册到 grpcServer
  switch ctx.Config.Server.XDSServerType {
    // 根据启动参数的不同配置,实例化不同的对象
    
    // serverType = "envoy"
        case config.EnvoyServerType:
            v3cache := contour_xds_v3.NewSnapshotCache(false, log)
            snapshotHandler.AddSnapshotter(v3cache)
      // 调用 NewServer 构造对象
      // 这个方法是 go-control-plane 内置的方法
            contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(taskCtx, v3cache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
    // serverType = "contour",这个是默认配置
        case config.ContourServerType:
      // 调用 NewContourServer 构造对象
            contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer)
  ...
}

下面分析数据经过 grpc 下发的流程,这部分主要是使用 go-control-plane sdk

注册 ADS、SDS、CDS、EDS、LDS、RDS 等都使用了 srv 对象,可以推断 Server 继承了 各种 XDS的接口

代码语言:javascript复制
// contour/internal/xds/v3/server.go

// 以下注册方法均调用的 go-control-plane sdk
// 包括注册 ADS、SDS、CDS、EDS、LDS、RDS
func RegisterServer(srv Server, g *grpc.Server) {
    // register services
    envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(g, srv)
    envoy_service_secret_v3.RegisterSecretDiscoveryServiceServer(g, srv)
    envoy_service_cluster_v3.RegisterClusterDiscoveryServiceServer(g, srv)
    envoy_service_endpoint_v3.RegisterEndpointDiscoveryServiceServer(g, srv)
    envoy_service_listener_v3.RegisterListenerDiscoveryServiceServer(g, srv)
    envoy_service_route_v3.RegisterRouteDiscoveryServiceServer(g, srv)
}

// 真正实例化的 Server 对象是 `contourServer`
// 将前面准备好的 ResourceCache 列表传进来
func NewContourServer(log logrus.FieldLogger, resources ...xds.Resource) Server {
    c := contourServer{
        FieldLogger: log,
        resources:   map[string]xds.Resource{},
    }
  // 按照 TypeURL 作为 key,保存资源
    for i, r := range resources {
        c.resources[r.TypeURL()] = resources[i]
    }

    return &c
}
contourServer
代码语言:javascript复制
// contour/internal/xds/v3/contour.go
type contourServer struct {
   // Since we only implement the streaming state of the world
   // protocol, embed the default null implementations to handle
   // the unimplemented gRPC endpoints.
   envoy_service_discovery_v3.UnimplementedAggregatedDiscoveryServiceServer
   envoy_service_secret_v3.UnimplementedSecretDiscoveryServiceServer
   envoy_service_route_v3.UnimplementedRouteDiscoveryServiceServer
   envoy_service_endpoint_v3.UnimplementedEndpointDiscoveryServiceServer
   envoy_service_cluster_v3.UnimplementedClusterDiscoveryServiceServer
   envoy_service_listener_v3.UnimplementedListenerDiscoveryServiceServer

   logrus.FieldLogger
   // 所有生成的 Envoy XDS 信息都传进来保存到这个字段
   resources   map[string]xds.Resource
   connections xds.Counter
}

把 ResourceCache 传给 contourServer,contourServer作为GRPC的server注册到GRPC上,go-control-plane 框架封装了各个XDS数据传输的接口,contourServer 实现这些接口,从而实现数据下发

代码语言:javascript复制
func (s *contourServer) StreamClusters(srv envoy_service_cluster_v3.ClusterDiscoveryService_StreamClustersServer) error {
    return s.stream(srv)
}

func (s *contourServer) StreamEndpoints(srv envoy_service_endpoint_v3.EndpointDiscoveryService_StreamEndpointsServer) error {
    return s.stream(srv)
}

func (s *contourServer) StreamListeners(srv envoy_service_listener_v3.ListenerDiscoveryService_StreamListenersServer) error {
    return s.stream(srv)
}

func (s *contourServer) StreamRoutes(srv envoy_service_route_v3.RouteDiscoveryService_StreamRoutesServer) error {
    return s.stream(srv)
}

func (s *contourServer) StreamSecrets(srv envoy_service_secret_v3.SecretDiscoveryService_StreamSecretsServer) error {
    return s.stream(srv)
}

所有的 XDS Stream方法内部都调用了 s.stream 方法

代码语言:javascript复制
func (s *contourServer) stream(st grpcStream) error {
    ...
  // 持续监听连接,并处理请求
    // now stick in this loop until the client disconnects.
    for {
        // 获取 envoy 发送来的请求
        req, err := st.Recv()
        ...
    // 根据请求的 TypeURL,找到匹配的 ResourceCache
        r, ok := s.resources[req.GetTypeUrl()]
        ...
    // 注册监听器
        r.Register(ch, last, req.ResourceNames...)
        select {
        case last = <-ch:
            
            var resources []proto.Message
            switch len(req.ResourceNames) {
      // 如果没有指定资源名称,返回所有资源
            case 0:
                // 调用前面介绍过的 Contents 方法,从 ResourceCache 的 value 中取到 envoy 配置
                resources = r.Contents()
      // 如果指定资源名称,查找特定资源
            default:
                // resource hints supplied, return exactly those
                resources = r.Query(req.ResourceNames)
            }
      // 将 ResourceCache 转换成 PB 数据
            any := make([]*any.Any, 0, len(resources))
            for _, r := range resources {
                a, err := anypb.New(proto.MessageV2(r))
                if err != nil {
                    return done(log, err)
                }
                any = append(any, a)
            }
      // 构造 grpc response 对象
            resp := &envoy_service_discovery_v3.DiscoveryResponse{
                VersionInfo: strconv.Itoa(last),
                Resources:   any,
                TypeUrl:     req.GetTypeUrl(),
                Nonce:       strconv.Itoa(last),
            }

      // 将 response 通过 grpc 发送给客户端 envoy
            if err := st.Send(resp); err != nil {
                return done(log, err)
            }

        case <-ctx.Done():
            return done(log, ctx.Err())
        }
    }
}
框架server VS contourServer

contour会根据 ctx.Config.Server.XDSServerType 属性的值,初始化不同的 server。

  • envoy:初始化框架默认的server
  • contour:初始化 contourServer

Go-control-plane 提供的 pb 接口包括三类:

  • StreamXXX
  • DeltaXXX
  • FetchXXX

XXX 代表不同的 XDS,比如 StreamClusters。

contourServer 只实现了所有 XDS 的 Stream 相关方法,并没有实现 Delta,Fetch 相关方法

Envoy 连接到 Contour服务端

前面介绍了 contour 下发数据到 envoy 的流程,那最开始 envoy是如何连接上 contour的?

initContainer 生成配置文件

查看 envoy daemonset yaml文件,可以看到,内部配置了一个 initContainer,用于生成本地配置文件,并和 envoy 业务容器共享,envoy 启动时根据配置文件启动。启动命令中还指定了 envoy 连接的 grpc server 地址,即 contour的地址,这里配置的是 k8s svc 名称

代码语言:javascript复制
initContainers:
- args:
  - bootstrap
  - /config/envoy.json
  - --xds-address=contour
  - --xds-port=8001
  - --xds-resource-version=v3
  - --resources-dir=/config/resources
  - --envoy-cafile=/certs/ca.crt
  - --envoy-cert-file=/certs/tls.crt
  - --envoy-key-file=/certs/tls.key

envoy启动配置文件内容

进入 envoy 容器可以查看生成的配置内容

代码语言:javascript复制
kubectl -n projectcontour exec -it envoy-65qfq -c envoy -- cat /config/envoy.json
代码语言:javascript复制
{
    "static_resources": {
        "clusters": [
            {
                "name": "contour",
                "alt_stat_name": "projectcontour_contour_8001",
                "type": "STRICT_DNS",
                "connect_timeout": "5s",
                "load_assignment": {
                    "cluster_name": "contour",
                    "endpoints": [
                        {
                            "lb_endpoints": [
                                {
                                    "endpoint": {
                                        "address": {
                                            "socket_address": {
                                                "address": "contour",
                                                "port_value": 8001
                                            }
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                },
                "circuit_breakers": {
                    "thresholds": [
                        {
                            "priority": "HIGH",
                            "max_connections": 100000,
                            "max_pending_requests": 100000,
                            "max_requests": 60000000,
                            "max_retries": 50
                        },
                        {
                            "max_connections": 100000,
                            "max_pending_requests": 100000,
                            "max_requests": 60000000,
                            "max_retries": 50
                        }
                    ]
                },
                "typed_extension_protocol_options": {
                    "envoy.extensions.upstreams.http.v3.HttpProtocolOptions": {
                        "@type": "type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions",
                        "explicit_http_config": {
                            "http2_protocol_options": {}
                        }
                    }
                },
                "transport_socket": {
                    "name": "envoy.transport_sockets.tls",
                    "typed_config": {
                        "@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
                        "common_tls_context": {
                            "tls_params": {
                                "tls_maximum_protocol_version": "TLSv1_3"
                            },
                            "tls_certificate_sds_secret_configs": [
                                {
                                    "name": "contour_xds_tls_certificate",
                                    "sds_config": {
                                        "path": "/config/resources/sds/xds-tls-certificate.json",
                                        "resource_api_version": "V3"
                                    }
                                }
                            ],
                            "validation_context_sds_secret_config": {
                                "name": "contour_xds_tls_validation_context",
                                "sds_config": {
                                    "path": "/config/resources/sds/xds-validation-context.json",
                                    "resource_api_version": "V3"
                                }
                            }
                        }
                    }
                },
                "upstream_connection_options": {
                    "tcp_keepalive": {
                        "keepalive_probes": 3,
                        "keepalive_time": 30,
                        "keepalive_interval": 5
                    }
                }
            },
            {
                "name": "envoy-admin",
                "alt_stat_name": "projectcontour_envoy-admin_9001",
                "type": "STATIC",
                "connect_timeout": "0.250s",
                "load_assignment": {
                    "cluster_name": "envoy-admin",
                    "endpoints": [
                        {
                            "lb_endpoints": [
                                {
                                    "endpoint": {
                                        "address": {
                                            "pipe": {
                                                "path": "/admin/admin.sock",
                                                "mode": 420
                                            }
                                        }
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ]
    },
    "dynamic_resources": {
        "lds_config": {
            "api_config_source": {
                "api_type": "GRPC",
                "transport_api_version": "V3",
                "grpc_services": [
                    {
                        "envoy_grpc": {
                            "cluster_name": "contour"
                        }
                    }
                ]
            },
            "resource_api_version": "V3"
        },
        "cds_config": {
            "api_config_source": {
                "api_type": "GRPC",
                "transport_api_version": "V3",
                "grpc_services": [
                    {
                        "envoy_grpc": {
                            "cluster_name": "contour"
                        }
                    }
                ]
            },
            "resource_api_version": "V3"
        }
    },
    "admin": {
        "access_log": [
            {
                "name": "envoy.access_loggers.file",
                "typed_config": {
                    "@type": "type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog",
                    "path": "/dev/null"
                }
            }
        ],
        "address": {
            "pipe": {
                "path": "/admin/admin.sock",
                "mode": 420
            }
        }
    }
}

总结

k8s本身没有内置 ingress 控制器,ingress 控制器呈现百花齐放的状态,参考官方文档:ingress 控制器。

不同的控制器,底层使用的网络代理不同,主要有:nginx、envoy、kong、haproxy等。envoy 作为云原生时代的网络代理,被越来越多的开源项目使用。比如:istio、contour、gloo、。这些项目的本质,都是简化 envoy 的配置,封装更简单的路由规则,通过内部的逻辑转换,变成envoy 的xds配置。

go-control-plane 框架封装了基于grpc下发xds配置,可以方便开发者开发自己的控制面。

0 人点赞