Istio注入SideCar原理

2023-08-19 09:46:56 浏览数 (1)

重新发一次,之前的排版有问题

简介

Istio提供一种简单的方式来建立已部署的服务的网络,具备负载均衡,服务到服务认证,监控等等功能,而不需要改动任何服务代码。

简单的说,有了Istio,你的服务就不再需要任何微服务开发框架(典型如Spring Cloud,Dubbo),也不再需要自己动手实现各种复杂的服务治理的功能(很多是Spring Cloud和Dubbo也不能提供的,需要自己动手)。只要服务的客户端和服务器可以进行简单的直接网络访问,就可以通过将网络层委托给Istio,从而获得一系列的完备功能。

可以近似的理解为:Istio = 微服务框架 服务治理

这里主要讲解使用istio时,一些sidecar容器的注入原理

sidecar

stio 服务网格目前所需的容器有: istio-init 用于设置 iptables 规则,以便将入站/出站流量通过 Sidecar 代理。 istio-proxy 这个容器是真正的 Sidecar 代理(基于 Envoy) 向 pod 中注入 Istio Sidecar 的两种方法:

  1. 使用 istioctl 手动注入
  2. 启用 pod 所属命名空间的 Istio Sidecar 注入器自动注入。

这里只讲解自动注入的情况

使用配置

如果想把一个服务纳入 Istio 的网格中,需要在 pod 中注入 Sidecar 进行流量的劫持处理,通用的做法就是在 namespace 上打上 istio-injection=enabled 标签,这样只要在此 namespace 下创建或重启 pod 都会导致 pod 被注入 Sidecar,当然为了不让指定 pod 注入 Sidecar,可以在 pod 的 annotations 里加上 sidecar.istio.io/inject: "false"

webhook

Kubernetes 提供了自定义资源类型和自定义控制器来扩展功能,还提供了动态准入控制 Webhook,其实这个动态准入控制就是一个回调,Kubernetes 通过 Webhook 来实现准入控制,分为两种:验证性质的准入 Webhook (Validating Admission Webhook) 和修改性质的准入 Webhook (Mutating Admission Webhook)。 在 Istio 的配置里可以看到回调的 url 路径 /inject,Istio 主要使用的是 Mutating Admission Webhook,在资源持久化到 ETCD 之前进行资源的修改,增加 Init Container 和 Sidecar Container。但是 Istio 在进行资源修改前,需要满足一些条件,这些条件可以通过配置进行修改。

代码语言:javascript复制
apiVersion: admissionregistration.k8s.io/v1beta1
kind: MutatingWebhookConfiguration
metadata:
  name: istio-sidecar-injector
webhooks:
  - name: sidecar-injector.istio.io
    clientConfig:
      service:
        name: istio-sidecar-injector
        namespace: istio-system
        path: "/inject"
      caBundle: ${CA_BUNDLE}
    rules:
      - operations: [ "CREATE" ]
        apiGroups: [""]
        apiVersions: ["v1"]
        resources: ["pods"]
    namespaceSelector:
      matchLabels:
        istio-injection: enabled

通过配置我们看到, namespaceSelector 会去 match 标签为 istio-injection: enabled 的 namespace,并且根据请求规则,去匹配所有 pod 的创建 CREATE 请求。当 apiserver 收到一个符合规则的请求时,apiserver 会给 Webhook 服务发送一个通过审核的请求,Istio 中的这个 Webhook 服务是 Istiod 的 service,请求地址为 /inject。从代码 /pkg/kube/inject/webhook.go,中我们查看 Istio 是如何处理自动注入的,在 Discovery Server 中注册了两个用来处理自动注入的请求 handler, p.Mux.HandleFunc ("/inject", wh.serveInject)p.Mux.HandleFunc ("/inject/", wh.serveInject)wh.serveInject 就是实现自动注入的主要逻辑。

源码解析

入口函数:

代码语言:javascript复制
// NewWebhook creates a new instance of a mutating webhook for automatic sidecar injection.
func NewWebhook(p WebhookParameters) (*Webhook, error) {
   if p.Mux == nil {
      return nil, errors.New("expected mux to be passed, but was not passed")
   }

   wh := &Webhook{
      watcher: p.Watcher,
      meshConfig: p.Env.Mesh(),
      env: p.Env,
      revision: p.Revision,
   }

   p.Watcher.SetHandler(wh.updateConfig)
   sidecarConfig, valuesConfig, err := p.Watcher.Get()
   if err != nil {
      return nil, err
   }
   if err := wh.updateConfig(sidecarConfig, valuesConfig); err != nil {
      log.Errorf("failed to process webhook config: %v", err)
   }

   p.Mux.HandleFunc("/inject", wh.serveInject)
   p.Mux.HandleFunc("/inject/", wh.serveInject)

   p.Env.Watcher.AddMeshHandler(func() {
      wh.mu.Lock()
      wh.meshConfig = p.Env.Mesh()
      wh.mu.Unlock()
   })

   return wh, nil
}

从上面可以看到,webhook的回调地址是/inject,对应的处理函数是serveInject

下面来看看这个处理函数

