前言
通常,自动化测试用例在执行完成后,都会发送一个结果通知,以提醒测试人员或测试leader测试用例的执行结果。如有测试失败的情况,测试人员再去查看具体的测试报告,检查是哪个场景没有测试通过。当前较为流行的提醒方式有:
- 邮件
- 企业微信、钉钉等push消息
由于我们公司所使用的办公软件是企业微信,因此,在实现测试结果通知提醒的功能时,选用的是企业微信。当前较为流行的实现方式有两种形式:
- 企业微信应用通知:需要在企业微信中创建一个应用,再获取Secret
- 普通群消息推送:需要在群中添加一个群机器人(会自动生成webhook_url,以供后续接口调用)
由于方式一需要在企业微信中创建应用(需要管理员操作权限),总体实现起来较为繁琐,因此我选用的是第二种群机器人的实现方式。
一、实现原理及实现效果
1.外部链路流程
2.内部调用原理及过程
1)各模块&方法功能:
- RedisHandler基类:用于初始化redis连接、查询数据、写入数据
- CaseCount基类:用于初始化用例统计、获取成功&失败&跳过&报错的用例数以及计算用例通过率
- EnterpriseWechatNotification基类:用于定义发送企业微信消息的内容模板、定义调用hook_url(群机器人)发送消息方法
- hook方法pytest_runtest_makereport:用于获取pytest执行后的测试结果、将结果写入缓存、生成控制台测试报告
- hook方法pytest_terminal_summary:用于获取执行结果、调用发送消息方法发送微信消息
2)具体调用原理、流程:
① 前提:
- 已添加企业微信群机器人,并记住hook地址;
- python pytest已编写测试用例;
② pytest运行测试用例,RedisHandler连接redis ,pytest_runtest_makereport获取用例执行结果,并:
- 调用RedisHandler中的写入缓存方法,将结果写入缓存;
- 调用CaseCount中的计算用例通过率方法获取用例通过率;
- 将获取到的各条测试结果分输出到控制台进行展示:↓(Windows本地运行效果)
③ pytest_terminal_summary方法:
- 分别调用CaseCount中的获取通过、失败、跳过、报错的用例条数的方法(此方法调用RedisHandler中的get_key方法),获取到各个(通过、失败、跳过、报错)执行结果统计;
- 调用CaseCount中的计算用例通过率方法获取用例通过率;
- 调用EnterpriseWechatNotification中的发送企业微信消息方法,将获取到的各个(通过、失败、跳过、报错)执行结果的数量统计与EnterpriseWechatNotification中预定义的模板进行拼接,发送到企业微信;
- 将获取到的各个(通过、失败、跳过、报错)执行结果与用例通过率一起,输出到控制台展示:↓(Windows本地运行效果)
二、编码实现
1.各个基类
- RedisHandler基类:用于初始化redis连接、查询数据、写入数据
import redis
class RedisHandler:
def __init__(self, host, port=6379, db=0):
# 生成客户端连接,StrictRedis()默认使用连接池,不必再单独使用ConnectPool
self.client = redis.StrictRedis(host=host, port=port, db=db)
def set_string(self, name: str, value, ex=None, px=None, nx=False, xx=False) -> None:
"""
缓存中写入str(单个)
:param name: 缓存名称
:param value: 缓存值
:param ex: 过期时间(秒)
:param px: 过期时间(毫秒)
:param nx: 如果设置为True,则只有name不存在时,当前set操作才执行(新增)
:param xx: 如果设置为True,则只有name不存在时,当前set操作才执行(修改)
:return:
"""
self.client.set(name, value=value, ex=ex, px=px, nx=nx, xx=xx)
def incr(self, key):
"""
使用incr方法,处理并发问题
当key不存在时,会先初始为0,每次调用,则会 1
:param key:
:return:
"""
self.client.incr(key)
def get_key(self, name):
"""读取缓存"""
result = self.client.get(name)
return result
- CaseCount基类:用于初始化用例统计、获取成功&失败&跳过&报错的用例数以及计算用例通过率
from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig
class CaseCountName:
ERROR: str = "error_count"
FAILED: str = "failed_count"
PASSED: str = "passed_count"
SKIP: str = "skip_count"
TOTAL: str = "total_count"
class CaseCount:
"""
redis 缓存统计用例执行情况
"""
def __init__(self):
self.redis = RedisHandler(host=DBConfig.redis_config.get('host')) # redis主机地址可以写死在这里,也可以从配置类中获取
def init_process(self):
"""
初始化进度、总数、成功数、失败数
"""
self.redis.set_string(CaseCountName.TOTAL, 0)
self.redis.set_string(CaseCountName.SKIP, 0)
self.redis.set_string(CaseCountName.PASSED, 0)
self.redis.set_string(CaseCountName.FAILED, 0)
self.redis.set_string(CaseCountName.ERROR, 0)
def failed_count(self):
"""获取失败用例数"""
return int(self.redis.get_key(CaseCountName.FAILED))
def passed_count(self):
"""获取通过用例总数"""
return int(self.redis.get_key(CaseCountName.PASSED))
def skip_count(self):
"""获取跳过用例数"""
return int(self.redis.get_key(CaseCountName.SKIP))
def error_count(self):
"""报错用例数"""
return int(self.redis.get_key(CaseCountName.ERROR))
def total_count(self):
"""用例总数"""
return int(self.redis.get_key(CaseCountName.TOTAL))
def pass_rate(self):
"""计算用例成功率"""
try:
rate = round((self.passed_count() self.skip_count()) / self.total_count() * 100, 2)
return rate
except ZeroDivisionError:
raise Exception("执行失败,未检测到用例执行数量")
- EnterpriseWechatNotification基类:用于定义发送企业微信消息的内容模板、定义调用hook_url(群机器人)发送消息方法
import os
import json
import requests
import platform
def get_env_from_jenkins(name):
"""从Jenkins中获取全局环境变量"""
return os.getenv(name) and os.getenv(name).strip()
ProjectName = get_env_from_jenkins("JOB_NAME") # Jenkins构建项目名称
BUILD_URL = get_env_from_jenkins("BUILD_URL") # Jenkins构建项目URL
BUILD_NUMBER = get_env_from_jenkins("BUILD_NUMBER") # Jenkins构建编号
class EnterpriseWechatNotification:
def __init__(self, hook: list):
# 企业微信群机器人的hook地址,一个机器人就一个,多个就定义多个,可以写死,也可以写在配置类中
self.hook_url_list = [f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={i}" for i in hook]
# allure生成报告的地址,Jenkins执行时会用到,Windows暂未配置allure地址
self.allure_url = f"http://192.168.1.122:8088/jenkins/job/{ProjectName}/{BUILD_NUMBER}/allure/"
self.header = {'Content-Type': 'application/json'}
def send_msg(self, result=''):
"""发送企业微信消息通知"""
global payload
linux_content = f"""** 【{ProjectName}】**
> 项目名称:{ProjectName}
> 构件编号:#{BUILD_NUMBER}
> 测试环境:{platform.system()}
> [报告链接]({self.allure_url})
> [控制台链接]({BUILD_URL})
{result}"""
windows_content = f"""** 【auto_test_project】**
> 测试环境:{platform.system()}
{result}"""
if platform.system() == "Linux":
payload = {
"msgtype": "markdown",
"markdown": {
"content": linux_content
}
}
elif platform.system() == "Windows":
payload = {
"msgtype": "markdown",
"markdown": {
"content": windows_content
}
}
for hook_url in self.hook_url_list:
requests.post(url=hook_url, headers=self.header, data=json.dumps(payload))
注意事项: get_env_from_jenkins方法为从Jenkins获取全局变量,查看全局变量的路径为:Jenkins流水线语法-全局变量-env,见下图:
2.pytest的hook方法,定义在conftest.py中
- pytest_runtest_makereport:用于获取pytest执行后的测试结果、将结果写入缓存、生成控制台测试报告
- pytest_terminal_summary:用于获取执行结果、调用发送消息方法发送消息通知到企业微信
import time
import pytest
from api_test.config.config import HookUrlConf
from api_test.common.send_enterprise_wechat import EnterpriseWechatNotification
from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig
from api_test.common.case_count_control import CaseCountName, CaseCount
redis = RedisHandler(host=DBConfig.redis_config.get('host'))
case_count = CaseCount()
case_count.init_process() # 初始化Redis中的用例统计缓存数据
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item, call):
"""获取测试结果、生成测试报告"""
print('------------------------------------')
out = yield
report = out.get_result()
if report.when == 'call':
# print(f"测试报告:{report}")
# print(f"步骤:{report.when}")
print(f"用例id:{report.nodeid}")
print(f"用例描述:{str(item.function.__doc__)}")
print(f"运行结果:{report.outcome}")
"""将用例执行结果写入缓存"""
if report.outcome == 'passed':
redis.incr(CaseCountName.PASSED)
if report.outcome == 'failed':
redis.incr(CaseCountName.FAILED)
if report.when == 'setup':
if report.outcome == 'skipped':
redis.incr(CaseCountName.SKIP)
CaseCount().total_run_count()
def pytest_terminal_summary(terminalreporter, exitstatus, config):
"""收集测试结果,从Redis缓存数据中获取"""
total_case = case_count.total_count()
pass_case = case_count.passed_count()
fail_case = case_count.failed_count()
skip_case = case_count.skip_count()
error_case = case_count.error_count()
pass_rate = case_count.pass_rate()
run_time = round((time.time() - terminalreporter._sessionstarttime), 2)
print("******用例执行结果统计******")
print(f"总用例数:{total_case}条")
print(f"通过:{pass_case}条")
print(f"失败:{fail_case}条")
print(f"跳过:{skip_case}条")
print(f"报错:{error_case}条")
print(f"用例通过率:{pass_rate}%")
print(f"用时:{run_time}s")
desc = """
本次执行情况如下:
总用例数为:{}
通过用例数:<font color="info">{}条</font>
失败用例数:<font color="warning">{}条</font>
错误用例数:{}
跳过用例数:{}
通过率为:{} %
用时:{}s
""".format(total_case, pass_case, fail_case, error_case, skip_case, pass_rate, run_time)
EnterpriseWechatNotification(hook=HookUrlConf.HOOK_URL.value).send_msg(desc) # 执行结果发送企业微信
上述两个hook函数中的print都是为了将执行结果打印在控制台
三、运行过程与运行效果
1.运行过程
- Windows本地运行
- Jenkins触发运行
2.企业微信消息通知
- 通过Jenkins触发运行的通知效果:↓
- Windows本地手动触发运行的通知效果:↓
小结
以上就是利用pytest的hook函数:pytest_runtest_makereport、pytest_terminal_summary redis,实现自动收集测试结果并发送消息通知到企业微信的原理及过程:
- 不管是接口自动化测试还是UI自动化测试都可以通过这种方式来实现消息通知;
- 除了在代码中调用pytest hook函数实现消息通知外,Jenkins也可以通过安装插件达到邮件通知、执行Python脚本达到企微消息通知的目的;
- 测试结果的存储不一定要用到redis,也可以写在本地文件等,多一层调用,就多一层处理和可能面临的调试报错,另外redis所在服务器连接出错也会影响用例的正常运行;
- 发送消息的内容样式支持Markdown,发送内容还可以继续优化,比如:通知哪条用例报错等等;