前言
我们上一次分享了FastAPI(八十六)实战开发《在线课程学习系统》-- 留言接口测试,这次我们看下查看留言接口测试。
正文
我们看下之前开发的接口的文章FastAPI(七十)实战开发《在线课程学习系统》接口开发-- 查看留言,我们去设计下我们的测试用例。
代码语言:javascript复制1.用户未登陆
2.用户登陆留言不存在
3.用户不能查看这个留言
4.查看留言
那么根据设计的测试用例的场景去设计对应的接口测试的代码
代码语言:javascript复制import unittest
from test.userlogin import student_parame,get_students_token
import requests
class TestViesMessageCase(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.url = 'http://127.0.0.1:8000/user/viewmessage'
cls.token = get_students_token(student_parame)
@classmethod
def tearDownClass(cls) -> None:
cls.url=''
def setUp(self) -> None:
pass
def tearDown(self) -> None:
self.text=''
def testuser_not_login(self):
reponse=requests.get(self.url)
status=reponse.status_code
reslut=reponse.json()
self.assertEqual(status, 200)
self.assertEqual(reslut['code'], 421)
def testuser_login_id_not(self):
headers = {
"token": self.token
}
data={'id':10000}
reponse=requests.get(self.url,params=data,headers=headers)
status=reponse.status_code
reslut=reponse.json()
print(reslut)
self.assertEqual(status, 200)
self.assertEqual(reslut['code'], 100601)
self.assertEqual(reslut['message'], '留言不存在')
def testuser_login_id_per(self):
headers = {
"token": self.token
}
data={'id':7}
reponse=requests.get(self.url,params=data,headers=headers)
status=reponse.status_code
reslut=reponse.json()
self.assertEqual(status, 200)
self.assertEqual(reslut['code'], 100602)
self.assertEqual(reslut['message'], '权限不足')
def testuser_login_id_success(self):
headers = {
"token": self.token
}
data={'id':6}
reponse=requests.get(self.url,params=data,headers=headers)
status=reponse.status_code
reslut=reponse.json()
self.assertEqual(status, 200)
self.assertEqual(reslut['code'], 200)
self.assertEqual(reslut['message'], '成功')
if __name__ == '__main__':
unittest.main()
我们完成了上面的4个case,我们执行测试,测试完毕。接口没有发现新的问题。对于数据的id,我们可以参考之前的留言的id的获取。只是对应的sql不一样。 那么我们看下如何封装这样一个sql操作的
代码语言:javascript复制class opearMysql(object):
def __init__(self,host, port, user, password, database):
self.conne=connect(host=host, port=port, user=user,
password=password, db=database)
def run(self,sqlmy:str):
with self.conne.cursor() as conn:
conn.execute(sqlmy)
result = conn.fetchall()
return result
我们需要对现在的代码进行修改。
代码语言:javascript复制import unittest
from test.userlogin import student_parame,get_students_token
import requests
from test.pyopearmysql import OpearMysql
class TestViesMessageCase(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.url = 'http://127.0.0.1:8000/user/viewmessage'
cls.token = get_students_token(student_parame)
cls.connct=OpearMysql("127.0.0.1",3306,'root','','testdb')
@classmethod
def tearDownClass(cls) -> None:
cls.url=''
def setUp(self) -> None:
pass
def tearDown(self) -> None:
self.text=''
def testuser_not_login(self):
reponse=requests.get(self.url)
status=reponse.status_code
reslut=reponse.json()
self.assertEqual(status, 200)
self.assertEqual(reslut['code'], 421)
def testuser_login_id_not(self):
headers = {
"token": self.token
}
selectid = self.connct.run(
"SELECT messages.id FROM messages WHERE senduser in (SELECT id FROM users WHERE username !='liwanle1i' )")
data = {'id': int(selectid[-1][0]) 1}
reponse=requests.get(self.url,params=data,headers=headers)
status=reponse.status_code
reslut=reponse.json()
self.assertEqual(status, 200)
self.assertEqual(reslut['code'], 100601)
self.assertEqual(reslut['message'], '留言不存在')
def testuser_login_id_per(self):
headers = {
"token": self.token
}
selectid=self.connct.run("SELECT messages.id FROM messages WHERE senduser in (SELECT id FROM users WHERE username !='liwanle1i' )")
data={'id':selectid[0][0]}
reponse=requests.get(self.url,params=data,headers=headers)
status=reponse.status_code
reslut=reponse.json()
self.assertEqual(status, 200)
self.assertEqual(reslut['code'], 100602)
self.assertEqual(reslut['message'], '权限不足')
def testuser_login_id_success(self):
headers = {
"token": self.token
}
selectid = self.connct.run(
"SELECT messages.id FROM messages WHERE senduser in (SELECT id FROM users WHERE username ='liwanle1i' and status=0 )")
id=(selectid[0][0])
data={'id':id}
reponse=requests.get(self.url,params=data,headers=headers)
status=reponse.status_code
reslut=reponse.json()
self.assertEqual(status, 200)
self.assertEqual(reslut['code'], 200)
self.assertEqual(reslut['message'], '成功')
if __name__ == '__main__':
unittest.main()
这样,我们修改后,数据都是来自于数据库。但是这个时候,我们就发现了接口存在的问题。我们对接口进行了修改。
一个问题是addtime必须是一个str
一个问题是 查询用户出错。
代码语言:javascript复制@usersRouter.get(path="/viewmessage")
async def viewmessage(id: int,
user: UsernameRole = Depends(get_cure_user),
db: Session = Depends(get_db)):
useris=get_user_username(db,user.username)
message = get_message(db, id)
if not message:
return reponse(code=100601, message='留言不存在', data='')
if message.acceptusers != useris.id and message.senduser != useris.id:
return reponse(code=100602, message='权限不足', data='')
message.read = True
db.commit()
db.refresh(message)
all_pid = get_pid_message(db, message.id)
messageone = MessageOne(id=message.id,
senduser=get_user(db,message.senduser).username,
acceptusers=get_user(db, message.acceptusers).username,
read=message.read,
sendtime=message.sendtime,
addtime=str(message.addtime),
context=message.context)
if len(all_pid) > 0:
allpidlist = []
for item in all_pid:
message = MessagePid(id=message.id,
senduser=get_user(db,item.senduser).username,
acceptusers=get_user(db,item.acceptusers).username,
read=item.read,
sendtime=item.sendtime,
addtime=str(item.addtime),
context=item.context, pid=item.pid)
allpidlist.append(message)
messageone.pid = allpidlist
return reponse(code=200, message='成功', data=jsonable_encoder(messageone))
那么这样我们就完成了查看留言接口测试,以及发现接口开发过程中发现的问题。