locust:Python 分布式压力测试(带WebUI)

2023-09-10 08:30:30 浏览数 (2)

Locust 介绍

它采用纯 Python 实现,是一个分布式用户负载测试的工具。 使用基于 Requests 库的客户端发起请求,使编写脚本大大简化; 在模拟并发方面摒弃进程和线程,完全基于时间驱动,采用协程(gevent)提供的非阻塞 IO 和 coroutine 来实现网络层的并发请求。因此单台压力机也能产生数千并发请求数。

Locust官网

特点:

  1. 基于 python 的 Requests 和 Gevent , 脚本编写简单易读
  2. c/s 架构有一个简洁的用户界面,实时显示相关测试结果
  3. 支持分布式测试, 能跨平台易于扩展

安装及使用

安装

支持Python版本: 3.6, 3.7, 3.8

代码语言:javascript复制
pip install -U locust==2.5.1
#pip install -U locust==2.5.1 -i https://pypi.tuna.tsinghua.edu.cn/simple

快速使用

  • locust 中通过 HttpUser 来表示一个用户,我们需要自定类继承自 HttpUser
  • HttpUser 的 client 属性可以模拟客户端请求,支持 get,post 等待方法,并且自动保持 session。(实际上他就是 requests 包的 session 对象)
    • client 的 get,post 等方法传递 url 地址时,不需要携带 http://www.example.com 这个前缀,只需要提供路径就行
  • wait_time 下一个任务执行之前等待的时间,用于模拟用户的思考时间,
    • wait_time = between(1, 5) 间隔是1-5秒随机值
    • wait_time = constant(1) 间隔固定1秒
  • host 表示服务器的地址,例如: http://www.baidu.com
  • 通过 @task 装饰一个方法,这个方法表示一个用户可能执行的动作,一个用户会有多种动作;
    • 一个用户下一步具体要执行什么动作,是随机的,也就是 locust 会从动作列表中随机选择一个
    • @task 接受一个参数,表示权重,数值越大,这个动作被选中的概率就越高
  • on_start 测试之前执行的操作
  • on_stop 测试结束执行的操作
代码语言:javascript复制
from locust import HttpUser, task, between

# 相当于模拟一个用户
class WebUser(HttpUser):
    # 服务器的地址
    host = 'http://localhost:8000'

    # 下一个任务执行之前等待的时间,用于模式用户的思考时间
    wait_time = between(3, 5)  # 这里使用随机 3,5 秒钟

    # 定义一个测试任务,类似于一个取样器
    @task(1)
    def info(self):
        # 获取用户信息
        self.client.get('/info/')

    @task(10)
    def addresses(self):
        # 获取用户地址
        self.client.get('/addresses/')

    def on_start(self):
        # 测试之前执行的操作
        print('用户登录')
        auth = {'username': 'root', 'password': 'root1234'}
        self.client.post('/login/', json=auth)

    def on_stop(self):
        # 测试结束执行的操作
        print('用户退出')
        self.client.delete('/logout/')

启动locust

  • 在当前目录下, 可以直接运行locust 他会自动找到名为 locustfile.py 的文件
  • 为了子目录下或者不同级别目录下, 可以使用-f 命令启动 locust -f xxx/locustfile.py
代码语言:javascript复制
# 直接运行
locust

# 指定文件
locust -f xxx/locustfile.py

然后打开浏览器, 直接在浏览器输入http://localhost:8089打开UI界面, 界面如下:

locust 无界面测试

locust -f locustfile.py --headless -u 100 -r 10 -t 10s

  • --headless 表示不启动界面
  • -u [数量] 设置虚拟用户数
  • -r [时间] 每秒钟启动虚拟用户数
  • -t [时间] 设置测试多长时间 ----秒,数字后面加上 s ,例如 10s ----分钟,数字后面加上 m,例如 1m ----小时,数字后面加上 h,例如 1h

