使用 Python 操作 Kubernetes(k8s)API

2023-03-14 16:09:27 浏览数 (1)

代码语言:python
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/5/13 11:42
# @Author  : xxx
# @File    : s.py
# @Description : 这个类封装kube-sdk

import json
import os
import re
import time

import kubernetes
import requests
from kubernetes import client, config
from kubernetes.client.api.networking_v1beta1_api import NetworkingV1beta1Api
from kubernetes.stream import stream

class Kube(object):
    def __init__(self, source=None, namespace=None):
        """Select a kubeconfig for *source* and create the API client objects.

        Args:
            source: Cluster identifier. ``"intranet-dev"``, ``"tencent-prod"``
                and ``"aliyun-prod"`` each select a kubeconfig path held in a
                module-level name (``local_kubeconfig``, ``tencent_kubeconfig``,
                ``aliyun_kubeconfig`` -- presumably defined elsewhere in the
                file; verify). Any other value falls back to
                ``local_kubeconfig`` on Windows and loads nothing elsewhere.
            namespace: Stored on the instance as ``self.namespace``.
        """
        self.source = source

        # Decide which kubeconfig applies; the branch order mirrors the
        # precedence of named clusters over the Windows fallback.
        config_selected = True
        if source == "intranet-dev":
            self.config_file = local_kubeconfig
        elif source == "tencent-prod":
            self.config_file = tencent_kubeconfig
        elif source == "aliyun-prod":
            self.config_file = aliyun_kubeconfig
        elif os.name == "nt":
            self.config_file = local_kubeconfig
        else:
            # No explicit kubeconfig: self.config_file is left unset and
            # nothing is loaded here.
            config_selected = False

        if config_selected:
            config.kube_config.load_kube_config(config_file=self.config_file)

        # API client handles used by the other methods of this class.
        self.namespace = namespace
        self.Api_Instance = client.CoreV1Api()
        self.Api_Beatch = client.BatchApi()
        self.Api_Apps = client.AppsV1Api()
        self.k8s_client = client.ApiClient()
        self.Api_Network = NetworkingV1beta1Api()

    # 获取ingress
    def get_ingress(self, flag=False):
        ingress_list = []
        # flag = True 进行单空间查询 反之 为多空间
        if flag:
            time_list_namespace = self.list_namespaces(flag)
            # print(time_list_namespace)

            ingress_obj = self.Api_Network.list_ingress_for_all_namespaces()
            for ingress in ingress_obj.items:
                # if ingress.metadata.namespace == "wuhan":
                for rule in ingress.spec.rules:
                    for path in rule.http.paths:
                        if ingress.metadata.namespace in time_list_namespace:
                            # if ingress.metadata.namespace == "muleizhwl":
                            data = {
                                "namespace": ingress.metadata.namespace,
                                "ingress": ingress.metadata.name,
                                "dns": ingress.spec.rules[0].host,
                                "service": path.backend.service_name,
                                "port": path.backend.service_port,
                                "path": path.path,
                            }
                            ingress_list.append(data)
        else:
            ingress_obj = self.Api_Network.list_ingress_for_all_namespaces()
            for ingress in ingress_obj.items:
                # if ingress.metadata.namespace == "wuhan":
                for rule in ingress.spec.rules:
                    for path in rule.http.paths:
                        data = {
                            "namespace": ingress.metadata.namespace,
                            "ingress": ingress.metadata.name,
                            "dns": ingress.spec.rules[0].host,
                            "service": path.backend.service_name,
                            "port": path.backend.service_port,
                            "path": path.path,
                        }
                        ingress_list.append(data)
        return ingress_list

    # 写入文件
    def write_execl(self, flag=False):
        """Write the records from ``get_ingress(flag)`` to ``ingress情况表.xlsx``.

        The records are formatted and written through the ``Execl`` helper
        (defined outside this block); a confirmation line is printed when the
        helper reports a truthy result.

        Args:
            flag: Forwarded unchanged to ``get_ingress``.
        """
        rows = self.get_ingress(flag)
        column_titles = ("空间", "ingress", "域名", "服务", "端口", "路由")
        exporter = Execl()
        formatted_rows = exporter.format_data(data_info=rows)
        if exporter.write_excel(file="ingress情况表.xlsx", data=formatted_rows, title=column_titles):
            print("写入完成")

    # 获取空间
    def list_namespaces(self, flag=False):
        # print("获取空间")
        namespaces = self.Api_Instance.list_namespace()
        namespace_list_line = []
        time_list_namespace = []

        for namespace in namespaces.items:
            # 启用时间筛选
            if flag:
                if namespace.metadata.name == "sjj-wangmangling-dev":
                    print(namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-"))
                    break
            else:

                if namespace.metadata.labels.get("kubesphere.io/devopsproject") == None:
                    desc = namespace.metadata.annotations
                    data = self.get_patch_namespace(namespace,desc)
                    namespace_list_line.append(data)
                    if len(namespace_list_line) == 0:
                        return time_list_namespace
        return namespace_list_line


    def get_patch_namespace(self, namespace, desc):
        """Build the summary record for one namespace.

        Args:
            namespace: Namespace object as returned by the Kubernetes client.
            desc: Value stored under ``"desc"`` (callers in this class pass
                the namespace annotations).

        Returns:
            A dict with the keys ``namespace``, ``desc``, ``work``,
            ``createtime``, ``url``, ``cloud_type``, ``create_time``,
            ``head``, ``ex_time`` and ``environment``. ``url`` is an empty
            string when ``self.source`` is not one of the known clusters.
        """
        cloud_type = models.DictItems.objects.filter(dict_code=self.source).first()

        head = self.re_head(namespace)
        try:
            # The environment is the third field from the end of the
            # dash-separated alias.
            environment = namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-")[-3]
        except (AttributeError, IndexError, TypeError):
            environment = ''

        ex_time = self.re_time(namespace)

        # labels is None when the namespace carries no labels.
        workspace = (namespace.metadata.labels or {}).get("kubesphere.io/workspace")
        namespace_name = namespace.metadata.name

        if self.source == 'aliyun-prod':
            url = "https://xxx/{0}/clusters/default/projects/{1}/deployments".format(
                workspace, namespace_name)
        elif self.source in ("tencent-prod", "intranet-dev"):
            url = f"https://xxxx/{workspace}/clusters/{self.source}/projects/{namespace_name}/deployments"
        else:
            # Unknown source: no console URL can be built. Previously this
            # path failed with an unbound local variable.
            url = ""

        return {
            "namespace": namespace_name,
            "desc": desc,
            "work": workspace,
            "createtime": namespace.metadata.creation_timestamp.strftime("%Y-%m-%d"),
            "url": url,
            "cloud_type": cloud_type,
            "create_time": namespace.metadata.creation_timestamp,
            "head": head,
            "ex_time": ex_time,
            "environment": environment,
        }




    # 处理配额
    def get_namespace_resource_quota(self, flag=False):
        namespace_quota = self.Api_Instance.list_resource_quota_for_all_namespaces()
        quota_new_list = []
        for quota in namespace_quota.items:
            if flag:
                break
            if quota.status.used == None:
                quotas = "无配置"
            else:
                quotas = quota.status.used.get("limits.memory")
            data = {
                "namespace": quota.metadata.namespace,
                "limit_memory": quotas,
            }
            quota_new_list.append(data)
        return quota_new_list

    """
    处理有状态服务
    """
    def get_statefulset_pod(self,namespace=None):
        new_statefulset_list = []
        statefulset_list = self.Api_Apps.list_namespaced_stateful_set(namespace=namespace)
        for statefulset_pod in statefulset_list.items:
            new_statefulset_list.append(statefulset_pod.metadata.name)
        return new_statefulset_list

    # 处理deployment控制器
    def get_list_deployments_probe(self, namespace=None, flag=False, namespaces=False):
        print("本次要获取的空间",namespace)
        new_namespaces_dict = {}

        # 获取所有空间
        if namespaces:
            namespace = self.list_namespaces()
            for name in namespace:
                deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list = self.get_list_deployment_dict_data(
                    name.get("namespace"), flag)
                new_namespaces_dict["deployment_list_line"] = deployment_list_line
                new_namespaces_dict["deployment_list_limit"] = deployment_list_limit
                new_namespaces_dict["deployment_list_desc"] = deployment_list_desc
            return new_namespaces_dict
        else:
            print("手动获取空间")
            deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list,clse_list = self.get_list_deployment_dict_data(namespace,flag)
            return deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list,clse_list



    def re_head(self,namespace):
        try:
            if "-" in re.search("负责人(.*)",
                                 namespace.metadata.annotations.get(
                                     "kubesphere.io/alias-name")).group(1):
                head = re.search("负责人(.*)",
                                 namespace.metadata.annotations.get(
                                     "kubesphere.io/alias-name")).group(1).split("-")[0]
            else:

                    head = re.search("负责人(.*)",
                                     namespace.metadata.annotations.get(
                                         "kubesphere.io/alias-name")).group(1)
        except Exception:
            return ""
        return head

    def re_time(self,namespace):
        try:
            if re.search("-([0-9]{4}-[0-9]{2}-[0-9]{2})", namespace.metadata.annotations.get(
                                     "kubesphere.io/alias-name")):
                ex_time = re.search("-([0-9]{4}-[0-9]{2}-[0-9]{2})", namespace.metadata.annotations.get(
                                     "kubesphere.io/alias-name")).group(1)
            else:
                ex_time = namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-")[-1]
        except Exception:
            return ""
        # print(ex_time)
        return ex_time


    # 处理deployment的资源组合成dict数据
    def get_list_deployment_dict_data(self, namespace, flag=False):
        """Audit the deployments of one namespace.

        Deployments whose name is a known middleware name are only collected
        by name. Deployments scaled to zero are only recorded as closed. Every
        other deployment gets three records marking, with ``"×"`` (missing)
        or ``"√"``:

        * ``line``: ``"×"`` when the first container has neither a liveness
          nor a readiness probe;
        * ``desc``: ``"×"`` when the ``kubesphere.io/description`` annotation
          is absent and the deployment has exactly one replica;
        * ``limit``: ``"×"`` when the first container has no resource limits
          and the deployment has exactly one replica.

        Args:
            namespace: Namespace whose deployments and StatefulSets are listed.
            flag: When true, non-middleware deployments are only printed and
                no records are produced for them.

        Returns:
            A tuple ``(line_records, limit_records, desc_records,
            middleware_names, closed_namespaces)``. ``middleware_names`` is
            the middleware deployment names followed by the StatefulSet names
            of the namespace.
        """
        middleware_names = frozenset((
            "redis", "mysql", "elasticsearch", "es", "mongo-bi-v1", "redis-v1",
            "mongo-bi", "rabbitmq-server", "nacos", "rabbitmq", "mongo",
            "elasticsearch-v1", "mongdo", "rabbitmq-v1", "rmq", "mysql-v1",
            "sentinel", "minio", "redis5", "mongodb", "mysql-server",
            "mongodb-bi",
        ))

        def owning_namespace(deployment):
            # Namespace segment of self_link first (the original lookup);
            # self_link may be unset, so fall back to metadata.namespace and
            # finally to the placeholder used for unknown namespaces.
            try:
                return deployment.metadata.self_link.split("/")[5]
            except (AttributeError, IndexError):
                return getattr(deployment.metadata, "namespace", None) or "无命名空间"

        def mark(is_missing):
            return "×" if is_missing else "√"

        deployment_list_line = []
        deployment_list_limit = []
        deployment_list_desc = []
        middleware_list = []
        clse_list = []
        statefulset_list = self.get_statefulset_pod(namespace)
        list_deployment_response = self.Api_Apps.list_namespaced_deployment(namespace)

        for deployment in list_deployment_response.items:
            deployment_name = deployment.metadata.name
            if deployment_name in middleware_names:
                middleware_list.append(deployment_name)
                continue

            if flag:
                print("控制器", deployment)
                continue

            replicas = deployment.spec.replicas
            deployment_namespace = owning_namespace(deployment)

            if replicas == 0:
                print("以关闭空间", deployment_namespace)
                clse_list.append(deployment_namespace)
                # NOTE(review): pause kept from the original; its purpose is
                # not visible in this file -- confirm it is still wanted.
                time.sleep(2)
                continue

            container = deployment.spec.template.spec.containers[0]
            # One lookup per deployment; the three records share the result.
            cloud_type = models.DictItems.objects.filter(dict_code=self.source).first()

            no_probe = container.liveness_probe is None and container.readiness_probe is None
            deployment_list_line.append({
                "namespace": deployment_namespace,
                "deployment": deployment_name,
                "line": mark(no_probe),
                "cloud_type": cloud_type,
            })

            # annotations is None when the deployment carries none.
            annotations = deployment.metadata.annotations or {}
            no_description = (
                annotations.get("kubesphere.io/description") is None
                and replicas == 1
            )
            deployment_list_desc.append({
                "namespace": deployment_namespace,
                "deployment": deployment_name,
                "desc": mark(no_description),
                "cloud_type": cloud_type,
            })

            resources = container.resources
            no_limits = (
                (resources is None or resources.limits is None)
                and replicas == 1
            )
            deployment_list_limit.append({
                "namespace": deployment_namespace,
                "deployment": deployment_name,
                "limit": mark(no_limits),
                "cloud_type": cloud_type,
            })

        new_middleware_list = middleware_list + statefulset_list
        return deployment_list_line, deployment_list_limit, deployment_list_desc, new_middleware_list, clse_list



    # 获取非running状态
    def get_not_running(self, running=True):
        list_pod_data = []
        project_data = []
        # 获取所有空间的pod
        pod_list = self.Api_Instance.list_pod_for_all_namespaces(watch=False)
        for pod in pod_list.items:
            # print(pod)
            if pod.status.phase != "Running":
                if pod.status.phase != "Succeeded":
                    # print("结果",pod)
                    try:
                        if pod.status.container_statuses[0].ready == True:
                            ready = "准备好"
                        else:
                            ready = "未准备好"

                        if pod.status.container_statuses[0].started == True:
                            status = "已启动"
                        else:
                            status = "未启动"
                        restart = pod.status.container_statuses[0].restart_count
                    except Exception:
                        status, ready, restart = None, None, None

                    msg_dict = {"project": pod.metadata.namespace, "service": pod.metadata.name,
                                "status": pod.status.phase, "reay": ready,
                                "runing": status,
                                "restart": restart}
                    list_pod_data.append("项目:{0}t 服务名称:{1}t 当前状态:{2}t 是否准备好:{3}t 是否启动:{4}t  自动重启次数:{5}".format(
                        pod.metadata.namespace, pod.metadata.name,
                        pod.status.phase, ready,
                        status, restart))
                    project_data.append(msg_dict)
                    # self.send_msg(msg=msg_dict)
        if running != True:
            return project_data

        return list_pod_data


    # 创建deployment使用yaml格式
    def create_namespace_deployment_yaml(self, namespace, images, name, port,cluster_name):
        #print("端口号",port)
        if int(port) != 80:
            print("是java")
            init_time = 60
            time_out = 3
            path = "/health"
        else:
            print("非java")
            init_time = 5
            time_out = 1
            path = "/"
        deployment_pvc_json = {'apiVersion': 'v1', 'kind': 'PersistentVolumeClaim', 'metadata': {'name': name},
                               'spec': {'accessModes': ['ReadWriteOnce'],
                                        'the__resources': {'requests': {'storage': '10Gi'}},
                                        'storageClassName': 'nfs-client'}}

        deployment_json = {'kind': 'Deployment', 'apiVersion': 'apps/v1',
                           'metadata': {'name': name, 'namespace': namespace, "labels": {'app': name},
                                        'annotations': {'deployment.kubernetes.io/revision': '1.txt'}},
                           'spec': {'replicas': 1, 'selector': {'matchLabels': {'app': name}},
                                    'template': {'metadata': {'labels': {'app': name}}, 'spec': {'volumes': [
                                        {'name': 'host-time', 'hostPath': {'path': '/etc/localtime', 'type': ''}},
                                        {'name': name}], 'containers': [{'name': name, 'image': images, 'ports': [
                                        {'name': 'tcp-{0}'.format(port), 'containerPort': int(port),
                                         'protocol': 'TCP'}],
                                                                         'the__resources': {
                                                                             'limits': {'cpu': '1.txt', 'memory': '1500Mi'},
                                                                             'requests': {'cpu': '500m',
                                                                                          'memory': '500Mi'}},
                                                                         'volumeMounts': [
                                                                             {'name': 'host-time', 'readOnly': True,
                                                                              'mountPath': '/etc/localtime'},
                                                                             {'name': name, 'mountPath': '/data/db'}],
                                                                         'livenessProbe': {
                                                                             'httpGet': {
                                                                                 'path': path,
                                                                                 'port': int(port),
                                                                                 'scheme': 'HTTP'
                                                                                 },
                                                                             'initialDelaySeconds': init_time,
                                                                             'timeoutSeconds': time_out,
                                                                             'periodSeconds': 10,
                                                                             'successThreshold': 1,
                                                                             'failureThreshold': 3
                                                                             },
                                                                         'readinessProbe': {
                                                                             'httpGet': {
                                                                                 'path': path,
                                                                                 'port': int(port),
                                                                                 'scheme': 'HTTP'
                                                                             },
                                                                             'initialDelaySeconds': init_time,
                                                                             'timeoutSeconds': time_out,
                                                                             'periodSeconds': 10,
                                                                             'successThreshold': 1,
                                                                             'failureThreshold': 3
                                                                         },
                                                                         'imagePullPolicy': 'Always'}],
                                        'imagePullSecrets': [{"name": "harbor"}],
                                        'restartPolicy': 'Always'},
                                                 }}}

        deployment_svc_json = {'kind': 'Service', 'apiVersion': 'v1',
                               'metadata': {'name': name, 'namespace': namespace, 'labels': {'app': name}}, 'spec': {
                'ports': [
                    {'name': 'tcp-{0}'.format(port), 'protocol': 'TCP', 'port': int(port), 'targetPort': int(port)}],
                'selector': {'app': name}, 'type': 'ClusterIP'}}

        """{'kind': 'Secret', 'apiVersion': 'v1', 'metadata': {'name': 'xx', 'namespace': 'devops', 'annotations': {'kubesphere.io/alias-name': 'xx', 'kubesphere.io/creator': 'liuchengguo', 'kubesphere.io/description': 'xx'}}, 'data': {'tls.crt': '1.txt', 'tls.key': '2'}, 'type': 'kubernetes.io/tls'}"""
        # print(json.dumps(deployment_svc_json))

        deploy_list = self.Api_Apps.list_namespaced_deployment(namespace=namespace)
        new_deploy_list = []

        if len(deploy_list.items) == 0:
            print("检验部署")
            try:
                self.Api_Apps.create_namespaced_deployment(body=deployment_json, namespace=namespace)
                self.Api_Instance.create_namespaced_service(body=deployment_svc_json, namespace=namespace)
                print("部署成功")
                return True
            except Exception as e:
                return "创建deployment失败,部署失败原因:{0}".format(json.loads(e.body).get("message"))
        else:
            print("检验部署1")
            for deploy in deploy_list.items:
                new_deploy_list.append(deploy.metadata.name)
            print(new_deploy_list)

            if name in new_deploy_list:
                print("存在")
                try:
                    depl = self.Api_Apps.replace_namespaced_deployment(body=deployment_json, namespace=namespace,
                                                                             name=name)
                    return True
                except Exception as e:
                    return "创建deployment失败,部署失败原因:{0} 部署环境:{1}".format(json.loads(e.body).get("message"),cluster_name)
            else:
                try:
                    self.Api_Apps.create_namespaced_deployment(body=deployment_json, namespace=namespace)
                    self.Api_Instance.create_namespaced_service(body=deployment_svc_json, namespace=namespace)
                    print("部署成功")
                    return True
                except Exception as e:
                    return "创建deployment失败,部署失败原因:{0} 部署环境:{1}".format(json.loads(e.body).get("message"),cluster_name)



        # try:
        #     pv = self.Api_Instance.create_namespaced_persistent_volume_claim(body=deployment_pvc_json,
        #                                                                      namespace=namespace)
        # except Exception:
        #     print("创建pv失败")


    """监听ingress对象"""
    def watch_all_namespaces_ingress(self):
        try:
            api_response = self.Api_Network.list_ingress_for_all_namespaces(allow_watch_bookmarks=True, watch=True)

        except Exception as e:
            print("watch失败:{0}", e)


    """监听deployment对象"""
    def watch_all_namespaces_deployment(self):
        """Watch deployments in all namespaces and send change notifications.

        The watch stream first replays an ``ADDED`` event for every existing
        deployment; those are counted and skipped. For later events a
        markdown message is sent through ``self.send_msg`` when an active
        ``models.Kube`` row (``status=1``) matches the deployment's namespace:

        * ``DELETED`` events always qualify;
        * ``MODIFIED`` events qualify only once six event types have
          accumulated since the last notification;
        * ``ADDED`` events always qualify.

        This method loops for as long as the watch stream yields events.
        """
        existing_total = len(self.Api_Apps.list_deployment_for_all_namespaces().items)
        recent_event_types = []

        # (action word for the message, label printed with the lookup result)
        event_labels = {
            "DELETED": ("删除", "删除匹配"),
            "MODIFIED": ("修改", "更新匹配"),
            "ADDED": ("添加", "添加匹配"),
        }
        message_template = (
            " ### K8S-POD操作告警提示:\n\n > **操作:** {action}\n\n > **空间:** {0}\n\n"
            " > **服务:** {1}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n"
            "   > \n\n  > **当前镜像:** {4}"
        )

        replayed = 1
        watch = kubernetes.watch.Watch()
        for e in watch.stream(self.Api_Apps.list_deployment_for_all_namespaces):
            event_type = e.get("type")
            if replayed <= existing_total and event_type == "ADDED":
                # Initial replay of an existing deployment: count and skip.
                replayed += 1
                continue

            deployment = e.get("object")
            # annotations is None when the deployment carries none.
            annotations = deployment.metadata.annotations or {}
            alias_name = annotations.get("kubesphere.io/alias-name") or "无别名"
            description = annotations.get("kubesphere.io/description") or "无描述"

            recent_event_types.append(event_type)
            namespace = deployment.metadata.namespace
            name = deployment.metadata.name

            try:
                image = e.get("raw_object").get("spec").get("template").get("spec").get("containers")[0].get(
                    "image")
            except (AttributeError, TypeError, IndexError, KeyError):
                print("非更新操作")
                image = "无"

            if event_type == "DELETED":
                pass
            elif event_type == "MODIFIED" and len(recent_event_types) == 6:
                print(recent_event_types)
            elif event_type == "ADDED":
                print("ADDED", recent_event_types)
            else:
                # Other event types, and MODIFIED events before the
                # threshold, send nothing.
                continue

            action, match_label = event_labels[event_type]
            kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1).first()
            print(match_label, kube_namespace)
            if kube_namespace:
                msg_json = message_template.format(
                    namespace, name, alias_name, description, image, action=action)
                self.send_msg(msg_json=msg_json, token=kube_namespace.token)
                recent_event_types = []

    #监听pod状态
    def watch_all_namespaces_pod(self):
        """Watch pod events in every namespace and print a notice for changes.

        The call blocks for as long as the watch stream keeps yielding events.

        The number of pods present at start-up is counted first, and that many
        leading ``ADDED`` events are skipped so that pods which already existed
        are not reported as new (see the Kubernetes watch documentation for how
        existing objects are delivered when a watch starts).

        ``MODIFIED`` events are debounced: a notice is printed only once six
        events have accumulated since the last reset. ``DELETED`` prints a
        notice immediately. ``ADDED`` only resets the debounce window.
        """
        # Markdown template; placeholders are action, namespace, pod name and
        # the value shown under the "当前镜像" heading.
        template = (" ### K8S-POD操作告警提示:\n\n > **操作:** {0}\n\n > **空间:** {1}\n\n"
                    " > **服务:** {2}\n\n  > **当前镜像:** {3}")

        existing_pods = len(self.Api_Instance.list_pod_for_all_namespaces().items)
        skipped_added = 1
        recent_types = []

        watch = kubernetes.watch.Watch()
        for e in watch.stream(self.Api_Instance.list_pod_for_all_namespaces):
            event_type = e.get("type")

            # Swallow the ADDED events that correspond to pre-existing pods.
            if skipped_added <= existing_pods and event_type == "ADDED":
                skipped_added += 1
                continue

            print("开始监听pod", event_type)
            pod = e.get("object")
            # NOTE(review): the annotations mapping is what gets printed under
            # the "当前镜像" (current image) heading -- confirm this is intended.
            description = pod.metadata.annotations
            namespace = pod.metadata.namespace
            name = pod.metadata.name
            recent_types.append(event_type)

            if event_type == "DELETED":
                print(template.format("删除", namespace, name, description))
                recent_types = []
            elif event_type == "MODIFIED" and len(recent_types) == 6:
                print(template.format("更新", namespace, name, description))
                recent_types = []
            elif event_type == "ADDED":
                # NOTE(review): the notice for newly added pods was already
                # disabled (its print was commented out); only the debounce
                # window is reset here.
                recent_types = []

    def pod_exec(self, pod, container=""):
        """Open an interactive shell stream inside a pod.

        :param pod: name of the pod to exec into.
        :param container: container name inside the pod; passed through as-is.
        :return: the stream object returned by ``kubernetes.stream.stream``
            (``_preload_content=False``, so the caller drives the I/O).

        The pod is looked up in ``self.namespace``. When the instance was
        created without a namespace, ``"cmdb"`` is used, which is the namespace
        this method previously had hard-coded.
        """
        # Prefer bash (wrapped in `script` when available so a proper pty is
        # allocated) and fall back to plain sh.
        exec_command = [
            "/bin/sh",
            "-c",
            'TERM=xterm-256color; export TERM; [ -x /bin/bash ] '
            '&& ([ -x /usr/bin/script ] '
            '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) '
            '|| exec /bin/sh']
        return stream(self.Api_Instance.connect_get_namespaced_pod_exec,
                      name=pod,
                      namespace=self.namespace or "cmdb",
                      container=container,
                      command=exec_command,
                      stderr=True, stdin=True,
                      stdout=True, tty=True,
                      _preload_content=False
                      )

    def get_deployment_pod(self, RAND):
        """List the pods in ``self.namespace`` that carry the label ``app=<RAND>``.

        :param RAND: value of the ``app`` label to select on.
        :return: ``(True, result)`` with the API response on success, or
            ``(False, message)`` where ``message`` is ``'Get Deployment: '``
            followed by the text of the exception.
        """
        try:
            pods = self.Api_Instance.list_namespaced_pod(
                namespace=self.namespace,
                label_selector="app=%s" % RAND
            )
        except Exception as e:
            # Callers rely on the (ok, payload) tuple, so errors are reported
            # through the return value instead of being raised.
            return False, 'Get Deployment: ' + str(e)
        return True, pods


    # 发送报警
    def send_msg(self, msg=None, token="331c2210aca410f0b3f333113579db3caacadca491a2e6060e2e3bbf2aafcf82",
                 msg_json=None):
        """Post a markdown alert to a DingTalk robot webhook.

        :param msg: optional mapping read with the keys ``project``,
            ``service``, ``status``, ``reay``, ``runing`` and ``restart``; when
            truthy it is rendered into the pod-status alert text.
        :param token: access token appended to the webhook URL.
        :param msg_json: ready-made markdown text, used when ``msg`` is falsy.
        :return: the response body text returned by the webhook.

        SECURITY NOTE(review): an access token is embedded as the default
        argument. It is kept so existing callers keep working, but it should be
        moved to configuration and rotated.
        """
        webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token={0}'.format(token)
        headers = {'Content-Type': 'application/json;charset=utf-8'}

        if msg:
            # The keys "reay" and "runing" are spelled as the callers supply them.
            msg_str = ("## K8S-POD状态异常告警提示:\n\n  项目:{0}\t 服务名称:{1}\t 当前状态:{2}\t"
                       " 是否准备好:{3}\t 是否启动:{4}\t 重启次数:{5}\t").format(
                msg.get("project"), msg.get("service"), msg.get("status"),
                msg.get("reay"), msg.get("runing"), msg.get("restart"))
        else:
            msg_str = msg_json

        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": "### K8S-POD状态异常告警提示:",
                "text": msg_str,
            },
            "at": {
                # NOTE(review): this is the literal string 'reminders', not a
                # list of phone numbers -- confirm against the DingTalk API.
                "atMobiles": 'reminders',
                "isAtAll": False,
            },
        }

        # A timeout keeps a stalled webhook from blocking the caller forever.
        r = requests.post(url=webhook_url, json=data, headers=headers, timeout=10)
        return r.text

    #销毁对象
    def __del__(self):
        pass
        # print("销毁对象:{0}".format(self.source))