代码语言:javascript复制
func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
  // ...省略一万字...

  var reviewResponse *kube.AdmissionResponse
  var obj runtime.Object
  var ar *kube.AdmissionReview
  if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
    handleError(fmt.Sprintf("Could not decode body: %v", err))
    reviewResponse = toAdmissionResponse(err)
  } else {
    log.Debugf("AdmissionRequest for path=%sn", path)
    ar, err = kube.AdmissionReviewKubeToAdapter(out)
    if err != nil {
      handleError(fmt.Sprintf("Could not decode object: %v", err))
      reviewResponse = toAdmissionResponse(err)
    } else {
      reviewResponse = wh.inject(ar, path)
    }
  }

  // ...省略一万字...
}

func (wh *Webhook) inject(ar *v1beta1.AdmissionReview, path string) *v1beta1.AdmissionResponse {
  // ...省略一万字...
  
   wh.mu.RLock()
  if !injectRequired(IgnoredNamespaces.UnsortedList(), wh.Config, &pod.Spec, pod.ObjectMeta) {
    log.Infof("Skipping %s/%s due to policy check", pod.ObjectMeta.Namespace, podName)
    totalSkippedInjections.Increment()
    wh.mu.RUnlock()
    return &kube.AdmissionResponse{
      Allowed: true,
    }
  }

  proxyConfig := mesh.DefaultProxyConfig()
  if wh.env.PushContext != nil && wh.env.PushContext.ProxyConfigs != nil {
    if generatedProxyConfig := wh.env.PushContext.ProxyConfigs.EffectiveProxyConfig(
      &model.NodeMetadata{
        Namespace: pod.Namespace,
        Labels: pod.Labels,
        Annotations: pod.Annotations,
      }, wh.meshConfig); generatedProxyConfig != nil {
      proxyConfig = generatedProxyConfig
    }
  }
  deploy, typeMeta := kube.GetDeployMetaFromPod(&pod)
  params := InjectionParameters{
    pod: &pod,
    deployMeta: deploy,
    typeMeta: typeMeta,
    templates: wh.Config.Templates,
    defaultTemplate: wh.Config.DefaultTemplates,
    aliases: wh.Config.Aliases,
    meshConfig: wh.meshConfig,
    proxyConfig: proxyConfig,
    valuesConfig: wh.valuesConfig,
    revision: wh.revision,
    injectedAnnotations: wh.Config.InjectedAnnotations,
    proxyEnvs: parseInjectEnvs(path),
  }
  wh.mu.RUnlock()

  patchBytes, err := injectPod(params)
  if err != nil {
    handleError(fmt.Sprintf("Pod injection failed: %v", err))
    return toAdmissionResponse(err)
  }
  // ...省略一万字...
}

主要逻辑就是

  • 解析request请求
  • 检查各项配置,查看是否运用webhook进行sidecar的注入
  • 检查通过则注入sidecar
  • 构造适配kube的返回结果
1. 注入条件

下面来看下需要满足哪些条件才会注入

代码语言:javascript复制
func injectRequired(ignored []string, config *Config, podSpec *corev1.PodSpec, metadata *metav1.ObjectMeta) bool { 
    // Skip injection when host networking is enabled. The problem is
  // that the iptables changes are assumed to be within the pod when,
  // in fact, they are changing the routing at the host level. This
  // often results in routing failures within a node which can
  // affect the network provider within the cluster causing
  // additional pod failures.
  if podSpec.HostNetwork {
    return false
  }

  // skip special kubernetes system namespaces
  for _, namespace := range ignored {
    if metadata.Namespace == namespace {
      return false
    }
  }

  annos := metadata.GetAnnotations()

  var useDefault bool
  var inject bool

  objectSelector := annos[annotation.SidecarInject.Name]
  if lbl, labelPresent := metadata.GetLabels()[label.SidecarInject.Name]; labelPresent {
    // The label is the new API; if both are present we prefer the label
    objectSelector = lbl
  }
  switch strings.ToLower(objectSelector) {
  // http://yaml.org/type/bool.html
  case "y", "yes", "true", "on":
    inject = true
  case "":
    useDefault = true
  }

  // If an annotation is not explicitly given, check the LabelSelectors, starting with NeverInject
  if useDefault {
    for _, neverSelector := range config.NeverInjectSelector {
      selector, err := metav1.LabelSelectorAsSelector(&neverSelector)
      if err != nil {
        log.Warnf("Invalid selector for NeverInjectSelector: %v (%v)", neverSelector, err)
      } else if !selector.Empty() && selector.Matches(labels.Set(metadata.Labels)) {
        log.Debugf("Explicitly disabling injection for pod %s/%s due to pod labels matching NeverInjectSelector config map entry.",
          metadata.Namespace, potentialPodName(metadata))
        inject = false
        useDefault = false
        break
      }
    }
  }

  // If there's no annotation nor a NeverInjectSelector, check the AlwaysInject one
  if useDefault {
    for _, alwaysSelector := range config.AlwaysInjectSelector {
      selector, err := metav1.LabelSelectorAsSelector(&alwaysSelector)
      if err != nil {
        log.Warnf("Invalid selector for AlwaysInjectSelector: %v (%v)", alwaysSelector, err)
      } else if !selector.Empty() && selector.Matches(labels.Set(metadata.Labels)) {
        log.Debugf("Explicitly enabling injection for pod %s/%s due to pod labels matching AlwaysInjectSelector config map entry.",
          metadata.Namespace, potentialPodName(metadata))
        inject = true
        useDefault = false
        break
      }
    }
  }

  var required bool
  switch config.Policy {
  default: // InjectionPolicyOff
    log.Errorf("Illegal value for autoInject:%s, must be one of [%s,%s]. Auto injection disabled!",
      config.Policy, InjectionPolicyDisabled, InjectionPolicyEnabled)
    required = false
  case InjectionPolicyDisabled:
    if useDefault {
      required = false
    } else {
      required = inject
    }
  case InjectionPolicyEnabled:
    if useDefault {
      required = true
    } else {
      required = inject
    }
  }

  if log.DebugEnabled() {
    // Build a log message for the annotations.
    annotationStr := ""
    for name := range AnnotationValidation {
      value, ok := annos[name]
      if !ok {
        value = "(unset)"
      }
      annotationStr  = fmt.Sprintf("%s:%s ", name, value)
    }

    log.Debugf("Sidecar injection policy for %v/%v: namespacePolicy:%v useDefault:%v inject:%v required:%v %s",
      metadata.Namespace,
      potentialPodName(metadata),
      config.Policy,
      useDefault,
      inject,
      required,
      annotationStr)
  }

  return required
}

