利用Python调用云Api在阿里云申请免费证书并同步到腾讯云

2022-07-05 13:41:31 浏览数 (1)

腾讯云证书免费额度用完了怎么办? 调用云api 从阿里云申请证书并托管到腾讯云

0.准备工作

使用本代码请先进行子用户创建

前往创建子用户(腾讯云):https://console.cloud.tencent.com/cam

授权DNSPod 全读写访问权限、SSL证书(SSL)全读写访问权限

前往创建子用户(阿里云):https://ram.console.aliyun.com/users

授权管理云盾证书服务的权限(AliyunYundunCertFullAccess)

请注意 为了保障您的账户以及云上资产的安全 请谨慎保管SecretId 与 SecretKey 并定期更新 删除无用权限

1.SDK下载

请确保Python版本为3.6

查看Python版本

代码语言:javascript复制
python3 -V

安装腾讯云Python SDK

代码语言:javascript复制
pip install -i https://mirrors.tencent.com/pypi/simple/ --upgrade tencentcloud-sdk-python

安装阿里云SDK

代码语言:javascript复制
pip install -i https://mirrors.aliyun.com/pypi/simple/ --upgrade alibabacloud_tea_openapi alibabacloud_cas20180713==1.0.1 alibabacloud_cas20200407==1.0.5

2.代码部分

别忘了去创建阿里云的免费证书!

代码语言:javascript复制
# -*- coding: utf-8 -*-

import json
import tencentcloud.ssl.v20191205.ssl_client
import tencentcloud.ssl.v20191205.models
from time import sleep
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.dnspod.v20210323 import dnspod_client, models
from alibabacloud_cas20180713.client import Client as cas20180713Client
from alibabacloud_cas20180713 import models as cas_20180713_models
from alibabacloud_cas20200407.client import Client as cas20200407Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_cas20200407 import models as cas_20200407_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


# aliyun
access_key_id = ''
access_key_secret = ''
# tencentcloud
SecretId = ''
SecretKey = ''


class Ver2020:
    def __init__(self):
        pass

    @staticmethod
    def create_client(
    ) -> cas20200407Client:

        config = open_api_models.Config(
            # 您的 AccessKey ID,
            access_key_id=access_key_id,
            # 您的 AccessKey Secret,
            access_key_secret=access_key_secret
        )
        # 访问的域名
        config.endpoint = f'cas.aliyuncs.com'
        return cas20200407Client(config)

    @staticmethod
    def main():
        client = Ver2020.create_client()
        list_user_certificate_order_request = cas_20200407_models.ListUserCertificateOrderRequest()
        runtime = util_models.RuntimeOptions()
        cert_id = ''
        error_1 = 0
        try:
            # 复制代码运行请自行打印 API 的返回值
            response = client.list_user_certificate_order_with_options(list_user_certificate_order_request, runtime)
            try:
                for i in range(len(response.body.certificate_order_list) 1):
                    if response.body.certificate_order_list[i].domain == '':
                        cert_id = response.body.certificate_order_list[i].instance_id
                        print('获取证书id成功:' cert_id)
                        break
            except IndexError:
                error_1 = 1
                print('没有空闲的证书,请尝试去阿里云控制台申请免费额度或切换账号')
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)
        return cert_id, error_1


class Ver2018:
    def __init__(self):
        pass

    @staticmethod
    def create_client(
            access_key_id: str,
            access_key_secret: str,
    ) -> cas20180713Client:

        config = open_api_models.Config(
            # 您的 AccessKey ID,
            access_key_id=access_key_id,
            # 您的 AccessKey Secret,
            access_key_secret=access_key_secret
        )
        # 访问的域名
        config.endpoint = f'cas.aliyuncs.com'
        return cas20180713Client(config)

    @staticmethod
    def main(cert_id, domian):
        client = Ver2018.create_client('accessKeyId', 'accessKeySecret')
        create_dvorder_audit_request = cas_20180713_models.CreateDVOrderAuditRequest(
            lang='zh',
            instance_id='{0}'.format(cert_id),
            domain='{0}'.format(domian),
            domain_verify_type='DNS',
            username='zhangsan',
            email='i@{0}'.format(domian),
            mobile='13888888888',
            province='zhejiang',
            city='hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            response = client.create_dvorder_audit_with_options(create_dvorder_audit_request, runtime)
            print('已提交申请 等待获取Dns记录值')
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)

    @staticmethod
    def get_order_result(order_id, result):
        check_name = ''
        check_value = ''
        private_key = ''
        certificate = ''
        client = Ver2018.create_client('accessKeyId', 'accessKeySecret')
        describe_dvorder_result_request = cas_20180713_models.DescribeDVOrderResultRequest(
            instance_id=order_id
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            response = client.describe_dvorder_result_with_options(describe_dvorder_result_request, runtime)
            if response.body.order_status == 'checking':
                check_name = response.body.check_name
                check_value = response.body.check_value
                print(result)
            elif response.body.order_status == 'issued':
                private_key = response.body.private_key
                certificate = response.body.certificate
                print('证书公私钥获取成功,等待上传至腾讯云证书托管')

        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)

        return certificate, private_key, check_value, check_name


