Nginx Ingress解析

2023-08-19 09:28:32 浏览数 (1)

负载均衡

负载均衡(Load Balancer,简称 LB)是指把客户端访问的流量通过负载均衡器,然后根据指定的一些负载均衡策略进行转发,最终可以均匀的分摊到后端上游服务器上,然后上游服务器进行响应后再返回数据给客户端。负载均衡的最常见应用是充当反向代理,通过负载均衡,可以大大的提高服务的响应速度、提高并发请求、提高稳定性(防止单点故障)。 负载均衡的基本实现方案,从业界来看,一般分为软件和硬件两大类,软件负载均衡又可以分层如4层、7层负载均衡,如下:

  • 硬件负载均衡 如 F5,性能好,但是贵。一般的互联网公司都没有采集硬件负载均衡
  • 软件负载均衡 4 层:典型的如 LVS 7 层:典型的如 Nginx、HAProxy

目前这两个都可以实现 4 层,但是更多的还是使用 Nginx 的 7 层功能。

容器化

在物理机时代,还没有容器化之前,典型的负载均衡的建设方案就是搭建一套 Nginx 集群,提供 7 层的代理;搭建一套 LVS 集群,提供 4 层代理方案。并且同时,一般 7 层之上,都有一个 4 层代理,流量的基本流向就是 client -> LVS(4 层) -> Nginx(7层) -> server  。 在物理机这个时代,运维人员对 Nginx 的 upstream 的配置,基本都是手动添加修改各个 server,然后推送配置上线应用。传统的物理机时代的维护方式,是基于后端 server 的 IP 基本是固定的,比如,你上线一个 WebServer 的服务,要部署到哪些机器上,这个是事先确定好的了,IP 会固定不变,不管你怎么升级,服务都还是固定在这些机器上,因此这个时代这样的维护方式,并没有太多问题,大家以往也都维护的挺和谐。

在容器化时代,基于 Kubernetes 的容器化平台下,LB 的建设有哪些差异呢?主要分为两大块:

  • 后端服务的 IP,会由于集群的调度,IP 是可变的,每当你部署、升级等操作的时候,IP 都会改变,那么这个时候,我们显然不能够再继续采用原有写死 IP 的方式来进行 7 层代理的维护了。由于服务 IP 的不确定性,我们必须要改变姿势,不能由人为填充 Nginx 的 upstream 的 server ip 的方式,只能通过动态的获取和变更,这个就需要 LB 能够主动发现后端服务并且动态更新
  • Kubernetes 的容器化平台下,集群内部的网络是虚拟的,虚拟网络的 IP 在集群外部是无法访问的,因此还需要解决好容器集群内外的网络互通问题。

K8s 负载均衡

Kubernetes 本身有内置一个集群内部的负载均衡方案,叫 kube-proxy,但是这个只能内部访问,并且功能稍显不足;而实际上,我们的容器平台,必须要提供集群外部访问的功能,因为你的用户(客户端)都是在集群外部。

Kubernetes 负载均衡相关的方案,包括:

  • 集群内部负载均衡【内置】 Pod IP 在集群内部都是互通的,因此集群内部无需考虑网络互通问题 每个 Node 节点上的 kube-proxy,就是集群内置的内部负载均衡的解决方案;但是只限于集群内部,并且功能有限
  • 集群外部负载均衡【额外添加】 社区提供的 nginx-ingress-controller 方案可以满足需求 云厂商的 Cloud provider 也可以满足需求 参考 nginx-ingress-controller 的模式,自建 LB 方案

Nginx-Controller

简单来说,Nginx-Controller 就是来动态发现 Pod,然后渲染为 nginx 的 upstream;Nginx-Controller 就是一个 Nginx 再加上一个 Controller(发现 Pod 并渲染为 upstream)。

因此,一般的架构方案就是;client -> CDN -> LVS -> Nginx-Ingress-Controller -> Pod

下面来说下nginx-ingress的原理

初始化