基本示例

代码语言:javascript复制
from locust import User, events, task, between
import random


def success_call(name, recvText, total_time):
    events.request_success.fire(
        request_type="[Success]",
        name=name,
        response_time=total_time,
        response_length=len(recvText)
    )


def fail_call(name, total_time):
    events.request_failure.fire(
        request_type="[Fail]",
        name=name,
        response_time=total_time,
        response_length=0,
        exception=Exception(),
    )


class MyUser(User):
    wait_time = between(1, 5)

    @task(1)
    def job1(self):
        print('This is job 1')
        self.sendMsg("job1", 'This is job 2')

    @task(2)
    def job2(self):
        print('This is job 2')
        self.sendMsg("job2", 'This is job 2')

    def sendMsg(self, methodname, msg):
        num = random.randint(0, 2)
        time = random.randint(10, 50)
        if (num == 0):
            fail_call(methodname, time)
        else:
            success_call(methodname, msg, time);

执行

代码语言:javascript复制
locust -f test4.py --autostart --autoquit 0 -u 1 -r 3 --run-time 10s

其中

  • request_type请求方式,这里为了方便理解,直接使用的[Success][Fail],来区分成功的请求和失败的请求,只有成功或失败的回调被调用后,locust才会对其统计。

注意:

所有的task不能是异步方法。

结果:

locust 分布式

运行多个进程的Locust, 使用 --master 命令启动主进程, 使用 --worker 启动从属进程

主进程控制从进程并收集从进程的测试结果;从进程负责执行测试,并反馈结果给主进程。

启动无界面主进程

代码语言:javascript复制
locust -f locustfile.py --master --headless --master-bind-host=127.0.0.1 --master-bind-port=8090 -t 15s -u 5 -r 1
  • --master 指明当前为主进程
  • --master-bind-host=[ip 地址] 指明主进程绑定的地址
  • --master-bind-port=[端口号] 指明主进程的绑定的端口号

启动后,如果当前没有从进程的话会显示等待从进程。当从进程启动后便可以执行脚本了。

启动无界面的从进程:

代码语言:javascript复制
locust -f locustfile.py --worker --master-host=127.0.0.1 --master-port=8090 --headless
  • --worker 指明当前为从进程
  • --master-host=[ip 地址] 指明需要连接的主进程的 ip 地址
  • --master-port=[端口号] 指明需要连接的主进程的端口

从进程启动后将开始执行测试,主进程将会统计结果。

动作集合 TaskSet

对于 web 网站来说,通常情况下,网站的一个页面中用户能够执行多个动作,并且这些动作通常是为了某个特定的目标,比如注册页面,用户可以执行的动作: 验证用户名,手机号码是否重复,更新图形验证码,获取手机验证码等。这些动作的目标就是为了完成用户注册。我们可以把这些动作组合在一起。

在 locust 中我们通过 TaskSet 类来组合多个动作,然后在 HttpUser 对象中引入 taskset ,引入方式是在 tasks 属性列表中,添加 taskset 类。

taskset 也是可以嵌套的,我们在 taskset 的 tasks 属性中添加要嵌套的其他 taskset 类。类似于: 网站首页包含用户登录页面,用户登录页面包含用户注册页面这种关系

  • 注意: taskset 相当于一个特殊的动作,如果 locust 进入到 taskset 中执行其中的动作,那么 locust 会一直执行 taskset 中的动作,那么 WebUser 中定义的动作就不会被执行。 需要在 taskset 中调用 self.ingterrupt() 来退出,一般可以定义一个动作,在这个动作中调用 self.interrupt()
代码语言:javascript复制
import uuid

from locust import HttpUser, task, between, TaskSet