# Sample message payload.
# NOTE(review): no use of this name is visible in this part of the file.
msg = {"msg": "服务器炸了"}


if __name__ == '__main__':
    # Bound to `kube`, not `client`: rebinding `client` at module level would
    # shadow `from kubernetes import client`, which Kube.__init__ relies on,
    # and break any Kube() created afterwards.
    # NOTE(review): Kube.test is not visible in this part of the file --
    # confirm it exists.
    kube = Kube(source="intranet-dev")
    print(kube.test(namespace="zoudao"))

以上代码基本覆盖了大多数常用 API 的调用示例。

附带:导出 Excel

代码语言:python
"""
公共写入execl方法
"""
from xlsxwriter import workbook


class Execl(object):
    """Shape export records into rows and write them to an .xlsx workbook.

    The class name keeps its existing spelling so current callers still work.
    To support another export layout, add a further tuple of field names and
    select it in :meth:`format_data`.
    """

    # Keys read from each record for the Kubernetes health-check export.
    # NOTE(review): 'porbe' looks like a misspelling of 'probe'; it is kept
    # because it has to match the key used by whatever produces the records.
    _KUBE_FIELDS = (
        'namespaces',
        'name',
        'server',
        'porbe',
        'limit',
        'desc',
        'url',
        'kubetotalscore',
        'cloud_type',
    )

    # Keys read from each record for the database export.
    _DB_FIELDS = (
        'CreateTime',
        'desc',
        'DBName',
        'DB',
        'MySQLTotalExecutionCounts',
        'MySQLTotalExecutionTimes',
        'MaxExecutionTime',
        'MaxLockTime',
        'ParseTotalRowCounts',
        'ParseMaxRowCount',
        'ReturnTotalRowCounts',
        'ReturnMaxRowCount',
        'SQLText',
    )

    # (first_col, last_col, width) triples applied to the worksheet.
    _COLUMN_WIDTHS = (
        (0, 0, 30),
        (1, 1, 20),
        (2, 3, 28),
        (4, 5, 25),
        (6, 6, 12),
        (7, 9, 16),
        (10, 11, 25),
    )

    def format_data(self, data_info, kube=False):
        """Pick the exported fields out of every record.

        :param data_info: iterable of mappings (anything with ``.get``).
        :param kube: when true use the health-check layout, otherwise the
            database layout.
        :return: list of tuples, one per record, in layout order; a key that
            is absent from a record yields ``None``.
        """
        fields = self._KUBE_FIELDS if kube else self._DB_FIELDS
        return [tuple(line.get(key) for key in fields) for line in data_info]

    @staticmethod
    def _write_cells(worksheet, title, data, format_title, format_body):
        """Write the header into row 0 and each data line into the rows below."""
        for col, item in enumerate(title or ()):
            worksheet.write(0, col, item, format_title)

        # Data starts at row 1, directly under the header row.
        for row, line in enumerate(data or (), start=1):
            for col, value in enumerate(line):
                worksheet.write(row, col, value, format_body)

    def write_excel(self, file=None, data=None, title=None):
        """Create a workbook, style it, and write the header and data rows.

        :param file: path of the workbook to create.
        :param data: iterable of rows, each an iterable of cell values;
            ``None`` is treated as no rows.
        :param title: iterable of header cell values; ``None`` is treated as
            no header.
        :return: ``True`` once the workbook has been closed.
        """
        work = workbook.Workbook(file)
        # Single worksheet with the default sheet name.
        worksheet = work.add_worksheet()

        # Header cells: bold, size 10, centred horizontally and vertically.
        format_title = work.add_format({'bold': True, 'font_size': 10})
        format_title.set_align('center')
        format_title.set_align('vcenter')

        format_body = work.add_format({'font_size': 10})

        # Header row height and per-column widths.
        worksheet.set_row(0, 25)
        for first_col, last_col, width in self._COLUMN_WIDTHS:
            worksheet.set_column(first_col, last_col, width)

        self._write_cells(worksheet, title, data, format_title, format_body)

        work.close()
        return True
api

0 人点赞