利用pytest hook函数实现自动化测试结果推送企业微信

2022-11-14 14:26:25 浏览数 (1)

前言

通常,自动化测试用例在执行完成后,都会发送一个结果通知,以提醒测试人员或测试leader测试用例的执行结果。如有测试失败的情况,测试人员再去查看具体的测试报告,检查是哪个场景没有测试通过。当前较为流行的提醒方式有:

  • 邮件
  • 企业微信、钉钉等push消息

由于我们公司所使用的办公软件是企业微信,因此,在实现测试结果通知提醒的功能时,选用的是企业微信。当前较为流行的实现方式有两种形式:

  • 企业微信应用通知:需要在企业微信中创建一个应用,再获取Secret
  • 普通群消息推送:需要在群中添加一个群机器人(会自动生成webhook_url,以供后续接口调用)

由于方式一需要在企业微信中创建应用(需要管理员操作权限),总体实现起来较为繁琐,因此我选用的是第二种群机器人的实现方式。

一、实现原理及实现效果

1.外部链路流程

2.内部调用原理及过程

1)各模块&方法功能:

  • RedisHandler基类:用于初始化redis连接、查询数据、写入数据
  • CaseCount基类:用于初始化用例统计、获取成功&失败&跳过&报错的用例数以及计算用例通过率
  • EnterpriseWechatNotification基类:用于定义发送企业微信消息的内容模板、定义调用hook_url(群机器人)发送消息方法
  • hook方法pytest_runtest_makereport:用于获取pytest执行后的测试结果、将结果写入缓存、生成控制台测试报告
  • hook方法pytest_terminal_summary:用于获取执行结果、调用发送消息方法发送微信消息

2)具体调用原理、流程:

① 前提:

  • 已添加企业微信群机器人,并记住hook地址;
  • python pytest已编写测试用例;

② pytest运行测试用例,RedisHandler连接redis ,pytest_runtest_makereport获取用例执行结果,并:

  • 调用RedisHandler中的写入缓存方法,将结果写入缓存;
  • 调用CaseCount中的计算用例通过率方法获取用例通过率;
  • 将获取到的各条测试结果分输出到控制台进行展示:↓(Windows本地运行效果)

③ pytest_terminal_summary方法:

  • 分别调用CaseCount中的获取通过、失败、跳过、报错的用例条数的方法(此方法调用RedisHandler中的get_key方法),获取到各个(通过、失败、跳过、报错)执行结果统计;
  • 调用CaseCount中的计算用例通过率方法获取用例通过率;
  • 调用EnterpriseWechatNotification中的发送企业微信消息方法,将获取到的各个(通过、失败、跳过、报错)执行结果的数量统计与EnterpriseWechatNotification中预定义的模板进行拼接,发送到企业微信;
  • 将获取到的各个(通过、失败、跳过、报错)执行结果与用例通过率一起,输出到控制台展示:↓(Windows本地运行效果)

二、编码实现

1.各个基类

  • RedisHandler基类:用于初始化redis连接、查询数据、写入数据
代码语言:javascript复制
import redis


class RedisHandler:
    def __init__(self, host, port=6379, db=0):
        # 生成客户端连接,StrictRedis()默认使用连接池,不必再单独使用ConnectPool
        self.client = redis.StrictRedis(host=host, port=port, db=db)

    def set_string(self, name: str, value, ex=None, px=None, nx=False, xx=False) -> None:
        """
        缓存中写入str(单个)
        :param name: 缓存名称
        :param value: 缓存值
        :param ex: 过期时间(秒)
        :param px: 过期时间(毫秒)
        :param nx: 如果设置为True,则只有name不存在时,当前set操作才执行(新增)
        :param xx: 如果设置为True,则只有name不存在时,当前set操作才执行(修改)
        :return:
        """
        self.client.set(name, value=value, ex=ex, px=px, nx=nx, xx=xx)

    def incr(self, key):
        """
        使用incr方法,处理并发问题
        当key不存在时,会先初始为0,每次调用,则会 1
        :param key:
        :return:
        """
        self.client.incr(key)

    def get_key(self, name):
        """读取缓存"""
        result = self.client.get(name)
        return result
  • CaseCount基类:用于初始化用例统计、获取成功&失败&跳过&报错的用例数以及计算用例通过率