# 注册
class RegistTaskSet(TaskSet):
    # 用户名是否重复
    @task
    def username_count(self):
        username = 'zhangsan'
        self.client.get(f'/usernames/{username}/count/')

    # 手机号是否重复
    @task
    def mobile_count(self):
        mobile = '13712345678'
        self.client.get(f'/mobiles/{mobile}/count/')

    # 获取图形验证码
    @task
    def image_codes(self):
        image_code_id = str(uuid.uuid4())
        self.client.get(f'/image_codes/{image_code_id}/')

    @task
    def stop(self):
        print('退出当前动作集合')
        self.interrupt()

# 登录
class UserLoginTaskSet(TaskSet):
    # 嵌套用户注册
    tasks = [RegistTaskSet]

    @task
    def username_login(self):
        auth = {'username': 'root', 'password': 'root1234'}
        self.client.post('/login/', json=auth)

    @task
    def stop(self):
        print('退出当前动作集合')
        self.interrupt()

# 模拟用户
class WebUser(HttpUser):
    # 包含用户登录
    tasks = [UserLoginTaskSet]
    # 服务器的地址
    host = 'http://localhost:8000'

    # 下一个任务执行之前等待的时间,用于模式用户的思考时间
    wait_time = between(3, 5)  # 这里使用随机 3,5 秒钟

    @task
    def search(self):
        # 搜索
        self.client.get('/search/')

有序动作集合 SequentialTaskSet

我们在 HttpUser 以及 TaskSet 下定义动作后,locust 是从这些动作中随机选择一个动作来执行,有时候我们希望用户执行的动作是有序的,比如:

1.验证用户名是否重复 2.验证手机号码是否重复 3.验证获取图形验证码 4.获取手机验证码 5.注册 这时候就需要通过 SequentialTaskSet 来定义动作集合,他会按照定义 task 定义的顺序来一次调用。

  • 使用的方式和 TaskSet 一样,但是 task 装饰器的权重无效了
代码语言:javascript复制
from locust import HttpUser, task, between, SequentialTaskSet

class RegistSeqTaskSet(SequentialTaskSet):
    # 用户名是否重复
    @task
    def username_count(self):
        print('验证用户名')
        username = 'zhangsan'
        self.client.get(f'/usernames/{username}/count/')

    # 手机号是否重复
    @task
    def mobile_count(self):
        print('验证手机号')
        mobile = '13712345678'
        self.client.get(f'/mobiles/{mobile}/count/')

    # 获取图形验证码
    @task
    def image_codes(self):
        print('获取图形验证码')
        image_code_id = str(uuid.uuid4())
        self.client.get(f'/image_codes/{image_code_id}/')

# 模拟用户
class WebUser(HttpUser):
    # 包含用户登录
    tasks = [RegistSeqTaskSet]
    # 服务器的地址
    host = 'http://localhost:8000'

    # 下一个任务执行之前等待的时间,用于模式用户的思考时间
    wait_time = between(3, 5)  # 这里使用随机 3,5 秒钟

断言

我们需要判断一个请求是失败还是成功,需要给请求参数携带 catch_response=True ,这样请求方法就会返回一个上下文管理器

这个上下文管理器返回的是响应对象,我们通过调用响应对象的 failure(消息) 来标记本次请求失败,

通过 catch_response=True 参数来进行断言。

代码语言:javascript复制
from locust import HttpUser, task, between

class WebUser(HttpUser):
    wait_time = between(1, 5)
    host = 'http://localhost:8000'

    @task
    def info(self):
        with self.client.get('/info/', catch_response=True) as resp:
            res = resp.json()
            if res['code'] != 0:
                resp.failure(res['errmsg'])

WebSocket

压测WS前我们要先看看怎么连接WS

连接WS的库有的是支持异步IO的,项目中我们推荐这样的库,但是压测时还是要选择同步的库

异步

安装

代码语言:javascript复制
pip install websockets

代码示例

代码语言:javascript复制
import asyncio
import websockets
import json
import random

