概述
- contour 是一个开源的 k8s ingress controller
- 使用 envoy 提供反向代理
- 提供动态配置更新
- 支持多集群网络代理
特性
- 基于高性能的L7代理和负载均衡Envoy作为数据面
- 灵活的架构
- TLS安全代理
为什么选择Contour
- 动态配置更新,最大限度减少连接中断
- 安全的支持多集群,多配置
- 使用 HTTPProxy CRD 增强 ingress 核心配置
安装
kubectl
代码语言:javascript复制# yaml 安装
kubectl apply -f https://projectcontour.io/quickstart/contour.yaml
# 验证
kubectl get pod -n projectcontour
helm
代码语言:javascript复制# 添加仓库
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install contour --namespace projectcontour bitnami/contour
资源
部署完成后查看安装的资源信息,包括:
- contour deployment:counter controller,作为控制面
- envoy daemonset:envoy,作为数据面
$ kubectl get deploy,svc,ds,job -n projectcontour
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/contour 2/2 2 2 12d
NAME TYPE CLUSTER-IP EXTERNAL-IP
service/contour ClusterIP 172.20.126.255 <none> 8001/TCP 12d
service/envoy LoadBalancer 172.20.12.164 ...
NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE SELECTOR AGE
daemonset.apps/envoy 10 10 10 10 10 <none> 12d
CRD
contour 安装完成后,查看 crd 有哪些:
代码语言:javascript复制$ kubectl get crd|grep contour
contourconfigurations.projectcontour.io 2021-11-04T13:30:06Z
contourdeployments.projectcontour.io 2021-11-04T13:30:06Z
extensionservices.projectcontour.io 2021-11-04T13:30:06Z
httpproxies.projectcontour.io 2021-11-04T13:30:06Z
tlscertificatedelegations.projectcontour.io 2021-11-04T13:30:06Z
其中最核心的 CRD 是 HTTPProxy,等效于 k8s 提供的原生 Ingress,提供路由配置功能。
看一个简单的 httpproxy 配置示例,当访问 foo1.bar.com 时,将代理到后端 s1 服务的 80 端口
代码语言:javascript复制将 foo1.bar.com 域名和任意一个Node节点(daemonset部署)的ip做域名绑定后,即可通过域名访问服务
apiVersion: projectcontour.io/v1
kind: HTTPProxy
metadata:
name: name-example-foo
namespace: default
spec:
virtualhost:
fqdn: foo1.bar.com
routes:
- conditions:
- prefix: /
services:
- name: s1
port: 80
架构
概述
contour 主要包含两个组件:
- Envoy:提供高性能的反向代理
- Contour:充当 Envoy 的后台管理服务,提供相关配置
两个组件是分开部署的,Contour 以 Deployment 的方式部署,Envoy 以 DaemonSet 的方式部署
原理
数据面:envoy侧
envoy pod 中包含两个 container
- initContainer container:执行
contour bootstrap
命令,生成一个配置文件到临时目录,envoy container共享这个目录,并作为启动的配置文件,里面配置了 contour server 地址,作为 envoy 的管理服务 - envoy container:根据配置文件启动envoy,建立 GRPC连接,并从服务端接收配置请求
控制面:contour 侧
- contour 是 k8s api 的一个客户端,通过 informer 机制监听 ingress,service,endpoint,secret等k8s原生资源,和httpproxy这个CRD
- contour 缓存k8s的资源信息,并最终转换为 Envoy 需要的 XDS 信息。
其他
从图中我们还看到其他的一些信息
- contour job:自动生成证书,使得grpc上的xds安全传输
- contour 内部还提供了选主模式
HTTPProxy配置
概述
Ingress 资源在 k8s 1.1 版本引入,从那以后,Ingress API 保持相对稳定,需要使用特殊的能力需要借助于 Annotation 实现。
HttpProxy CRD 的目标是扩展 Ingress API 的能力,为用户提供更丰富的功能体验,以及解决 Ingress 在多租户环境下的限制。
对比 Ingress 的优势
- 安全地支持多 Kubernetes 集群,能够限制哪些命名空间可以配置虚拟主机和 TLS 凭据。
- 能够包括来自另一个 HTTPProxy 的路径或域的路由配置(这个HTTPProxy可能在另一个命名空间中)
- 允许单个路由中接受多个服务并负载均衡它们之间的流量
- 天生支持定义服务权重和负载策略,不需要使用注解
- 支持创建时校验配置的合法性
配置文件对比
Ingress
代码语言:javascript复制# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: basic
spec:
rules:
- host: foo-basic.bar.com
http:
paths:
- backend:
service:
name: s1
port:
number: 80
HTTPProxy
代码语言:javascript复制# httpproxy.yaml
apiVersion: projectcontour.io/v1
kind: HTTPProxy
metadata:
name: basic
spec:
virtualhost:
fqdn: foo-basic.bar.com
routes:
- conditions:
- prefix: /
services:
- name: s1
port: 80
源码分析
原理回顾
通过前面 contour 的基本原理和架构的介绍,总结如下:
数据流转原理
- contour 通过 informer 机制监听 k8s 资源
- 将k8s资源转换为 envoy 需要的 xds 资源
- 通过grpc将xds配置下发到envoy节点
流量流转原理
- 带有域名的请求经过任意一个Node节点,进入Envoy(以 hostNetWork=true 部署方式为例)
- envoy将请求路由到 HTTPProxy 中配置的后端 service
数据流转实现
概述
实际上在将 k8s 资源转换为最终 xds 资源的过程中,还经过了其他的数据转换。
高清图
- k8s资源变化,触发 informer 通知机制,注册的回调函数将事件放入 channel
- EventHandler协程消费 channel,将channel中的每种资源以一个map形式保存在KubernetesCache中,map的key是该资源的namespace name唯一确定的
- KubenetesCache中的资源,经过一序列 processor 的处理之后,生成 DAG
- DAG变更,会通知一序列 Observer,依次调用 OnChange方法,传入DAG,每个 Observer对应一个 XDS协议,将DAG中需要的资源转换为XDS配置,保存在Cache中
- GRPC读取Cache中的数据,通过Stream将XDS下发到Envoy。下发这一步使用了
go-control-plane
这个开源框架
源码调用图
源码调用高清图
ResourceEventHandler
k8s 提供了 informer 机制,可以 watch kubernetes资源的变更。contour中 watch 的资源包括两类:
k8s原生资源:
- ingress
- service
- endpoint
- namespace
crd资源:
- httpproxy
// contour/cmd/contour/serve.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// 这里监听 contour 的 crd 资源, 比如:httpproxy
for _, r := range k8s.DefaultResources() {
inf, err := clients.InformerForResource(r)
......
inf.AddEventHandler(&dynamicHandler)
}
// 监听 ingress 资源
for _, r := range k8s.IngressV1Resources() {
if err := informOnResource(clients, r, &dynamicHandler); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
}
......
// 监听 secret 资源
for _, r := range k8s.SecretsResources() {
......
if err := informOnResource(clients, r, handler); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
}
// 监听 endpoint 资源
for _, r := range k8s.EndpointsResources() {
if err := informOnResource(clients, r, &k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Next: endpointHandler,
Counter: contourMetrics.EventHandlerOperations,
},
Converter: converter,
Logger: log.WithField("context", "endpointstranslator"),
}); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
}
......
}
不管是 原生资源,还是 crd 资源,Informer 监听的原理都是一样的,用户只需要编写事件处理函数,对应的接口是 ResourceEventHandler,该接口提供了增加、删除、更新回调函数。
代码语言:javascript复制// k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
Contour 中所有的资源复用了两个 ResourceEventHandler 的实现类:
- EndpointsTranslator:处理 endpoint 资源
- EventHandler:处理除了 endpoint 外的其他资源。
EventHandler 将得到的资源信息,放入channel中,由单独的协程去消费以解耦
代码语言:javascript复制func (e *EventHandler) OnAdd(obj interface{}) {
e.update <- opAdd{obj: obj}
}
func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
e.update <- opUpdate{oldObj: oldObj, newObj: newObj}
}
func (e *EventHandler) OnDelete(obj interface{}) {
e.update <- opDelete{obj: obj}
}
放入channel的信息,由谁去消费呢?
监听 channel 的协程,拿到事件后,取出资源信息,保存到 KubernetesCache 中。
代码语言:javascript复制// contour/cmd/contour/server.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
......
// 启动协程
g.Add(eventHandler.Start())
......
}
// counter/internal/contour/handler.go
func (e *EventHandler) Start() func(<-chan struct{}) error {
e.update = make(chan interface{})
return e.run
}
func (e *EventHandler) run(stop <-chan struct{}) error {
......
// 死循环,一直监听 channel 中的事件
for {
select {
case op := <-e.update:
if e.onUpdate(op) {
......
}
......
}
// 处理事件
// 将资源保存到 e.Builder.Source
func (e *EventHandler) onUpdate(op interface{}) bool {
switch op := op.(type) {
case opAdd:
return e.Builder.Source.Insert(op.obj)
case opUpdate:
......
remove := e.Builder.Source.Remove(op.oldObj)
insert := e.Builder.Source.Insert(op.newObj)
return remove || insert
case opDelete:
return e.Builder.Source.Remove(op.obj)
case bool:
return op
default:
return false
}
}
EndpointsTranslator 监听的 endpoint 资源,处理比较麻烦一些,以 add 为例分析
代码语言:javascript复制// contour/internal/xdscache/v3/endpointstranslator.go
func (e *EndpointsTranslator) OnAdd(obj interface{}) {
switch obj := obj.(type) {
case *v1.Endpoints:
// 更新本地缓存
e.cache.UpdateEndpoint(obj)
// 合并资源
e.Merge(e.cache.Recalculate())
e.Notify()
if e.Observer != nil {
// 刷新保存快照
e.Observer.Refresh()
}
default:
e.Errorf("OnAdd unexpected type %T: %#v", obj, obj)
}
}
KubernetesCache
类中定义了多个map类型的资源对象,分别存储不同的资源,key是 namespace 和 name 组成的 type.NamespacedName,将原生k8s中不同 namespace 的同一资源都存储到一个map中,多种资源的多个map,共同组成 KubernetesCache
代码语言:javascript复制// contour/internal/dag/cache.go
type KubernetesCache struct {
......
ingresses map[types.NamespacedName]*networking_v1.Ingress
ingressclass *networking_v1.IngressClass
httpproxies map[types.NamespacedName]*contour_api_v1.HTTPProxy
secrets map[types.NamespacedName]*v1.Secret
tlscertificatedelegations map[types.NamespacedName]*contour_api_v1.TLSCertificateDelegation
services map[types.NamespacedName]*v1.Service
namespaces map[string]*v1.Namespace
gatewayclass *gatewayapi_v1alpha1.GatewayClass
gateway *gatewayapi_v1alpha1.Gateway
httproutes map[types.NamespacedName]*gatewayapi_v1alpha1.HTTPRoute
tlsroutes map[types.NamespacedName]*gatewayapi_v1alpha1.TLSRoute
tcproutes map[types.NamespacedName]*gatewayapi_v1alpha1.TCPRoute
udproutes map[types.NamespacedName]*gatewayapi_v1alpha1.UDPRoute
backendpolicies map[types.NamespacedName]*gatewayapi_v1alpha1.BackendPolicy
extensions map[types.NamespacedName]*contour_api_v1alpha1.ExtensionService
......
}
// map中key的数据类型:由 namespace 和 name 共同决定
type NamespacedName struct {
Namespace string
Name string
}
KubernetesCache 插入流程:
- 首先判断资源类型
- 将不同的资源放入不同的map
- 有些资源的变更,还需要触发其他资源变更。比如:service变化了,对应的 ingress或者httpproxy也需要联动变化
// contour/internal/dag/cache.go
func (kc *KubernetesCache) Insert(obj interface{}) bool {
// 初始化 map,确保执行一次
kc.initialize.Do(kc.init)
// 根据不同的资源类型,放入不同的 map
switch obj := obj.(type) {
case *v1.Secret:
......
// 保存 secret 资源
kc.secrets[k8s.NamespacedNameOf(obj)] = obj
return kc.secretTriggersRebuild(obj)
case *v1.Service:
kc.services[k8s.NamespacedNameOf(obj)] = obj
return kc.serviceTriggersRebuild(obj)
case *v1.Namespace:
kc.namespaces[obj.Name] = obj
return true
......
}
return false
}
DAG
构造好 KubernetesCache 之后,contour 通过 一些列的 processor,将 KubernetesCache 变成 DAG,DAG 代表了 k8s 不同资源关系的一个有向无环图。核心代码如下:
代码语言:javascript复制// contour/internal/contour/handler.go
func (e *EventHandler) rebuildDAG() {
// 将 KubernetesCache 构建成 DAG
latestDAG := e.Builder.Build()
// DAG 变更时,通知所有的 Observer,
// 每种 Observer 是一个 XDS 对象,取出 DAG 资源转换为 XDS 配置
e.Observer.OnChange(latestDAG)
// 更新 snapshot 信息
for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.StatusUpdater.Send(upd)
}
}
DAG 数据结构如下,根节点 roots 是一个 Vertex 的列表,Vertex 只定义了Visit接口。即:所有实现了 Visit
代码语言:javascript复制// contour/internal/dag/dag.go
type DAG struct {
// StatusCache holds a cache of status updates to send.
StatusCache status.Cache
// roots are the root vertices of this DAG.
roots []Vertex
}
// Vertex is a node in the DAG that can be visited.
type Vertex interface {
Visit(func(Vertex))
}
build过程分析:
代码语言:javascript复制// contour/internal/dag/builder.go
func (b *Builder) Build() *DAG {
// 先构造一个空的 DAG
dag := DAG{
StatusCache: status.NewCache(b.Source.ConfiguredGateway),
}
// 遍历所有的 processor,每个 processor 把不同的 k8s 资源变成 DAG 的一部分信息
// 所有的 processor 共同构成完整的 DAG
for _, p := range b.Processors {
p.Run(&dag, &b.Source)
}
return &dag
}
processor 列表包括:
- IngressProcessor
- ExtensionServiceProcessor
- HTTPProxyProcessor
- GatewayAPIProcessor
- ListenerProcessor
func getDAGBuilder(ctx *serveContext, clients *k8s.Clients, clientCert, fallbackCert *types.NamespacedName, log logrus.FieldLogger) dag.Builder {
dagProcessors := []dag.Processor{
&dag.IngressProcessor{
...
},
&dag.ExtensionServiceProcessor{
...
},
&dag.HTTPProxyProcessor{
...
},
}
if ctx.Config.GatewayConfig != nil && clients.ResourcesExist(k8s.GatewayAPIResources()...) {
dagProcessors = append(dagProcessors, &dag.GatewayAPIProcessor{
FieldLogger: log.WithField("context", "GatewayAPIProcessor"),
})
}
...
dagProcessors = append(dagProcessors, &dag.ListenerProcessor{})
...
}
每个processor 实现 Run 方法,接收 DAG 和 KubernetesCache 参数,通过 KubernetesCache 修改 DAG,以 HTTPProxyProcessor 为例分析。
代码语言:javascript复制func (p *HTTPProxyProcessor) Run(dag *DAG, source *KubernetesCache) {
...
// 校验合法性
for _, proxy := range p.validHTTPProxies() {
// 计算路由规则
// 入参为 crd 资源 HttpProxy
// 内部经过各种计算,构造路由关系,最终保存在DAG中
p.computeHTTPProxy(proxy)
}
for meta := range p.orphaned {
proxy, ok := p.source.httpproxies[meta]
if ok {
pa, commit := p.dag.StatusCache.ProxyAccessor(proxy)
pa.ConditionFor(status.ValidCondition).AddError(contour_api_v1.ConditionTypeOrphanedError,
"Orphaned",
"this HTTPProxy is not part of a delegation chain from a root HTTPProxy")
commit()
}
}
}
DAG 中的节点(Vertex)有:
- VirtualHost:对应 LDS
- SecureVirtualHost:对应 LDS 和 CDS
- ExtensionCluster:对应 CDS
- ServiceCluster:对应 EDS
- Cluster:对应 CDS,一部分SDS
- Listener:对应 RDS
其中某些 Vectex 又包含子 Vectex,因此用图这种数据结构串联起来。
ResourceCache
前面分析 DAG 代码时,在生成 DAG 后,会通知所有的 Observer,即e.Observer.OnChange
// contour/internal/contour/handler.go
func (e *EventHandler) rebuildDAG() {
// 将 KubernetesCache 构建成 DAG
latestDAG := e.Builder.Build()
// DAG 变更时,通知所有的 Observer,
// 每种 Observer 是一个 XDS 对象,取出 DAG 资源转换为 XDS 配置
e.Observer.OnChange(latestDAG)
// 更新 snapshot 信息
for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.StatusUpdater.Send(upd)
}
}
这里的 e.Observer 是 一堆 Observer的组合
代码语言:javascript复制// contour/cmd/contour/serve.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
...
// ResoureCache 列表
resources := []xdscache.ResourceCache{
xdscache_v3.NewListenerCache(listenerConfig, ctx.statsAddr, ctx.statsPort),
&xdscache_v3.SecretCache{},
&xdscache_v3.RouteCache{},
&xdscache_v3.ClusterCache{},
endpointHandler,
}
...
eventHandler := &contour.EventHandler{
...
// 注册一个组合的 Observer
Observer: dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...),
...
}
}
// 组合多个Observer
func ComposeObservers(observers ...Observer) Observer {
return ObserverFunc(func(d *DAG) {
for _, o := range observers {
o.OnChange(d)
}
})
}
ResourceCache 是一个接口,聚合了两个接口:dag.Observer 和 xds.Resource。该接口定义了如何处理 DAG,以及处理完成后如何返回给调用方。
- Observer接口:
- OnChange:参数为DAG,当 DAG 有任何变更,都会触发这个函数,可以拿到最新的DAG做处理
- Resource接口:
- Contents:返回xds资源的接口
- Query:查询xds资源的接口
- Register:注册xds资源的接口
- TypeURL:xds资源类型
// contour/internal/xdscache/resources.go
type ResourceCache interface {
dag.Observer
xds.Resource
}
// contour/internal/dag/dag.go
type Observer interface {
// 当 DAG变更时,触发相应业务逻辑
OnChange(*DAG)
}
// contour/internal/xds/resource.go
type Resource interface {
// 返回全部资源内容
// Contents returns the contents of this resource.
Contents() []proto.Message
// 通过给定的名称,返回对应的资源内容
// Query returns an entry for each resource name supplied.
Query(names []string) []proto.Message
// 注册资源
// Register registers ch to receive a value when Notify is called.
Register(chan int, int, ...string)
// 资源类型
// TypeURL returns the typeURL of messages returned from Values.
TypeURL() string
}
不同的 xds 资源类型都实现了ResourceCache 接口:
- RouteCache:对应 RDS
- ClusterCache:对应 CDS
- ListenerCache:对应 LDS
- SecretCache:对应 SDS
- EndpointsTranslator:对应 EDS
转换后的 xds 资源都保存在 values
字段中。
- Observer的OnChange接口:将 DAG 转换为 Envoy xds 配置保存在 values 中
- Resource的接口:提供查询 values 的方法,即可得到 envoy 的xds配置
// contour/internal/xdscache/v3/cluster.go
// ClusterCache manages the contents of the gRPC CDS cache.
type ClusterCache struct {
mu sync.Mutex
values map[string]*envoy_cluster_v3.Cluster
contour.Cond
}
// contour/internal/xdscache/v3/listener.go
// ListenerCache manages the contents of the gRPC LDS cache.
type ListenerCache struct {
mu sync.Mutex
values map[string]*envoy_listener_v3.Listener
staticValues map[string]*envoy_listener_v3.Listener
Config ListenerConfig
contour.Cond
}
// contour/internal/xdscache/v3/route.go
// RouteCache manages the contents of the gRPC RDS cache.
type RouteCache struct {
mu sync.Mutex
values map[string]*envoy_route_v3.RouteConfiguration
contour.Cond
}
// contour/internal/xdscache/v3/secret.go
// SecretCache manages the contents of the gRPC SDS cache.
type SecretCache struct {
mu sync.Mutex
values map[string]*envoy_tls_v3.Secret
contour.Cond
}
// contour/internal/xdscache/v3/endpointstranslator.go
// A EndpointsTranslator translates Kubernetes Endpoints objects into Envoy
// ClusterLoadAssignment resources.
type EndpointsTranslator struct {
// Observer notifies when the endpoints cache has been updated.
Observer contour.Observer
contour.Cond
logrus.FieldLogger
cache EndpointsCache
mu sync.Mutex // Protects entries.
entries map[string]*envoy_endpoint_v3.ClusterLoadAssignment
}
以 RouteCache 的 OnChange 为例,查看实现逻辑
代码语言:javascript复制// contour/internal/xdscache/v3/route.go
func (c *RouteCache) OnChange(root *dag.DAG) {
// 读取 DAG 中的信息,生成 envoy 的 RDS 资源 envoy_route_v3.RouteConfiguration
routes := visitRoutes(root)
// 将配置文件放到 value 字段中,供 Query、Context 等方法调用时查看
c.Update(routes)
}
DAG 转换为 RDS 的逻辑实现
代码语言:javascript复制// contour/internal/xdscache/v3/route.go
func visitRoutes(root dag.Vertex) map[string]*envoy_route_v3.RouteConfiguration {
// Collect the route configurations for all the routes we can
// find. For HTTP hosts, the routes will all be collected on the
// well-known ENVOY_HTTP_LISTENER, but for HTTPS hosts, we will
// generate a per-vhost collection. This lets us keep different
// SNI names disjoint when we later configure the listener.
// http 和 https 需要做不同的处理
rv := routeVisitor{
routes: map[string]*envoy_route_v3.RouteConfiguration{
ENVOY_HTTP_LISTENER: envoy_v3.RouteConfiguration(ENVOY_HTTP_LISTENER),
},
}
rv.visit(root)
for _, v := range rv.routes {
sort.Stable(sorter.For(v.VirtualHosts))
}
return rv.routes
}
// contour/internal/xdscache/v3/route.go
// 从根节点开始查找所有的 Route,查找顺序: Listener -> VirtualHost(SecureVirtualHost) -> Route
func (v *routeVisitor) visit(vertex dag.Vertex) {
switch l := vertex.(type) {
// 处理 DAG 中的 Listener
case *dag.Listener:
l.Visit(func(vertex dag.Vertex) {
switch vh := vertex.(type) {
// 处理 VirtualHost
case *dag.VirtualHost:
v.onVirtualHost(vh)
// 处理 SecureVirtualHost
case *dag.SecureVirtualHost:
v.onSecureVirtualHost(vh)
default:
// recurse
vertex.Visit(v.visit)
}
})
default:
// recurse
vertex.Visit(v.visit)
}
}
// 查找 Listener.VirtualHost.Route
func (v *routeVisitor) onVirtualHost(vh *dag.VirtualHost) {
var routes []*dag.Route
vh.Visit(func(v dag.Vertex) {
route, ok := v.(*dag.Route)
if !ok {
return
}
// 将所有的 route 收集到列表中
routes = append(routes, route)
})
if len(routes) == 0 {
return
}
// 定义了把 dag.Route 对象转换为 envoy_route_v3.Route 对象的方法
// 将两种数据格式进行转换
toEnvoyRoute := func(route *dag.Route) *envoy_route_v3.Route {
...
rt := &envoy_route_v3.Route{
Match: envoy_v3.RouteMatch(route),
Action: envoy_v3.RouteRoute(route),
}
...
return rt
}
// 对 route 排序
sortRoutes(routes)
// 这里生成的配置,存储到 routes 这个map中,key 是 ingress_http
v.routes[ENVOY_HTTP_LISTENER].VirtualHosts = append(v.routes[ENVOY_HTTP_LISTENER].VirtualHosts, toEnvoyVirtualHost(vh, routes, toEnvoyRoute))
}
GRPC 数据下发
从前面的分析我们知道,数据最终被转换为多个 ResourceCache 的列表,这些数据最终有两个用途:
- 通过 GRPC 下发给数据面侧的 Envoy,最终达到流量路由的效果
- 传给 SnapshotHander,用于生成快照
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
...
// 用于生成快照,这部分比较简单,不做介绍
snapshotHandler := xdscache.NewSnapshotHandler(resources, log.WithField("context", "snapshotHandler"))
...
// 注册服务到 GRPC
// 先通过 NewContourServer 生成一个 GRPC 服务,再注册到 grpcServer
switch ctx.Config.Server.XDSServerType {
// 根据启动参数的不同配置,实例化不同的对象
// serverType = "envoy"
case config.EnvoyServerType:
v3cache := contour_xds_v3.NewSnapshotCache(false, log)
snapshotHandler.AddSnapshotter(v3cache)
// 调用 NewServer 构造对象
// 这个方法是 go-control-plane 内置的方法
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(taskCtx, v3cache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
// serverType = "contour",这个是默认配置
case config.ContourServerType:
// 调用 NewContourServer 构造对象
contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer)
...
}
下面分析数据经过 grpc 下发的流程,这部分主要是使用 go-control-plane
sdk
注册 ADS、SDS、CDS、EDS、LDS、RDS 等都使用了 srv 对象,可以推断 Server 继承了 各种 XDS的接口
代码语言:javascript复制// contour/internal/xds/v3/server.go
// 以下注册方法均调用的 go-control-plane sdk
// 包括注册 ADS、SDS、CDS、EDS、LDS、RDS
func RegisterServer(srv Server, g *grpc.Server) {
// register services
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(g, srv)
envoy_service_secret_v3.RegisterSecretDiscoveryServiceServer(g, srv)
envoy_service_cluster_v3.RegisterClusterDiscoveryServiceServer(g, srv)
envoy_service_endpoint_v3.RegisterEndpointDiscoveryServiceServer(g, srv)
envoy_service_listener_v3.RegisterListenerDiscoveryServiceServer(g, srv)
envoy_service_route_v3.RegisterRouteDiscoveryServiceServer(g, srv)
}
// 真正实例化的 Server 对象是 `contourServer`
// 将前面准备好的 ResourceCache 列表传进来
func NewContourServer(log logrus.FieldLogger, resources ...xds.Resource) Server {
c := contourServer{
FieldLogger: log,
resources: map[string]xds.Resource{},
}
// 按照 TypeURL 作为 key,保存资源
for i, r := range resources {
c.resources[r.TypeURL()] = resources[i]
}
return &c
}
contourServer
代码语言:javascript复制// contour/internal/xds/v3/contour.go
type contourServer struct {
// Since we only implement the streaming state of the world
// protocol, embed the default null implementations to handle
// the unimplemented gRPC endpoints.
envoy_service_discovery_v3.UnimplementedAggregatedDiscoveryServiceServer
envoy_service_secret_v3.UnimplementedSecretDiscoveryServiceServer
envoy_service_route_v3.UnimplementedRouteDiscoveryServiceServer
envoy_service_endpoint_v3.UnimplementedEndpointDiscoveryServiceServer
envoy_service_cluster_v3.UnimplementedClusterDiscoveryServiceServer
envoy_service_listener_v3.UnimplementedListenerDiscoveryServiceServer
logrus.FieldLogger
// 所有生成的 Envoy XDS 信息都传进来保存到这个字段
resources map[string]xds.Resource
connections xds.Counter
}
把 ResourceCache 传给 contourServer,contourServer作为GRPC的server注册到GRPC上,go-control-plane 框架封装了各个XDS数据传输的接口,contourServer 实现这些接口,从而实现数据下发
代码语言:javascript复制func (s *contourServer) StreamClusters(srv envoy_service_cluster_v3.ClusterDiscoveryService_StreamClustersServer) error {
return s.stream(srv)
}
func (s *contourServer) StreamEndpoints(srv envoy_service_endpoint_v3.EndpointDiscoveryService_StreamEndpointsServer) error {
return s.stream(srv)
}
func (s *contourServer) StreamListeners(srv envoy_service_listener_v3.ListenerDiscoveryService_StreamListenersServer) error {
return s.stream(srv)
}
func (s *contourServer) StreamRoutes(srv envoy_service_route_v3.RouteDiscoveryService_StreamRoutesServer) error {
return s.stream(srv)
}
func (s *contourServer) StreamSecrets(srv envoy_service_secret_v3.SecretDiscoveryService_StreamSecretsServer) error {
return s.stream(srv)
}
所有的 XDS Stream方法内部都调用了 s.stream 方法
代码语言:javascript复制func (s *contourServer) stream(st grpcStream) error {
...
// 持续监听连接,并处理请求
// now stick in this loop until the client disconnects.
for {
// 获取 envoy 发送来的请求
req, err := st.Recv()
...
// 根据请求的 TypeURL,找到匹配的 ResourceCache
r, ok := s.resources[req.GetTypeUrl()]
...
// 注册监听器
r.Register(ch, last, req.ResourceNames...)
select {
case last = <-ch:
var resources []proto.Message
switch len(req.ResourceNames) {
// 如果没有指定资源名称,返回所有资源
case 0:
// 调用前面介绍过的 Contents 方法,从 ResourceCache 的 value 中取到 envoy 配置
resources = r.Contents()
// 如果指定资源名称,查找特定资源
default:
// resource hints supplied, return exactly those
resources = r.Query(req.ResourceNames)
}
// 将 ResourceCache 转换成 PB 数据
any := make([]*any.Any, 0, len(resources))
for _, r := range resources {
a, err := anypb.New(proto.MessageV2(r))
if err != nil {
return done(log, err)
}
any = append(any, a)
}
// 构造 grpc response 对象
resp := &envoy_service_discovery_v3.DiscoveryResponse{
VersionInfo: strconv.Itoa(last),
Resources: any,
TypeUrl: req.GetTypeUrl(),
Nonce: strconv.Itoa(last),
}
// 将 response 通过 grpc 发送给客户端 envoy
if err := st.Send(resp); err != nil {
return done(log, err)
}
case <-ctx.Done():
return done(log, ctx.Err())
}
}
}
框架server VS contourServer
contour会根据 ctx.Config.Server.XDSServerType 属性的值,初始化不同的 server。
- envoy:初始化框架默认的server
- contour:初始化 contourServer
Go-control-plane 提供的 pb 接口包括三类:
- StreamXXX
- DeltaXXX
- FetchXXX
XXX 代表不同的 XDS,比如 StreamClusters。
contourServer 只实现了所有 XDS 的 Stream 相关方法,并没有实现 Delta,Fetch 相关方法
Envoy 连接到 Contour服务端
前面介绍了 contour 下发数据到 envoy 的流程,那最开始 envoy是如何连接上 contour的?
initContainer 生成配置文件
查看 envoy daemonset yaml文件,可以看到,内部配置了一个 initContainer,用于生成本地配置文件,并和 envoy 业务容器共享,envoy 启动时根据配置文件启动。启动命令中还指定了 envoy 连接的 grpc server 地址,即 contour的地址,这里配置的是 k8s svc 名称
代码语言:javascript复制initContainers:
- args:
- bootstrap
- /config/envoy.json
- --xds-address=contour
- --xds-port=8001
- --xds-resource-version=v3
- --resources-dir=/config/resources
- --envoy-cafile=/certs/ca.crt
- --envoy-cert-file=/certs/tls.crt
- --envoy-key-file=/certs/tls.key
envoy启动配置文件内容
进入 envoy 容器可以查看生成的配置内容
代码语言:javascript复制kubectl -n projectcontour exec -it envoy-65qfq -c envoy -- cat /config/envoy.json
代码语言:javascript复制{
"static_resources": {
"clusters": [
{
"name": "contour",
"alt_stat_name": "projectcontour_contour_8001",
"type": "STRICT_DNS",
"connect_timeout": "5s",
"load_assignment": {
"cluster_name": "contour",
"endpoints": [
{
"lb_endpoints": [
{
"endpoint": {
"address": {
"socket_address": {
"address": "contour",
"port_value": 8001
}
}
}
}
]
}
]
},
"circuit_breakers": {
"thresholds": [
{
"priority": "HIGH",
"max_connections": 100000,
"max_pending_requests": 100000,
"max_requests": 60000000,
"max_retries": 50
},
{
"max_connections": 100000,
"max_pending_requests": 100000,
"max_requests": 60000000,
"max_retries": 50
}
]
},
"typed_extension_protocol_options": {
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": {
"@type": "type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions",
"explicit_http_config": {
"http2_protocol_options": {}
}
}
},
"transport_socket": {
"name": "envoy.transport_sockets.tls",
"typed_config": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"common_tls_context": {
"tls_params": {
"tls_maximum_protocol_version": "TLSv1_3"
},
"tls_certificate_sds_secret_configs": [
{
"name": "contour_xds_tls_certificate",
"sds_config": {
"path": "/config/resources/sds/xds-tls-certificate.json",
"resource_api_version": "V3"
}
}
],
"validation_context_sds_secret_config": {
"name": "contour_xds_tls_validation_context",
"sds_config": {
"path": "/config/resources/sds/xds-validation-context.json",
"resource_api_version": "V3"
}
}
}
}
},
"upstream_connection_options": {
"tcp_keepalive": {
"keepalive_probes": 3,
"keepalive_time": 30,
"keepalive_interval": 5
}
}
},
{
"name": "envoy-admin",
"alt_stat_name": "projectcontour_envoy-admin_9001",
"type": "STATIC",
"connect_timeout": "0.250s",
"load_assignment": {
"cluster_name": "envoy-admin",
"endpoints": [
{
"lb_endpoints": [
{
"endpoint": {
"address": {
"pipe": {
"path": "/admin/admin.sock",
"mode": 420
}
}
}
}
]
}
]
}
}
]
},
"dynamic_resources": {
"lds_config": {
"api_config_source": {
"api_type": "GRPC",
"transport_api_version": "V3",
"grpc_services": [
{
"envoy_grpc": {
"cluster_name": "contour"
}
}
]
},
"resource_api_version": "V3"
},
"cds_config": {
"api_config_source": {
"api_type": "GRPC",
"transport_api_version": "V3",
"grpc_services": [
{
"envoy_grpc": {
"cluster_name": "contour"
}
}
]
},
"resource_api_version": "V3"
}
},
"admin": {
"access_log": [
{
"name": "envoy.access_loggers.file",
"typed_config": {
"@type": "type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog",
"path": "/dev/null"
}
}
],
"address": {
"pipe": {
"path": "/admin/admin.sock",
"mode": 420
}
}
}
}
总结
k8s本身没有内置 ingress 控制器,ingress 控制器呈现百花齐放的状态,参考官方文档:ingress 控制器。
不同的控制器,底层使用的网络代理不同,主要有:nginx、envoy、kong、haproxy等。envoy 作为云原生时代的网络代理,被越来越多的开源项目使用。比如:istio、contour、gloo、。这些项目的本质,都是简化 envoy 的配置,封装更简单的路由规则,通过内部的逻辑转换,变成envoy 的xds配置。
go-control-plane 框架封装了基于grpc下发xds配置,可以方便开发者开发自己的控制面。