class QCloud:
    def __init__(self):
        pass

    @staticmethod
    def add_dns_record(main_domian, sub_domian, dns_value):
        try:
            cred = credential.Credential("{0}".format(SecretId), "{0}".format(SecretKey))
            httpProfile = HttpProfile()
            httpProfile.endpoint = "dnspod.tencentcloudapi.com"

            clientProfile = ClientProfile()
            clientProfile.httpProfile = httpProfile
            client = dnspod_client.DnspodClient(cred, "", clientProfile)

            req = models.CreateRecordRequest()
            params = {
                "Domain": main_domian,
                "SubDomain": sub_domian,
                "RecordType": 'TXT',
                "RecordLine": "默认",
                "Value": dns_value
            }
            req.from_json_string(json.dumps(params))

            resp = client.CreateRecord(req)
            print('解析添加成功! ' resp.to_json_string())

        except TencentCloudSDKException as err:
            print(err)

    @staticmethod
    def upload_cert(public_key, private_key, cert_name):
        try:
            cred = credential.Credential("{0}".format(SecretId), "{0}".format(SecretKey))
            httpProfile = HttpProfile()
            httpProfile.endpoint = "ssl.tencentcloudapi.com"

            clientProfile = ClientProfile()
            clientProfile.httpProfile = httpProfile
            client = tencentcloud.ssl.v20191205.ssl_client.SslClient(cred, "", clientProfile)

            req = tencentcloud.ssl.v20191205.models.UploadCertificateRequest()
            params = {
                "CertificatePublicKey": public_key,
                "CertificatePrivateKey": private_key,
                "Alias": cert_name
            }
            req.from_json_string(json.dumps(params))

            resp = client.UploadCertificate(req)
            print('上传成功!  ' resp.to_json_string())
        except TencentCloudSDKException as err:
            print(err)


if __name__ == '__main__':
    # tmd 名字太难起了 你们凑合看吧 
    # 获取证书id,如果没有空闲证书a[1]返回1,如果有空闲证书则返回0(出现其他错误也返回1)
    a = Ver2020.main()
    # 如果有空闲证书则执行
    if a[1] == 0:
        # 要申请证书的域名
        b = input('domian=')
        # 提交申请
        Ver2018.main(a[0], b)
        # 5秒后获取Dns验证参数
        sleep(5)
        # 这就是个提示语
        i = '等待添加解析'
        # return certificate(公钥), private_key(私钥), check_value(记录值), check_name(主机记录)
        c = Ver2018.get_order_result(a, i)

        # 获取主域名
        d = b.split('.')[-2] '.'   b.split('.')[-1]
        # 避免重复添加
        f = []
        while True:
            # 这就是个提示语
            if len(f) == 1:
                j = '等待解析生效(预计1~2分钟)....'
            else:
                j = '......'

            c = Ver2018.get_order_result(a, j)
            # 判断是否返回公钥
            if c[0] == '':
                # 是否为首次添加dns解析
                if len(f) == 0:
                    # 主机记录
                    e = c[3].replace('.' d, '')
                    # print(e)
                    # 提交解析
                    QCloud.add_dns_record(d, e, c[2])
                    # 避免重复添加解析
                    f.append('1')
                    # 重复循环
                    continue
                else:
                    # 若仍未获取到公钥 休息10s 重复此操作 直到获取到公钥(约1min左右)
                    sleep(40)
                    continue
            else:
                # 公钥
                g = c[0]
                # 私钥
                h = c[1]

                # 上传到腾讯云进行证书托管 参数为 公钥 私钥 域名
                QCloud.upload_cert(g, h, b)
                # 结束循环
                break

0 人点赞