async def mytest():
    async with websockets.connect('wss://sockettest.xhkjedu.com/ws') as websocket:
        num = random.randint(0, 10000000)
        msg = {
            "b": {"num": num},
            "c": 123456,
        }

        msgstr = json.dumps(msg)

        await websocket.send(msgstr)
        print(f"↑: {msgstr}")

        greeting = await websocket.recv()
        print(f"↓: {greeting}")

asyncio.get_event_loop().run_until_complete(mytest())

同步

官网地址

https://pypi.org/project/websocket-client/

安装

代码语言:javascript复制
pip install websocket-client

示例

代码语言:javascript复制
from websocket import create_connection
import json
import random

ws = create_connection("wss://sockettest.xhkjedu.com/ws")

num = random.randint(0, 10000000)
msg = {
    "b": {"num": num},
    "c": 123456,
}

msgstr = json.dumps(msg)
print("Sending "   msgstr)
ws.send(msgstr)

result = ws.recv()
print("Received '%s'" % result)
ws.close()
代码语言:javascript复制
import websocket

def on_message(ws, message):
    print(ws)
    print(message)

def on_error(ws, error):
    print(ws)
    print(error)

def on_close(ws):
    print(ws)
    print("### closed ###")

websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://127.0.0.1:8888/track",
                            on_message=on_message,
                            on_error=on_error,
                            on_close=on_close)

ws.run_forever()

Websocket压测

Jmeter要测试websocket接口,需要先下载安装一个websocket samplers by peter doornbosch的插件

而locust因为是代码实现,所以可以进行任何的测试,引用相应的库即可。

代码语言:javascript复制
from locust import User, task, events
import time
from websocket import create_connection
import json
import random

def success_call(name, recvText, total_time):
    events.request_success.fire(
        request_type="[Success]",
        name=name,
        response_time=total_time,
        response_length=len(recvText)
    )

def fail_call(name, total_time, e):
    events.request_failure.fire(
        request_type="[Fail]",
        name=name,
        response_time=total_time,
        response_length=0,
        exception=e,
    )

class WebSocketClient(object):
    def __init__(self, host):
        self.host = host
        self.ws = None

    def connect(self, burl):
        self.ws = create_connection(burl)

    def recv(self):
        return self.ws.recv()

    def send(self, msg):
        self.ws.send(msg)

class WebsocketUser(User):
    abstract = True

    def __init__(self, *args, **kwargs):
        super(WebsocketUser, self).__init__(*args, **kwargs)
        self.client = WebSocketClient(self.host)
        self.client._locust_environment = self.environment

class ApiUser(WebsocketUser):
    host = "wss://sockettest.xhkjedu.com/"

    @task(1)
    def pft(self):
        # wss 地址
        self.url = 'wss://sockettest.xhkjedu.com/ws'
        print("连接前")
        start_time = time.time()
        try:
            self.client.connect(self.url)
            print("连接后")
            # 发送的订阅请求
            num = random.randint(0, 10000000)
            msg = {
                "b": {"num": num},
                "c": 123456,
            }

            msgstr = json.dumps(msg)

            self.client.send(msgstr)
            print(f"↑: {msgstr}")

            greeting = self.client.recv()
            print(f"↓: {greeting}")

        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            fail_call("Send", total_time, e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            success_call("Send", "success", total_time)

测试

代码语言:javascript复制
locust -f main.py -u 5000 -r 300

网页上访问

http://localhost:8089/

或者不用图形化界面

代码语言:javascript复制
locust -f main.py --autostart --autoquit 0 -u 1 -r 3 --run-time 10s

其中

  • --autostart 自动开始 不使用WebUI
  • --autoquit 0autostart搭配使用,测试完成后多长时间退出,后面的数字单位是秒,如果不设置只能CTRL C才能退出
  • -u 1 最大用户数
  • -r 3 每秒创建的用户数,创建用户数和最大用户数一样后就不再创建
  • --run-time 10s 压测的执行时间

0 人点赞