需求背景
运维团队希望管控部署在k8s集群里对外暴露的服务,开发团队无需关心服务如何暴露给用户。
红色部分
开发团队创建应用的manifests,开发团队可以为Service:资源添加annotation
为ingress/http:true
来决定终端用户是否可以访问到该服务,默认不能访问到。至于具体如何让用户可以访问到服务,开发团队不需要关心。
绿色部分
custom controller需要监听Service资源,当Service.发生变化时
- 如果新增Service时
- 包含指定annotation创建Ingress资源对象
- 不包含指定annttation,忽略
- 如果删除Servicel时删除Ingress资源对象
- 如果更新Servicel时
- 包含指定annotation,检查Ingress资源对象是否存在,不存在则创建,存在则忽略
- 不包含指定annotation,检查 Ingress资源对象是否存在,存在则删除,不存在则忽略
蓝色部分
Ingress Controller使用nginx ingress Controller,类似集群网关(ingress controller 也有 apisix实现),根据 Ingress为我们更新nginx的配置,最后,终端用户便可以通过Ingress Controlleri的地址访问到开发团队指定的服务。
实现
主要步骤
- 创建clientset,用于操作资源
- 创建serviceInformer、ingressInformer
- 通过informer里的indexer获取资源
- 添加事件处理
- 将事件对应的资源对象的key放进workqueue
- 创建work gorounite,从workqueue消费事件
- 创建或删除对应的ingress
拿创建service举例
- 通过serviceInformer注册的新增事件处理,得到创建的service
- 将service的key放进workqueue
- work gorounite从workqueue里获取到key
- 通过serviceInformer的indexer获取到service
- 判断是否具有
ingress/http:true
的annotation
,有的话,查询对应的ingress是否存在,不存在就创建ingress
代码
main.go
代码语言:javascript复制func main() {
// config
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
clusterConfig, err := rest.InClusterConfig()
if err != nil {
panic(err)
}
config = clusterConfig
}
// client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// factory
factory := informers.NewSharedInformerFactory(clientset, 0)
// informer
serviceInformer := factory.Core().V1().Services()
ingressInformer := factory.Networking().V1().Ingresses()
// addEvent
controller := NewController(clientset, serviceInformer, ingressInformer)
// start
stopCh := make(chan struct{})
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)
controller.Run(stopCh)
}
controller.go
代码语言:javascript复制const (
workNum = 5
maxRetry = 10
)
type Controller struct {
// 从indexer里获取数据
serviceLister v13.ServiceLister
ingressLister v14.IngressLister
client *kubernetes.Clientset
queue workqueue.RateLimitingInterface
}
func NewController(client *kubernetes.Clientset, serviceInformer v1.ServiceInformer, ingressInformer v12.IngressInformer) *Controller {
c := &Controller{
client: client,
serviceLister: serviceInformer.Lister(),
ingressLister: ingressInformer.Lister(),
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: "ingress",
}),
}
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: c.addService, UpdateFunc: c.updateService})
// ingress被删除时,只要service还存在,就会重新创建ingress
ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{DeleteFunc: c.deleteIngress})
return c
}
func (c Controller) addService(obj interface{}) {
fmt.Println("add service")
c.enqueue(obj)
}
func (c Controller) updateService(oldObj interface{}, newObj interface{}) {
fmt.Println("update service")
if reflect.DeepEqual(oldObj, newObj) {
return
}
c.enqueue(newObj)
}
func (c Controller) deleteIngress(obj interface{}) {
fmt.Println("delete ingress")
ig := obj.(*v15.Ingress)
_, ok := ig.Annotations["ingress/http"]
if !ok {
return
}
// 重新创建ingress
c.enqueue(obj)
}
func (c Controller) enqueue(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
c.queue.Add(key)
}
func (c Controller) Run(ch chan struct{}) {
fmt.Println("controller run")
for i := 0; i < workNum; i {
go c.work()
}
<-ch
}
func (c Controller) work() {
for c.processNextItem() {
}
}
func (c Controller) processNextItem() bool {
item, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(item)
err := c.syncService(item.(string))
if err != nil {
// 异常情况进行重试
c.handleErr(item.(string), err)
}
return true
}
func (c Controller) handleErr(key string, err error) {
if c.queue.NumRequeues(key) <= maxRetry {
c.queue.Add(key)
return
}
c.queue.Forget(key)
runtime.HandleError(err)
}
func (c Controller) syncService(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
ingressExist := false
_, err = c.ingressLister.Ingresses(namespace).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
return err
}
} else {
ingressExist = true
}
serviceExist := false
service, err := c.serviceLister.Services(namespace).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
return err
}
} else {
serviceExist = true
}
annotationExist := false
if serviceExist {
if _, ok := service.Annotations["ingress/http"]; ok {
annotationExist = true
}
}
if annotationExist && !ingressExist {
if err := c.createIngress(service); err != nil {
return err
}
} else if !annotationExist && ingressExist {
if err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, v16.DeleteOptions{}); err != nil {
return err
}
}
return nil
}
func (c Controller) createIngress(service *v17.Service) error {
ingress := &v15.Ingress{}
ingress.Name = service.Name
ingress.Namespace = service.Namespace
ingress.Annotations = map[string]string{
"ingress/http": "true",
}
// 删除service,ingress也会被删除
ingress.ObjectMeta.OwnerReferences = []v16.OwnerReference{
*v16.NewControllerRef(service, v16.SchemeGroupVersion.WithKind("Service")),
}
pathType := v15.PathTypePrefix
ingressClassName := "nginx"
ingress.Spec = v15.IngressSpec{
Rules: []v15.IngressRule{
{
Host: "test.com",
IngressRuleValue: v15.IngressRuleValue{
HTTP: &v15.HTTPIngressRuleValue{
Paths: []v15.HTTPIngressPath{
{
Path: "/",
PathType: &pathType,
Backend: v15.IngressBackend{
Service: &v15.IngressServiceBackend{
Name: service.Name,
Port: v15.ServiceBackendPort{Number: 80},
},
},
},
},
},
},
},
},
IngressClassName: &ingressClassName,
}
_, err := c.client.NetworkingV1().Ingresses(service.Namespace).Create(context.TODO(), ingress, v16.CreateOptions{})
return err
}
Post Views: 8