负载均衡
负载均衡(Load Balancer,简称 LB)是指把客户端访问的流量通过负载均衡器,然后根据指定的一些负载均衡策略进行转发,最终可以均匀的分摊到后端上游服务器上,然后上游服务器进行响应后再返回数据给客户端。负载均衡的最常见应用是充当反向代理,通过负载均衡,可以大大的提高服务的响应速度、提高并发请求、提高稳定性(防止单点故障)。 负载均衡的基本实现方案,从业界来看,一般分为软件和硬件两大类,软件负载均衡又可以分层如4层、7层负载均衡,如下:
- 硬件负载均衡 如 F5,性能好,但是贵。一般的互联网公司都没有采集硬件负载均衡
- 软件负载均衡 4 层:典型的如 LVS 7 层:典型的如 Nginx、HAProxy
目前这两个都可以实现 4 层,但是更多的还是使用 Nginx 的 7 层功能。
容器化
在物理机时代,还没有容器化之前,典型的负载均衡的建设方案就是搭建一套 Nginx 集群,提供 7 层的代理;搭建一套 LVS 集群,提供 4 层代理方案。并且同时,一般 7 层之上,都有一个 4 层代理,流量的基本流向就是 client -> LVS(4 层) -> Nginx(7层) -> server
。
在物理机这个时代,运维人员对 Nginx 的 upstream 的配置,基本都是手动添加修改各个 server,然后推送配置上线应用。传统的物理机时代的维护方式,是基于后端 server 的 IP 基本是固定的,比如,你上线一个 WebServer 的服务,要部署到哪些机器上,这个是事先确定好的了,IP 会固定不变,不管你怎么升级,服务都还是固定在这些机器上,因此这个时代这样的维护方式,并没有太多问题,大家以往也都维护的挺和谐。
在容器化时代,基于 Kubernetes 的容器化平台下,LB 的建设有哪些差异呢?主要分为两大块:
- 后端服务的 IP,会由于集群的调度,IP 是可变的,每当你部署、升级等操作的时候,IP 都会改变,那么这个时候,我们显然不能够再继续采用原有写死 IP 的方式来进行 7 层代理的维护了。由于服务 IP 的不确定性,我们必须要改变姿势,不能由人为填充 Nginx 的 upstream 的 server ip 的方式,只能通过动态的获取和变更,这个就需要 LB 能够主动发现后端服务并且动态更新
- Kubernetes 的容器化平台下,集群内部的网络是虚拟的,虚拟网络的 IP 在集群外部是无法访问的,因此还需要解决好容器集群内外的网络互通问题。
K8s 负载均衡
Kubernetes 本身有内置一个集群内部的负载均衡方案,叫 kube-proxy,但是这个只能内部访问,并且功能稍显不足;而实际上,我们的容器平台,必须要提供集群外部访问的功能,因为你的用户(客户端)都是在集群外部。
Kubernetes 负载均衡相关的方案,包括:
- 集群内部负载均衡【内置】 Pod IP 在集群内部都是互通的,因此集群内部无需考虑网络互通问题 每个 Node 节点上的 kube-proxy,就是集群内置的内部负载均衡的解决方案;但是只限于集群内部,并且功能有限
- 集群外部负载均衡【额外添加】 社区提供的 nginx-ingress-controller 方案可以满足需求 云厂商的 Cloud provider 也可以满足需求 参考 nginx-ingress-controller 的模式,自建 LB 方案
Nginx-Controller
简单来说,Nginx-Controller 就是来动态发现 Pod,然后渲染为 nginx 的 upstream;Nginx-Controller 就是一个 Nginx 再加上一个 Controller(发现 Pod 并渲染为 upstream)。
因此,一般的架构方案就是;client -> CDN -> LVS -> Nginx-Ingress-Controller -> Pod
下面来说下nginx-ingress的原理
初始化
代码语言:javascript复制// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Interface: config.Client.CoreV1().Events(config.Namespace),
})
h, err := dns.GetSystemNameServers()
if err != nil {
klog.Warningf("Error reading system nameservers: %v", err)
}
n := &NGINXController{
isIPV6Enabled: ing_net.IsIPv6Enabled(),
resolver: h,
cfg: config,
syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "nginx-ingress-controller",
}),
stopCh: make(chan struct{}),
updateCh: channels.NewRingChannel(1024),
ngxErrCh: make(chan error),
stopLock: &sync.Mutex{},
runningConfig: new(ingress.Configuration),
Proxy: &tcpproxy.TCPProxy{},
metricCollector: mc,
command: NewNginxCommand(),
}
...
n.syncQueue = task.NewTaskQueue(n.syncIngress)
if config.UpdateStatus {
n.syncStatus = status.NewStatusSyncer(status.Config{
Client: config.Client,
PublishService: config.PublishService,
PublishStatusAddress: config.PublishStatusAddress,
IngressLister: n.store,
UpdateStatusOnShutdown: config.UpdateStatusOnShutdown,
UseNodeInternalIP: config.UseNodeInternalIP,
})
} else {
klog.Warning("Update of Ingress status is disabled (flag --update-status)")
}
onTemplateChange := func() {
template, err := ngx_template.NewTemplate(nginx.TemplatePath)
if err != nil {
// this error is different from the rest because it must be clear why nginx is not working
klog.ErrorS(err, "Error loading new template")
return
}
n.t = template
klog.InfoS("New NGINX configuration template loaded")
n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))
}
ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)
if err != nil {
klog.Fatalf("Invalid NGINX configuration template: %v", err)
}
n.t = ngxTpl
_, err = file.NewFileWatcher(nginx.TemplatePath, onTemplateChange)
if err != nil {
klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err)
}
filesToWatch := []string{}
err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
filesToWatch = append(filesToWatch, path)
return nil
})
if err != nil {
klog.Fatalf("Error creating file watchers: %v", err)
}
for _, f := range filesToWatch {
_, err = file.NewFileWatcher(f, func() {
klog.InfoS("File changed detected. Reloading NGINX", "path", f)
n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))
})
if err != nil {
klog.Fatalf("Error creating file watcher for %v: %v", f, err)
}
}
return n
}
关键逻辑如下:
- 初始化任务队列,n.syncIngress为任务Handler
- 监控nginx.tmpl模板文件的变化
- 创建新任务用来重新生成nginx.conf
启动
代码语言:javascript复制// Start starts a new NGINX master process running in the foreground.
func (n *NGINXController) Start() {
klog.InfoS("Starting NGINX Ingress controller")
n.store.Run(n.stopCh)
...
cmd := n.command.ExecCommand()
// put NGINX in another process group to prevent it
// to receive signals meant for the controller
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pgid: 0,
}
if n.cfg.EnableSSLPassthrough {
n.setupSSLProxy()
}
klog.InfoS("Starting NGINX process")
n.start(cmd)
go n.syncQueue.Run(time.Second, n.stopCh)
// force initial sync
n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))
// In case of error the temporal configuration file will
// be available up to five minutes after the error
go func() {
for {
time.Sleep(5 * time.Minute)
err := cleanTempNginxCfg()
if err != nil {
klog.ErrorS(err, "Unexpected error removing temporal configuration files")
}
}
}()
...
for {
select {
case err := <-n.ngxErrCh:
if n.isShuttingDown {
return
}
// if the nginx master process dies, the workers continue to process requests
// until the failure of the configured livenessProbe and restart of the pod.
if process.IsRespawnIfRequired(err) {
return
}
case event := <-n.updateCh.Out():
if n.isShuttingDown {
break
}
if evt, ok := event.(store.Event); ok {
klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
if evt.Type == store.ConfigurationEvent {
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
klog.Warningf("Unexpected event type received %T", event)
}
case <-n.stopCh:
return
}
}
}
大概逻辑就是:
- 通过Informer机制同步并监测k8s资源的变化
- 给nginx进程设置单独的pgid,防止收到发给controller进程的SIGTERM信号而退出。
- 启动nginx进程
- 启动任务队列:用来处理List/Watch后的资源
- 初次启动,需要触发一起配置文件同步
- 发生错误时,临时文件暂存5分种
资源监控
“Store”负责缓存List/Watch的资源对象,因为Ingress控制器需要List/Watch Ingress、configmap、Services 等很多资源,这里把”store”相关的操作抽象成了一个接口。
代码语言:javascript复制func New(
namespace string,
namespaceSelector labels.Selector,
configmap, tcp, udp, defaultSSLCertificate string,
resyncPeriod time.Duration,
client clientset.Interface,
updateCh *channels.RingChannel,
disableCatchAll bool,
deepInspector bool,
icConfig *ingressclass.IngressClassConfiguration) Storer {
store := &k8sStore{
informers: &Informer{},
listers: &Lister{},
sslStore: NewSSLCertTracker(),
updateCh: updateCh,
backendConfig: ngx_config.NewDefault(),
syncSecretMu: &sync.Mutex{},
backendConfigMu: &sync.RWMutex{},
secretIngressMap: NewObjectRefMap(),
defaultSSLCertificate: defaultSSLCertificate,
}
...
store.informers.Ingress.AddEventHandler(ingEventHandler)
if !icConfig.IgnoreIngressClass {
store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
}
store.informers.EndpointSlice.AddEventHandler(epsEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(serviceHandler)
}
从函数中我们可以获取两个重要信息:
- 控制器监听了Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service 六种资源。
- 定义各个资源变化的处理函数:这些处理函数实际就是解析各个资源信息,然后更新nginx配置信息
- 监听到资源变化后生成Event并通过updateCh通道发送出去。
配置更新
在最开始我们提到n.syncQueue = task.NewTaskQueue(n.syncIngress)
,下面来具体看看实现:
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
n.syncRateLimiter.Accept()
if n.syncQueue.IsShuttingDown() {
return nil
}
ings := n.store.ListIngresses()
hosts, servers, pcfg := n.getConfiguration(ings)
n.metricCollector.SetSSLExpireTime(servers)
n.metricCollector.SetSSLInfo(servers)
if n.runningConfig.Equal(pcfg) {
klog.V(3).Infof("No configuration change detected, skipping backend reload")
return nil
}
n.metricCollector.SetHosts(hosts)
if !utilingress.IsDynamicConfigurationEnough(pcfg, n.runningConfig) {
klog.InfoS("Configuration changes detected, backend reload required")
hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
TagName: "json",
})
pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
err := n.OnUpdate(*pcfg)
if err != nil {
n.metricCollector.IncReloadErrorCount()
n.metricCollector.ConfigSuccess(hash, false)
klog.Errorf("Unexpected failure reloading the backend:n%v", err)
n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
return err
}
klog.InfoS("Backend successfully reloaded")
n.metricCollector.ConfigSuccess(hash, true)
n.metricCollector.IncReloadCount()
n.recorder.Eventf(k8s.IngressPodDetails, apiv1.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
}
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
if isFirstSync {
// For the initial sync it always takes some time for NGINX to start listening
// For large configurations it might take a while so we loop and back off
klog.InfoS("Initial sync, sleeping for 1 second")
time.Sleep(1 * time.Second)
}
retry := wait.Backoff{
Steps: 1 n.cfg.DynamicConfigurationRetries,
Duration: time.Second,
Factor: 1.3,
Jitter: 0.1,
}
retriesRemaining := retry.Steps
err := wait.ExponentialBackoff(retry, func() (bool, error) {
err := n.configureDynamically(pcfg)
if err == nil {
klog.V(2).Infof("Dynamic reconfiguration succeeded.")
return true, nil
}
retriesRemaining--
if retriesRemaining > 0 {
klog.Warningf("Dynamic reconfiguration failed (retrying; %d retries left): %v", retriesRemaining, err)
return false, nil
}
klog.Warningf("Dynamic reconfiguration failed: %v", err)
return false, err
})
if err != nil {
klog.Errorf("Unexpected failure reconfiguring NGINX:n%v", err)
return err
}
ri := utilingress.GetRemovedIngresses(n.runningConfig, pcfg)
re := utilingress.GetRemovedHosts(n.runningConfig, pcfg)
rc := utilingress.GetRemovedCertificateSerialNumbers(n.runningConfig, pcfg)
n.metricCollector.RemoveMetrics(ri, re, rc)
n.runningConfig = pcfg
return nil
}
大概逻辑如下:
- 从缓存中获取Ingresses
- 解析ingress
- 同当前运行的配置作比对判断配置是否有改变
- 有改变则进行如下动作
- 判断是否能够通过lua动态更新
- 不能通过lua动态更新,则生成新的nginx.conf并reload
- 通过lua动态更新配置
为什么需要动态更新呢? 因为在一个kubernetes集群里service的endpoints可能会频率变动,如果都静态写进nginx.conf配置文件就意味着每次endpoints变动都要nginx -s reload 重新加载配置文件,在高并发下可能会导致某些请求时延变长,所以就需要借助lua-nginx-module使用lua来动态更新upstream上游节点。