通过Operator自动暴露集群内部服务

2023-05-10 09:19:15 浏览数 (1)

需求背景

运维团队希望管控部署在k8s集群里对外暴露的服务,开发团队无需关心服务如何暴露给用户。

红色部分

开发团队创建应用的manifests,开发团队可以为Service:资源添加annotationingress/http:true来决定终端用户是否可以访问到该服务,默认不能访问到。至于具体如何让用户可以访问到服务,开发团队不需要关心。

绿色部分

custom controller需要监听Service资源,当Service.发生变化时

  • 如果新增Service时
    • 包含指定annotation创建Ingress资源对象
    • 不包含指定annttation,忽略
  • 如果删除Servicel时删除Ingress资源对象
  • 如果更新Servicel时
    • 包含指定annotation,检查Ingress资源对象是否存在,不存在则创建,存在则忽略
    • 不包含指定annotation,检查 Ingress资源对象是否存在,存在则删除,不存在则忽略

蓝色部分

Ingress Controller使用nginx ingress Controller,类似集群网关(ingress controller 也有 apisix实现),根据 Ingress为我们更新nginx的配置,最后,终端用户便可以通过Ingress Controlleri的地址访问到开发团队指定的服务。

实现

主要步骤

  1. 创建clientset,用于操作资源
  2. 创建serviceInformer、ingressInformer
    1. 通过informer里的indexer获取资源
    2. 添加事件处理
      1. 将事件对应的资源对象的key放进workqueue
  3. 创建work gorounite,从workqueue消费事件
  4. 创建或删除对应的ingress
拿创建service举例
  1. 通过serviceInformer注册的新增事件处理,得到创建的service
  2. 将service的key放进workqueue
  3. work gorounite从workqueue里获取到key
  4. 通过serviceInformer的indexer获取到service
  5. 判断是否具有ingress/http:trueannotation,有的话,查询对应的ingress是否存在,不存在就创建ingress

代码

main.go
代码语言:javascript复制
func main() {
    // config
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        clusterConfig, err := rest.InClusterConfig()
        if err != nil {
            panic(err)
        }
        config = clusterConfig
    }

    // client
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // factory
    factory := informers.NewSharedInformerFactory(clientset, 0)

    // informer
    serviceInformer := factory.Core().V1().Services()
    ingressInformer := factory.Networking().V1().Ingresses()

    // addEvent
    controller := NewController(clientset, serviceInformer, ingressInformer)

    // start
    stopCh := make(chan struct{})
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)
    controller.Run(stopCh)
}
controller.go
代码语言:javascript复制
const (
    workNum  = 5
    maxRetry = 10
)

type Controller struct {
    // 从indexer里获取数据
    serviceLister v13.ServiceLister
    ingressLister v14.IngressLister
    client        *kubernetes.Clientset
    queue         workqueue.RateLimitingInterface
}

func NewController(client *kubernetes.Clientset, serviceInformer v1.ServiceInformer, ingressInformer v12.IngressInformer) *Controller {
    c := &Controller{
        client:        client,
        serviceLister: serviceInformer.Lister(),
        ingressLister: ingressInformer.Lister(),
        queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
            Name: "ingress",
        }),
    }
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: c.addService, UpdateFunc: c.updateService})
    // ingress被删除时,只要service还存在,就会重新创建ingress
    ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: c.deleteIngress})
    return c
}

func (c Controller) addService(obj interface{}) {
    fmt.Println("add service")
    c.enqueue(obj)
}

func (c Controller) updateService(oldObj interface{}, newObj interface{}) {
    fmt.Println("update service")
    if reflect.DeepEqual(oldObj, newObj) {
        return
    }
    c.enqueue(newObj)
}

func (c Controller) deleteIngress(obj interface{}) {
    fmt.Println("delete ingress")
    ig := obj.(*v15.Ingress)
    _, ok := ig.Annotations["ingress/http"]
    if !ok {
        return
    }

    // 重新创建ingress
    c.enqueue(obj)
}

func (c Controller) enqueue(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        runtime.HandleError(err)
        return
    }
    c.queue.Add(key)
}

func (c Controller) Run(ch chan struct{}) {
    fmt.Println("controller run")
    for i := 0; i < workNum; i   {
        go c.work()
    }
    <-ch
}

func (c Controller) work() {
    for c.processNextItem() {
    }
}

func (c Controller) processNextItem() bool {
    item, shutdown := c.queue.Get()
    if shutdown {
        return false
    }

    defer c.queue.Done(item)
    err := c.syncService(item.(string))
    if err != nil {
        // 异常情况进行重试
        c.handleErr(item.(string), err)
    }
    return true
}

func (c Controller) handleErr(key string, err error) {
    if c.queue.NumRequeues(key) <= maxRetry {
        c.queue.Add(key)
        return
    }

    c.queue.Forget(key)
    runtime.HandleError(err)
}

func (c Controller) syncService(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    ingressExist := false
    _, err = c.ingressLister.Ingresses(namespace).Get(name)
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }
    } else {
        ingressExist = true
    }

    serviceExist := false
    service, err := c.serviceLister.Services(namespace).Get(name)
    if err != nil {
        if !errors.IsNotFound(err) {
            return err
        }
    } else {
        serviceExist = true
    }

    annotationExist := false
    if serviceExist {
        if _, ok := service.Annotations["ingress/http"]; ok {
            annotationExist = true
        }
    }

    if annotationExist && !ingressExist {
        if err := c.createIngress(service); err != nil {
            return err
        }
    } else if !annotationExist && ingressExist {
        if err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, v16.DeleteOptions{}); err != nil {
            return err
        }
    }

    return nil
}

func (c Controller) createIngress(service *v17.Service) error {
    ingress := &v15.Ingress{}
    ingress.Name = service.Name
    ingress.Namespace = service.Namespace
    ingress.Annotations = map[string]string{
        "ingress/http": "true",
    }

    // 删除service,ingress也会被删除
    ingress.ObjectMeta.OwnerReferences = []v16.OwnerReference{
        *v16.NewControllerRef(service, v16.SchemeGroupVersion.WithKind("Service")),
    }

    pathType := v15.PathTypePrefix
    ingressClassName := "nginx"
    ingress.Spec = v15.IngressSpec{
        Rules: []v15.IngressRule{
            {
                Host: "test.com",
                IngressRuleValue: v15.IngressRuleValue{
                    HTTP: &v15.HTTPIngressRuleValue{
                        Paths: []v15.HTTPIngressPath{
                            {
                                Path:     "/",
                                PathType: &pathType,
                                Backend: v15.IngressBackend{
                                    Service: &v15.IngressServiceBackend{
                                        Name: service.Name,
                                        Port: v15.ServiceBackendPort{Number: 80},
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
        IngressClassName: &ingressClassName,
    }

    _, err := c.client.NetworkingV1().Ingresses(service.Namespace).Create(context.TODO(), ingress, v16.CreateOptions{})
    return err
}

Post Views: 8

0 人点赞