判断条件很多,具体如下

  • 判断 pod 的 spec 中没有设置 hostNetwork:true
  • 判断待注入的 pod 不在系统 namespace 里,如 kube-system 、kube-public
  • 设置三个临时变量, useDefault=falseinject=false 、required=false,判断是否配置 sidecar.istio.io/inject
  • 如果 sidecar.istio.io/inject 的值设置为 y, yes, true, on,则 inject=true
  • sidecar.istio.io/inject 为其他值,则 useDefault=true
  • 判断 neverInjectSelector 是否有匹配到的条件,如果匹配到则设置 useDefault = false、inject = false
  • 判断 alwaysInjectSelector 是否有匹配到的条件,如果匹配到则设置 useDefault = false、inject = true, alwaysInjectSelector 优先级高于 neverInjectSelector
  • 判断 policy 的取值
  • 如果 policy 取值为 disabled,并且上述得到的 useDefault=true,则 required=false,不进行注入
  • 如果 policy 取值为 disabled,并且上述得到的 useDefault=false,则需要根据 inject 的值判断是否需要注入
  • 如果 policy 取值为 enabled,并且上述得到的 useDefault=false,则需要根据 inject 的值判断是否需要注入
  • 如果 policy 取值为 enabled,并且上述得到的 useDefault=true,则 required=true,进行注入
  • policy 为其他值,不进行注入

从上面可以看出 是否注入Sidecar的优先级为

Pod Annotations → NeverInjectSelector → AlwaysInjectSelector → Default Policy

留给使用者的控制选项是很多的,非常灵活。

2. 注入过程

下面来看下具体的注入过程

代码语言:javascript复制
func injectPod(req InjectionParameters) ([]byte, error) {
   checkPreconditions(req)

   // The patch will be built relative to the initial pod, capture its current state
   originalPodSpec, err := json.Marshal(req.pod)
   if err != nil {
      return nil, err
   }

   // Run the injection template, giving us a partial pod spec
   mergedPod, injectedPodData, err := RunTemplate(req)
   if err != nil {
      return nil, fmt.Errorf("failed to run injection template: %v", err)
   }

   mergedPod, err = reapplyOverwrittenContainers(mergedPod, req.pod, injectedPodData)
   if err != nil {
      return nil, fmt.Errorf("failed to re apply container: %v", err)
   }

   // Apply some additional transformations to the pod
   if err := postProcessPod(mergedPod, *injectedPodData, req); err != nil {
      return nil, fmt.Errorf("failed to process pod: %v", err)
   }

   patch, err := createPatch(mergedPod, originalPodSpec)
   if err != nil {
      return nil, fmt.Errorf("failed to create patch: %v", err)
   }

   log.Debugf("AdmissionResponse: patch=%vn", string(patch))
   return patch, nil
}

func createPatch(pod *corev1.Pod, original []byte) ([]byte, error) {
  reinjected, err := json.Marshal(pod)
  if err != nil {
    return nil, err
  }
  p, err := jsonpatch.CreatePatch(original, reinjected)
  if err != nil {
    return nil, err
  }
  return json.Marshal(p)
}

可以看到,整个注入过程逻辑为:

  • 把原本的Pod配置反解析成Pod对象,
  • 把需要注入的Yaml内容(如:Sidecar)反序列成对象然后append到对应Pod (如:Container)上,
  • 再把修改后的Pod重新解析成yaml 内容返回给k8s的api server,
  • k8s 拿着修改后内容再将这两个容器调度到同一台机器进行部署

https://www.cnblogs.com/haoyunlaile/p/12960441.html

0 人点赞