代码语言:javascript复制
// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartLogging(klog.Infof)
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
      Interface: config.Client.CoreV1().Events(config.Namespace),
   })

   h, err := dns.GetSystemNameServers()
   if err != nil {
      klog.Warningf("Error reading system nameservers: %v", err)
   }

   n := &NGINXController{
      isIPV6Enabled: ing_net.IsIPv6Enabled(),

      resolver: h,
      cfg: config,
      syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),

      recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
         Component: "nginx-ingress-controller",
      }),

      stopCh: make(chan struct{}),
      updateCh: channels.NewRingChannel(1024),

      ngxErrCh: make(chan error),

      stopLock: &sync.Mutex{},

      runningConfig: new(ingress.Configuration),

      Proxy: &tcpproxy.TCPProxy{},

      metricCollector: mc,

      command: NewNginxCommand(),
   }

   ... 
   
   n.syncQueue = task.NewTaskQueue(n.syncIngress)

   if config.UpdateStatus {
      n.syncStatus = status.NewStatusSyncer(status.Config{
         Client: config.Client,
         PublishService: config.PublishService,
         PublishStatusAddress: config.PublishStatusAddress,
         IngressLister: n.store,
         UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
         UseNodeInternalIP: config.UseNodeInternalIP,
      })
   } else {
      klog.Warning("Update of Ingress status is disabled (flag --update-status)")
   }

   onTemplateChange := func() {
      template, err := ngx_template.NewTemplate(nginx.TemplatePath)
      if err != nil {
         // this error is different from the rest because it must be clear why nginx is not working
         klog.ErrorS(err, "Error loading new template")
         return
      }

      n.t = template
      klog.InfoS("New NGINX configuration template loaded")
      n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
   }

   ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
   if err != nil {
      klog.Fatalf("Invalid NGINX configuration template: %v", err)
   }

   n.t = ngxTpl

   _, err = file.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
   if err != nil {
      klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
   }

   filesToWatch := []string{}
   err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
      if err != nil {
         return err
      }

      if info.IsDir() {
         return nil
      }

      filesToWatch = append(filesToWatch, path)
      return nil
   })

   if err != nil {
      klog.Fatalf("Error creating file watchers: %v", err)
   }

   for _, f := range filesToWatch {
      _, err = file.NewFileWatcher(f, func() {
         klog.InfoS("File changed detected. Reloading NGINX", "path", f)
         n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
      })
      if err != nil {
         klog.Fatalf("Error creating file watcher for %v: %v", f, err)
      }
   }

   return n
}

关键逻辑如下:

  • 初始化任务队列,n.syncIngress为任务Handler
  • 监控nginx.tmpl模板文件的变化
  • 创建新任务用来重新生成nginx.conf

启动

代码语言:javascript复制
// Start starts a new NGINX master process running in the foreground.
func (n *NGINXController) Start() {
   klog.InfoS("Starting NGINX Ingress controller")

   n.store.Run(n.stopCh)

   ...

   cmd := n.command.ExecCommand()

   // put NGINX in another process group to prevent it
   // to receive signals meant for the controller
   cmd.SysProcAttr = &syscall.SysProcAttr{
      Setpgid: true,
      Pgid: 0,
   }

   if n.cfg.EnableSSLPassthrough {
      n.setupSSLProxy()
   }

   klog.InfoS("Starting NGINX process")
   n.start(cmd)

   go n.syncQueue.Run(time.Second, n.stopCh)
   // force initial sync
   n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))

   // In case of error the temporal configuration file will
   // be available up to five minutes after the error
   go func() {
      for {
         time.Sleep(5 * time.Minute)
         err := cleanTempNginxCfg()
         if err != nil {
            klog.ErrorS(err, "Unexpected error removing temporal configuration files")
         }
      }
   }()

  ...

   for {
      select {
      case err := <-n.ngxErrCh:
         if n.isShuttingDown {
            return
         }

         // if the nginx master process dies, the workers continue to process requests
         // until the failure of the configured livenessProbe and restart of the pod.
         if process.IsRespawnIfRequired(err) {
            return
         }

      case event := <-n.updateCh.Out():
         if n.isShuttingDown {
            break
         }

         if evt, ok := event.(store.Event); ok {
            klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
            if evt.Type == store.ConfigurationEvent {
               // TODO: is this necessary? Consider removing this special case
               n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
               continue
            }

            n.syncQueue.EnqueueSkippableTask(evt.Obj)
         } else {
            klog.Warningf("Unexpected event type received %T", event)
         }
      case <-n.stopCh:
         return
      }
   }
}

大概逻辑就是:

  • 通过Informer机制同步并监测k8s资源的变化
  • 给nginx进程设置单独的pgid,防止收到发给controller进程的SIGTERM信号而退出。
  • 启动nginx进程
  • 启动任务队列:用来处理List/Watch后的资源
  • 初次启动,需要触发一起配置文件同步
  • 发生错误时,临时文件暂存5分种

资源监控

“Store”负责缓存List/Watch的资源对象,因为Ingress控制器需要List/Watch Ingress、configmap、Services 等很多资源,这里把”store”相关的操作抽象成了一个接口。

