在Python之并发请求(上)中详细了介绍了使用多线程的方式来编写一个测试服务端程序的高并发请求的性能测试工具。在这个测试的工具中,依据高并发的请求之后,我们得到很全面的响应时间,吞吐量,错误率,以及其他的相关信息。
在性能的测试中,更多的是CPU密集型和IO密集型的,基本很多服务端的程序都是基于IO密集型的,那么这样使用多线程的方式它的效率会更高。我们对前面的代码进行改造,既然我们已经可以很轻松的拿到了我们性能测试的结果信息数据,那么我们是否基于结合Flask-Restful的框架,把它提供成一个API,这样别人直接调用我们的API就可以测试被测试的服务端的程序了,这样的方式更加高效和简单。当然,这只是一种思想,在服务端的测试中,我们需要更多考虑的是服务在高并发下以及连续请求的前提下,是否会出现OOM以及SockedTimeOut以及TimeOut等其他的程序信息,如MQ的消息积压,服务的崩溃以及其他的异常情况。本文章这里就不详细的讨论服务的监控,关于saas化下服务的监控可以参考我的文章saas化架构中服务的监控实现思路和具体的代码。
下面我们对具体的被测试的API进行封装,这里就以测试淘宝首页为案例,我们的目的是我们在PostMan的测试工具中,我只需要输入并发数以及被测试的地址(这里是淘宝),点击发送请求后,就可以得到响应时间等其他的性能测试数据,具体源码如下:
代码语言:javascript复制#!/usr/bin/env python
#!coding:utf-8
from flask import Flask,make_response,jsonify,abort,request
from flask_restful import Api,Resource
from flask import Flask
import requests
import time
import matplotlib.pyplot as plt
from threading import Thread
import datetime
import numpy as np
import json
import re
import hashlib
from urllib import parse
import datetime
app=Flask(__name__)
api=Api(app=app)
def getHeaders(url=None,caller='ucenter-interface-service',secret='6A3012039B5746CEA350B119535C45E0'):
'''
:param pathurl: 请求地址
:param caller: 请求头字段caller
:param secret: 请求头字段secret
:return:对请求地址进行拆分后,然后拼接进行md5的加密,属于请求头
'''
result1 = re.compile(r'/([^/]*)/(vd |d.d)/(. )')
result2 = result1.search(url)
contextPath = result2[1]
version = result2[2]
requestPath = result2[3]
if requestPath.endswith('/'):
requestPath = requestPath[:-1]
tempArray = requestPath.split("/")
requestPath = ""
i = 0
for str in tempArray:
# print(i)
if tempArray[i] != "":
# print(parse.quote(tempArray[i]))
# print(tempArray[i])
requestPath = requestPath "/" parse.quote(tempArray[i])
i = i 1
nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
singStr= secret "callerService" caller "contextPath" contextPath "requestPath" requestPath "timestamp" nowTime "v" version secret
sign=hashlib.md5(singStr.encode(encoding='utf-8')).hexdigest().upper()
dict1={
'X-Caller-Service':'ucenter-interface-service',
'X-Caller-Timestamp':nowTime,
'appSecret':'kOsqkHX2QcB7KVzY03NmqbafIjSnCneG',
'Content-Type':'application/json',
'X-Caller-Sign':sign,}
return dict1
class OlapThread(Thread):
def __init__(self,func,args=()):
'''
:param func: 被测试的函数
:param args: 被测试的函数的返回值
'''
super(OlapThread,self).__init__()
self.func=func
self.args=args
def run(self) -> None:
self.result=self.func(*self.args)
def getResult(self):
try:
return self.result
except BaseException as e:
return e.args[0]
def taoBao(code,seconds,text,requestUrl):
'''
高并发亲求淘宝
:param code:
:param seconds:
:param text:
:param requestUrl: 请求地址
:return:
'''
r=requests.get(url=requestUrl)
print('输出信息昨状态码:{0},响应结果:{1}'.format(r.status_code,r.text))
code=r.status_code
seconds=r.elapsed.total_seconds()
text=r.text
return code,seconds,text
def calculationTime(startTime,endTime):
'''计算两个时间之差,单位是秒'''
return (endTime-startTime).seconds
def getResult(seconds):
'''获取服务端的响应时间信息'''
data={
'Max':sorted(seconds)[-1],
'Min':sorted(seconds)[0],
'Median':np.median(seconds),
'99%Line':np.percentile(seconds,99),
'95%Line':np.percentile(seconds,95),
'90%Line':np.percentile(seconds,90)
}
return data
def show(i,j):
'''
:param i: 请求总数
:param j: 请求响应时间列表
:return:
'''
fig,ax=plt.subplots()
ax.plot(list_count,seconds)
ax.set(xlabel='number of times', ylabel='Request time-consuming',
title='olap continuous request response time (seconds)')
ax.grid()
fig.savefig('olap.png')
plt.show()
def highConcurrent(count,requestUrl):
'''
对服务端发送高并发的请求
:param count: 并发数
:param requestData:请求参数
:param requestUrl: 请求地址
:return:
'''
startTime=datetime.datetime.now()
sum=0
list_count=list()
tasks=list()
results = list()
#失败的信息
fails=[]
#成功任务数
success=[]
codes = list()
seconds = list()
texts=[]
for i in range(0,count):
t=OlapThread(taoBao,args=(i,i,i,requestUrl))
tasks.append(t)
t.start()
print('测试中:{0}'.format(i))
for t in tasks:
t.join()
if t.getResult()[0]!=200:
fails.append(t.getResult())
results.append(t.getResult())
for item in fails:
print('请求失败的信息:n',item[2])
endTime=datetime.datetime.now()
for item in results:
codes.append(item[0])
seconds.append(item[1])
texts.append(item[2])
for i in range(len(codes)):
list_count.append(i)
#生成可视化的趋势图
fig,ax=plt.subplots()
ax.plot(list_count,seconds)
ax.set(xlabel='number of times', ylabel='Request time-consuming',
title='taobao continuous request response time (seconds)')
ax.grid()
fig.savefig('taobao.png')
plt.show()
for i in seconds:
sum =i
rate=sum/len(list_count)
# print('n总共持续时间:n',endTime-startTime)
totalTime=calculationTime(startTime=startTime,endTime=endTime)
if totalTime<1:
totalTime=1
#吞吐量的计算
try:
throughput=int(len(list_count)/totalTime)
except Exception as e:
print(e.args[0])
getResult(seconds=seconds)
errorRate=0
if len(fails)==0:
errorRate=0.00
else:
errorRate=len(fails)/len(tasks)*100
throughput=str(throughput) '/S'
timeData=getResult(seconds=seconds)
# print('总耗时时间:',(endTime-startTime))
timeConsuming=(endTime-startTime)
return timeConsuming,throughput,rate,timeData,errorRate,len(list_count),len(fails)
class Index(Resource):
def get(self):
return {'status':0,'msg':'ok','datas':[]}
def post(self):
if not request.json:
return jsonify({'status':1001,'msg':'请求参数不是JSON的数据,请检查,谢谢!'})
else:
try:
data={
'count':request.json.get('count'),
'requestUrl':request.json.get('requestUrl')
}
timeConsuming,throughput,rate,timeData,errorRate,sum,fails=highConcurrent(
count=data['count'],
requestUrl=data['requestUrl'])
print('执行总耗时:',timeConsuming)
return jsonify({'status':0,'msg': '请求成功','datas':[{
'吞吐量':throughput,
'平均响应时间':rate,
'响应时间信息':timeData,
'错误率':errorRate,
'请求总数':sum,
'失败数':fails
}]}, 200)
except Exception as e:
return e.args[0]
api.add_resource(Index,'/v1/index')
if __name__ == '__main__':
app.run(debug=True,port=5003,host='0.0.0.0')
# print(highConcurrent(count=5))
在PostMan中进行调用,如下图所示:
点击Sending后,在PostMan中显示的结果信息如下:
每次请求响应时间可视化的趋势图如下所示: