SCF云函数API实践

2023-07-04 13:21:37 浏览数 (1)

前言

SCF云函数对于我来说真的是非常好用,原先部署在服务器上的一些处理数据的函数可以直接交付给SCF来处理,省了很多服务器的费用。现在腾讯云可以免费试用SCF个人高级版三个月https://cloud.tencent.com/act/free?from=20876,强力推荐!!

在以往想部署一些简单的项目,需要经历服务器的购买、镜像选配、环境搭建、项目部署等一系列操作,而现在直接用SCF就不需要去操劳这些,直接注重业务逻辑,也不需要考虑并发扩容等性能问题。若考虑日志直接搭配CLS生成各种各样的表图,方便后期分析数据日志。

SCF控制台的界面简洁明了,操作指引非常细致,且有很多示例很容易上手。在这里必须要夸一下SCF的相关工程师,解决高并发复制出错的BUG非常快,以及提的SCF相关文档的建议也都采纳了,非常NICE!!

本篇文章主要讲述如何使用API来批量操作SCF。

前置步骤

首先需要导入以下包:

代码语言:python代码运行次数:0复制
import json
import time
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.scf.v20180416 import scf_client, models
import threading
import queue
from tqdm import tqdm

用户基础信息配置,这个部分主要配置用户API密钥(这里获取https://console.cloud.tencent.com/cam/capi)以及地域:

代码语言:python代码运行次数:0复制
# API密钥
cred = credential.Credential("SecretId", "SecretKey")
httpProfile = HttpProfile()
httpProfile.endpoint = "scf.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 地域,这里以北京为例
client = scf_client.ScfClient(cred, "ap-beijing", clientProfile)

API方法

接下来是涉及的一些方法,涉及函数生成、删除、异步配置、通用配置、触发器以及函数信息的获取方便后面判定函数状态是否适合操作。

代码语言:python代码运行次数:0复制
# 生成scf函数
def copy_scf_function(scf_name, target_namespace, source_scf_name, source_namespace):
    try:
        req = models.CopyFunctionRequest()
        params = {
            "FunctionName": source_scf_name,
            "NewFunctionName": scf_name,
            "Namespace": source_namespace,
            "TargetNamespace": target_namespace,
            "Description": f'from_{source_scf_name}',
            "CopyConfiguration": True
        }
        req.from_json_string(json.dumps(params))
        resp = client.CopyFunction(req)
        # print('normal:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print('err:', err)
        pass


# 删除函数
def delete_scf_function(scf_name, Namespace):
    try:
        req = models.DeleteFunctionRequest()
        params = {
            "FunctionName": scf_name,
            "Namespace": Namespace
        }
        req.from_json_string(json.dumps(params))
        resp = client.DeleteFunction(req)
        # print('删除函数:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 修改函数异步配置
def modify_scf_async_settings(scf_name, Namespace, RetryNum=0):
    try:
        req = models.UpdateFunctionEventInvokeConfigRequest()
        params = {
            "AsyncTriggerConfig": {
                "RetryConfig": [
                    {
                        "RetryNum": RetryNum
                    }
                ],
                "MsgTTL": 21600
            },
            "FunctionName": scf_name,
            "Namespace": Namespace
        }
        req.from_json_string(json.dumps(params))
        resp = client.UpdateFunctionEventInvokeConfig(req)
        print('修改异步配置:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 修改函数配置
def modify_scf_function_settings(scf_name, Namespace, MemorySize=64):
    try:
        req = models.UpdateFunctionConfigurationRequest()
        params = {
            "FunctionName": scf_name,
            "MemorySize": MemorySize,
            "Timeout": 120,
            "Namespace": Namespace,
            "Environment": {
                "Variables": [
                    {
                        "Key": "TZ",
                        "Value": "Asia/Shanghai"  # 设置时区
                    },
                    {
                        "Key": "SEVER_NAME",   # 这里将函数名也作为环境变量
                        "Value": scf_name
                    }
                ]
            },
            "Layers": [
                {
                    "LayerName": "py", # 配置层
                    "LayerVersion": 1  # 曾版本
                }
            ]
        }
        req.from_json_string(json.dumps(params))
        resp = client.UpdateFunctionConfiguration(req)
        # print('修改函数配置:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 配置触发器
def configue_trigger(scf_name, Namespace, timer='1 * * * * * *', enable='CLOSE', TriggerName='python_auto_gen'):
    try:
        req = models.CreateTriggerRequest()
        params = {
            "FunctionName": scf_name,
            "TriggerName": TriggerName,
            "Type": "timer",
            "TriggerDesc": timer,
            "Namespace": Namespace,
            "Qualifier": "$DEFAULT",
            "Enable": enable  # OPEN CLOSE
        }
        req.from_json_string(json.dumps(params))
        resp = client.CreateTrigger(req)
        # print('创建触发器:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 删除触发器
def delete_trigger(scf_name, Namespace, TriggerName="python_auto_gen"):
    try:
        req = models.DeleteTriggerRequest()
        params = {
            "FunctionName": scf_name,
            "TriggerName": TriggerName,
            "Namespace": Namespace,
            "Type": "timer",
            "Qualifier": "$DEFAULT"
        }
        req.from_json_string(json.dumps(params))
        # print(req)
        resp = client.DeleteTrigger(req)
        # print('删除触发器:', scf_name, resp.to_json_string())
    except TencentCloudSDKException as err:
        # print(err)
        pass


# 获取scf信息
def get_scf_info(Namespace, SearchKey):
    try:
        req = models.ListFunctionsRequest()
        params = {
            "Order": "ASC",
            "Limit": 50,
            "Namespace": Namespace,
            "SearchKey": SearchKey
        }
        req.from_json_string(json.dumps(params))
        resp = client.ListFunctions(req)
        # print(resp.to_json_string())
        return resp.to_json_string()
    except TencentCloudSDKException as err:
        # print(err)
        return None

整合SCF类

基础的一些方法写好后可以进行批量操作,在下面的这个类中可以实现一键配置函数以及相关配置的操作,由于代码难度不大我就放在一起了。

代码语言:python代码运行次数:0复制
class SCF:
    def __init__(self, NameSpace, timer='1 * * * * * *', start=1, end=50, memsize=64, triggername='python_auto_gen'):
        self.ns_list = ['SCF_0_', 'SCF_1_', 'SCF_2_', 'SCF_3_', 'SCF_4_']
        self.NameSpace = NameSpace
        self.triggername = triggername
        self.memsize = memsize
        self.start = start
        self.end = end
        self.ns_prefix = None
        self.timer = timer
        if self.NameSpace == 'default':
            self.ns_prefix = self.ns_list[0]
            # print('NameSpace:', self.NameSpace)
        else:
            for i in self.ns_list:
                if NameSpace in i:
                    self.ns_prefix = i
                    # print('命名空间:', self.NameSpace, 'n前缀:', self.ns_prefix)
                    # print('NameSpace:', self.NameSpace)
                    break
        if not self.ns_prefix:
            raise '配置错误啦!请确认正确后再试!!!'
        self.pbar = None
        self.delete_bar = None
        self.modify = None
        self.trigger = None
        self.delete_trigger = None

    def get_scf_name_list(self):
        scf_name_list = []
        for i in range(self.start, self.end   1):
            if i <= 9:
                scf_name_list_sub = '0'   str(i)
            else:
                scf_name_list_sub = str(i)
            scf_name_list_sub = self.ns_prefix   scf_name_list_sub
            scf_name_list.append(scf_name_list_sub)
        return scf_name_list

    def fast_modify_trigger(self, scf_name):
        if isinstance(scf_name, list):
            self.delete_trigger = tqdm(total=len(scf_name), desc="Tragger Deleting...", unit="it", colour='MAGENTA')
            q = queue.Queue()
            for i in scf_name:
                q.put(i)
            threads = []
            for th in range(1, 9):
                threads.append(
                    threading.Thread(target=self.__fdt, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.delete_trigger.desc = 'Tragger Deleted!'
            self.delete_trigger.close()
            self.fast_configue_trigger(scf_name)
        else:
            delete_trigger(scf_name, self.NameSpace)
            configue_trigger(scf_name, self.NameSpace, self.trigger, self.timer)

    def __fdt(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                scf_name = q.get()
                delete_trigger(scf_name, self.NameSpace, TriggerName=self.triggername)  # 删除触发器
                self.delete_trigger.update(1)

    def fast_copy_scf_function(self, scf_name, source_scf_name, source_scf_namespace):
        if isinstance(scf_name, list):
            self.pbar = tqdm(total=len(scf_name), desc=f"{self.NameSpace}: SCF Copying...", unit="it", colour='MAGENTA')
            q = queue.Queue()
            for i in scf_name:
                q.put([i, source_scf_name, source_scf_namespace])
            threads = []
            for th in range(1, 21):
                threads.append(
                    threading.Thread(target=self.__csf, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.pbar.desc = f"{self.NameSpace}: SCF Copy Complete!"
            self.pbar.close()
            self.fast_modify_scf_function_settings(scf_name)
        else:
            copy_scf_function(scf_name, self.NameSpace, source_scf_name, source_scf_namespace)

    def __csf(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                target = q.get()
                copy_scf_function(target[0], self.NameSpace, target[1], target[2])
                # self.pbar.update(1)
                time.sleep(0.1)
                while True:
                    if self.scf_info(target[0]):
                        self.pbar.update(1)
                        break
                    else:
                        time.sleep(0.3)

    def fast_modify_scf_function_settings(self, scf_name):
        if isinstance(scf_name, list):
            q = queue.Queue()
            self.modify = tqdm(total=len(scf_name), desc=f"{self.NameSpace}: SCF Modifying...", unit="it",
                               colour='CYAN')
            for i in scf_name:
                q.put(i)
            threads = []
            for th in range(1, 20):
                threads.append(
                    threading.Thread(target=self.__msfs, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.modify.desc = f"{self.NameSpace}: SCF Modification Completed!"
            self.modify.close()
            self.fast_configue_trigger(scf_name)
        else:
            modify_scf_function_settings(scf_name, self.NameSpace)

    def __msfs(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                scf_name = q.get()
                while True:
                    if self.scf_exist(scf_name):
                        if self.scf_info(scf_name):
                            modify_scf_function_settings(scf_name, self.NameSpace, self.memsize)
                            self.modify.update(1)
                            break
                        else:
                            time.sleep(0.1)
                    else:
                        break

    def fast_delete_scf_function(self, scf_name):
        if isinstance(scf_name, list):
            q = queue.Queue()
            self.delete_bar = tqdm(total=len(scf_name), desc=f"{self.NameSpace}: Deleting Old SCF...", unit="it",
                                   colour='GREEN')
            for i in scf_name:
                q.put(i)
            threads = []
            for th in range(1, 15):
                threads.append(
                    threading.Thread(target=self.__dsf, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.delete_bar.desc = f"{self.NameSpace}: Delete SCF Completed!"
            self.delete_bar.close()
        else:
            if isinstance(scf_name, int):
                if scf_name <= 9:
                    delete_scf_function(f'{self.ns_prefix}0{scf_name}', self.NameSpace)
                else:
                    delete_scf_function(f'{self.ns_prefix}{scf_name}', self.NameSpace)
            else:
                delete_scf_function(scf_name, self.NameSpace)

    def __dsf(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                scf_name = q.get()
                delete_scf_function(scf_name, self.NameSpace)
                self.delete_bar.update(1)
                # while True:
                #     delete_scf_function(scf_name, self.NameSpace)
                #     if not self.scf_exist(scf_name):
                #         self.delete_bar.update(1)
                #         break
                #     time.sleep(0.1)

    def fast_configue_trigger(self, scf_name):
        if self.timer == 'reset':
            return
        if isinstance(scf_name, list):
            q = queue.Queue()
            self.trigger = tqdm(total=len(scf_name), desc=f"{self.NameSpace}: Trigger Configuring...", unit="个",
                                colour='YELLOW')
            for i in scf_name:
                q.put(i)
            threads = []
            for th in range(1, 20):
                threads.append(
                    threading.Thread(target=self.__fct, args=(q,))
                )
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            self.trigger.desc = f"{self.NameSpace}: Trigger Configured!"
            self.trigger.close()
        else:
            configue_trigger(scf_name, self.NameSpace)

    def __fct(self, q: queue.Queue):
        while True:
            if q.empty():
                break
            else:
                scf_name = q.get()
                configue_trigger(scf_name, self.NameSpace, self.timer, enable='OPEN',
                                 TriggerName=self.triggername)  # 开启触发器
                # configue_trigger(scf_name, self.NameSpace, self.timer)  # 关闭触发器
                self.trigger.update(1)

    def scf_info(self, scf_name):
        res = get_scf_info(self.NameSpace, scf_name)
        try:
            json_data = json.loads(res)
            for i in json_data['Functions']:
                if i['FunctionName'] == scf_name and i['Status'] == 'Active':
                    return True
            return False
        except:
            print(res)
            return False

    def scf_exist(self, scf_name):
        json_data = json.loads(get_scf_info(self.NameSpace, scf_name))
        if json_data['Functions']:
            return True
        else:
            return False

    def modify_func_async(self, scf_name_list: list):
        for i in scf_name_list:
            modify_scf_async_settings(scf_name=i, Namespace=self.NameSpace)

使用方式

在上述的SCF类中实现对同一地域五个命名空间共250个函数同时操作的功能,但因为实际需要一个用于复制,所以是对249个函数进行操作,限于API的请求限制这里的多线程可以再优化一下数量。

接下来就可以初始化SCF传入必要的命名空间即可调用里面的方法啦。

代码语言:python代码运行次数:0复制
tst = SCF(NameSpace='default', timer='1 * * * * * *', start=2, end=50, memsize=64, triggername='pyt0')

如果需要操作其他命名空间可以直接用SCF_1作为命名空间这样子就会自动生成SCF_1_01、SCF_1_02……,注意SCF_0是代表默认的default命名空间

在这里用SCF_1来命名命名空间,则可以写为

代码语言:python代码运行次数:0复制
tst = SCF(NameSpace=' SCF_1', timer='1 * * * * * *', start=2, end=50, memsize=64, triggername='pyt1')

如果想根命名空间为default的SCF_0_01作为源函数来复制函数则可以写为:

代码语言:python代码运行次数:0复制
tst.fast_copy_scf_function(tst.get_scf_name_list(), 'SCF_0_01', 'default')

操作完事可以再检查一下函数状态是否准备就绪:

代码语言:python代码运行次数:0复制
def fast_get_scf_info(q: queue.Queue):
    while True:
        if q.empty():
            break
        else:
            info = q.get()
            json_data = json.loads(get_scf_info(info[0], info[1]))
            if not json_data['Functions']:
                print('函数异常:', info[0], info[1])

def check_status(ls: list):
    q = queue.Queue() 
	# 只需要将scf命名空间和scf名称放入即可
	for namespace, scf_name in ls:    
		q.put([namespace, scf_name])
	threads = []
	for th in range(1, 20):
	    threads.append(
	        threading.Thread(target=fast_get_scf_info, args=(q,))
	    )
	a = time.time()
	print('{:*^20}'.format('开始检查'))
	for thread in threads:
	    thread.start()
	for thread in threads:
	    thread.join()
	print('{:*^20}'.format('检查完成'))
	print('耗时:{:.3f}s'.format(time.time() - a))

总结

文章到这里就结束了,这个类中所涉及的参数并不是全部比如除timer以外的触发器都没有提及,配置的相关参数也没有全部放入,如果您要使用需要根据自身使用情况来进一步修改。最后感谢大家的支持,祝愿腾讯云越来越好呀!

0 人点赞