代码语言:javascript复制
func New(
   namespace string,
   namespaceSelector labels.Selector,
   configmap, tcp, udp, defaultSSLCertificate string,
   resyncPeriod time.Duration,
   client clientset.Interface,
   updateCh *channels.RingChannel,
   disableCatchAll bool,
   deepInspector bool,
   icConfig *ingressclass.IngressClassConfiguration) Storer {

   store := &amp;k8sStore{
      informers: &amp;Informer{},
      listers: &amp;Lister{},
      sslStore: NewSSLCertTracker(),
      updateCh: updateCh,
      backendConfig: ngx_config.NewDefault(),
      syncSecretMu: &amp;sync.Mutex{},
      backendConfigMu: &amp;sync.RWMutex{},
      secretIngressMap: NewObjectRefMap(),
      defaultSSLCertificate: defaultSSLCertificate,
   }
   ...
   store.informers.Ingress.AddEventHandler(ingEventHandler)
  if !icConfig.IgnoreIngressClass {
    store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
  }
  store.informers.EndpointSlice.AddEventHandler(epsEventHandler)
  store.informers.Secret.AddEventHandler(secrEventHandler)
  store.informers.ConfigMap.AddEventHandler(cmEventHandler)
  store.informers.Service.AddEventHandler(serviceHandler)
}

从函数中我们可以获取两个重要信息:

  • 控制器监听了Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service 六种资源。
  • 定义各个资源变化的处理函数:这些处理函数实际就是解析各个资源信息,然后更新nginx配置信息
  • 监听到资源变化后生成Event并通过updateCh通道发送出去。

配置更新

在最开始我们提到n.syncQueue = task.NewTaskQueue(n.syncIngress),下面来具体看看实现:

代码语言:javascript复制
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
   n.syncRateLimiter.Accept()

   if n.syncQueue.IsShuttingDown() {
      return nil
   }

   ings := n.store.ListIngresses()
   hosts, servers, pcfg := n.getConfiguration(ings)

   n.metricCollector.SetSSLExpireTime(servers)
   n.metricCollector.SetSSLInfo(servers)

   if n.runningConfig.Equal(pcfg) {
      klog.V(3).Infof("No configuration change detected, skipping backend reload")
      return nil
   }

   n.metricCollector.SetHosts(hosts)

   if !utilingress.IsDynamicConfigurationEnough(pcfg, n.runningConfig) {
      klog.InfoS("Configuration changes detected, backend reload required")

      hash, _ := hashstructure.Hash(pcfg, &amp;hashstructure.HashOptions{
         TagName: "json",
      })

      pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)

      err := n.OnUpdate(*pcfg)
      if err != nil {
         n.metricCollector.IncReloadErrorCount()
         n.metricCollector.ConfigSuccess(hash, false)
         klog.Errorf("Unexpected failure reloading the backend:n%v", err)
         n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
         return err
      }

      klog.InfoS("Backend successfully reloaded")
      n.metricCollector.ConfigSuccess(hash, true)
      n.metricCollector.IncReloadCount()

      n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
   }

   isFirstSync := n.runningConfig.Equal(&amp;ingress.Configuration{})
   if isFirstSync {
      // For the initial sync it always takes some time for NGINX to start listening
      // For large configurations it might take a while so we loop and back off
      klog.InfoS("Initial sync, sleeping for 1 second")
      time.Sleep(1 * time.Second)
   }

   retry := wait.Backoff{
      Steps: 1   n.cfg.DynamicConfigurationRetries,
      Duration: time.Second,
      Factor: 1.3,
      Jitter: 0.1,
   }

   retriesRemaining := retry.Steps
   err := wait.ExponentialBackoff(retry, func() (bool, error) {
      err := n.configureDynamically(pcfg)
      if err == nil {
         klog.V(2).Infof("Dynamic reconfiguration succeeded.")
         return true, nil
      }
      retriesRemaining--
      if retriesRemaining > 0 {
         klog.Warningf("Dynamic reconfiguration failed (retrying; %d retries left): %v", retriesRemaining, err)
         return false, nil
      }
      klog.Warningf("Dynamic reconfiguration failed: %v", err)
      return false, err
   })
   if err != nil {
      klog.Errorf("Unexpected failure reconfiguring NGINX:n%v", err)
      return err
   }

   ri := utilingress.GetRemovedIngresses(n.runningConfig, pcfg)
   re := utilingress.GetRemovedHosts(n.runningConfig, pcfg)
   rc := utilingress.GetRemovedCertificateSerialNumbers(n.runningConfig, pcfg)
   n.metricCollector.RemoveMetrics(ri, re, rc)

   n.runningConfig = pcfg

   return nil
}

大概逻辑如下:

  • 从缓存中获取Ingresses
  • 解析ingress
  • 同当前运行的配置作比对判断配置是否有改变
  • 有改变则进行如下动作
  • 判断是否能够通过lua动态更新
  • 不能通过lua动态更新,则生成新的nginx.conf并reload
  • 通过lua动态更新配置

为什么需要动态更新呢? 因为在一个kubernetes集群里service的endpoints可能会频率变动,如果都静态写进nginx.conf配置文件就意味着每次endpoints变动都要nginx -s reload 重新加载配置文件,在高并发下可能会导致某些请求时延变长,所以就需要借助lua-nginx-module使用lua来动态更新upstream上游节点。

0 人点赞