代码语言:javascript复制
from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig


class CaseCountName:
    ERROR: str = "error_count"
    FAILED: str = "failed_count"
    PASSED: str = "passed_count"
    SKIP: str = "skip_count"
    TOTAL: str = "total_count"


class CaseCount:
    """
    redis 缓存统计用例执行情况
    """

    def __init__(self):
        self.redis = RedisHandler(host=DBConfig.redis_config.get('host'))  # redis主机地址可以写死在这里,也可以从配置类中获取

    def init_process(self):
        """
        初始化进度、总数、成功数、失败数
        """
        self.redis.set_string(CaseCountName.TOTAL, 0)
        self.redis.set_string(CaseCountName.SKIP, 0)
        self.redis.set_string(CaseCountName.PASSED, 0)
        self.redis.set_string(CaseCountName.FAILED, 0)
        self.redis.set_string(CaseCountName.ERROR, 0)

    def failed_count(self):
        """获取失败用例数"""
        return int(self.redis.get_key(CaseCountName.FAILED))

    def passed_count(self):
        """获取通过用例总数"""
        return int(self.redis.get_key(CaseCountName.PASSED))

    def skip_count(self):
        """获取跳过用例数"""
        return int(self.redis.get_key(CaseCountName.SKIP))

    def error_count(self):
        """报错用例数"""
        return int(self.redis.get_key(CaseCountName.ERROR))

    def total_count(self):
        """用例总数"""
        return int(self.redis.get_key(CaseCountName.TOTAL))

    def pass_rate(self):
        """计算用例成功率"""
        try:
            rate = round((self.passed_count()   self.skip_count()) / self.total_count() * 100, 2)
            return rate
        except ZeroDivisionError:
            raise Exception("执行失败,未检测到用例执行数量")
  • EnterpriseWechatNotification基类:用于定义发送企业微信消息的内容模板、定义调用hook_url(群机器人)发送消息方法
代码语言:javascript复制
import os
import json
import requests
import platform


def get_env_from_jenkins(name):
    """从Jenkins中获取全局环境变量"""
    return os.getenv(name) and os.getenv(name).strip()


ProjectName = get_env_from_jenkins("JOB_NAME")  # Jenkins构建项目名称
BUILD_URL = get_env_from_jenkins("BUILD_URL")  # Jenkins构建项目URL
BUILD_NUMBER = get_env_from_jenkins("BUILD_NUMBER")  # Jenkins构建编号


class EnterpriseWechatNotification:
    def __init__(self, hook: list):
        # 企业微信群机器人的hook地址,一个机器人就一个,多个就定义多个,可以写死,也可以写在配置类中
        self.hook_url_list = [f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={i}" for i in hook]
        # allure生成报告的地址,Jenkins执行时会用到,Windows暂未配置allure地址
        self.allure_url = f"http://192.168.1.122:8088/jenkins/job/{ProjectName}/{BUILD_NUMBER}/allure/"
        self.header = {'Content-Type': 'application/json'}

    def send_msg(self, result=''):
        """发送企业微信消息通知"""
        global payload
        linux_content = f"""** 【{ProjectName}】**
> 项目名称:{ProjectName}
> 构件编号:#{BUILD_NUMBER}
> 测试环境:{platform.system()}
> [报告链接]({self.allure_url})
> [控制台链接]({BUILD_URL})
{result}"""

        windows_content = f"""** 【auto_test_project】**
> 测试环境:{platform.system()}
{result}"""
        if platform.system() == "Linux":
            payload = {
                "msgtype": "markdown",
                "markdown": {
                    "content": linux_content
                }
            }
        elif platform.system() == "Windows":
            payload = {
                "msgtype": "markdown",
                "markdown": {
                    "content": windows_content
                }
            }
        for hook_url in self.hook_url_list:
            requests.post(url=hook_url, headers=self.header, data=json.dumps(payload))

注意事项: get_env_from_jenkins方法为从Jenkins获取全局变量,查看全局变量的路径为:Jenkins流水线语法-全局变量-env,见下图:

2.pytest的hook方法,定义在conftest.py中

  • pytest_runtest_makereport:用于获取pytest执行后的测试结果、将结果写入缓存、生成控制台测试报告
  • pytest_terminal_summary:用于获取执行结果、调用发送消息方法发送消息通知到企业微信
代码语言:javascript复制
import time
import pytest
from api_test.config.config import HookUrlConf
from api_test.common.send_enterprise_wechat import EnterpriseWechatNotification
from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig
from api_test.common.case_count_control import CaseCountName, CaseCount

redis = RedisHandler(host=DBConfig.redis_config.get('host'))
case_count = CaseCount()
case_count.init_process()  # 初始化Redis中的用例统计缓存数据


@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item, call):
    """获取测试结果、生成测试报告"""
    print('------------------------------------')
    out = yield
    report = out.get_result()
    if report.when == 'call':
        # print(f"测试报告:{report}")
        # print(f"步骤:{report.when}")
        print(f"用例id:{report.nodeid}")
        print(f"用例描述:{str(item.function.__doc__)}")
        print(f"运行结果:{report.outcome}")
        """将用例执行结果写入缓存"""
        if report.outcome == 'passed':
            redis.incr(CaseCountName.PASSED)
        if report.outcome == 'failed':
            redis.incr(CaseCountName.FAILED)

    if report.when == 'setup':
        if report.outcome == 'skipped':
            redis.incr(CaseCountName.SKIP)

    CaseCount().total_run_count()


def pytest_terminal_summary(terminalreporter, exitstatus, config):
    """收集测试结果,从Redis缓存数据中获取"""
    total_case = case_count.total_count()
    pass_case = case_count.passed_count()
    fail_case = case_count.failed_count()
    skip_case = case_count.skip_count()
    error_case = case_count.error_count()
    pass_rate = case_count.pass_rate()
    run_time = round((time.time() - terminalreporter._sessionstarttime), 2)
    print("******用例执行结果统计******")
    print(f"总用例数:{total_case}条")
    print(f"通过:{pass_case}条")
    print(f"失败:{fail_case}条")
    print(f"跳过:{skip_case}条")
    print(f"报错:{error_case}条")
    print(f"用例通过率:{pass_rate}%")
    print(f"用时:{run_time}s")
    desc = """
        本次执行情况如下:
        总用例数为:{}
        通过用例数:<font color="info">{}条</font>
        失败用例数:<font color="warning">{}条</font>
        错误用例数:{}
        跳过用例数:{}
        通过率为:{} %
        用时:{}s
        """.format(total_case, pass_case, fail_case, error_case, skip_case, pass_rate, run_time)
    EnterpriseWechatNotification(hook=HookUrlConf.HOOK_URL.value).send_msg(desc)  # 执行结果发送企业微信

上述两个hook函数中的print都是为了将执行结果打印在控制台

三、运行过程与运行效果

1.运行过程

  • Windows本地运行
  • Jenkins触发运行

2.企业微信消息通知

  • 通过Jenkins触发运行的通知效果:↓
  • Windows本地手动触发运行的通知效果:↓

小结

以上就是利用pytest的hook函数:pytest_runtest_makereport、pytest_terminal_summary‍ redis,实现自动收集测试结果并发送消息通知到企业微信的原理及过程:

  • 不管是接口自动化测试还是UI自动化测试都可以通过这种方式来实现消息通知;
  • 除了在代码中调用pytest hook函数实现消息通知外,Jenkins也可以通过安装插件达到邮件通知、执行Python脚本达到企微消息通知的目的;
  • 测试结果的存储不一定要用到redis,也可以写在本地文件等,多一层调用,就多一层处理和可能面临的调试报错,另外redis所在服务器连接出错也会影响用例的正常运行;
  • 发送消息的内容样式支持Markdown,发送内容还可以继续优化,比如:通知哪条用例报错等等;

0 人点赞