这篇文章摘自我的博客, 欢迎大家没事去逛逛~
背景
这几个月我开发了公司里的一个restful webservice,起初技术选型的时候是采用了flask
框架。虽然flask是一个同步的框架,但是可以配合gevent
或者其它方式运行在异步的容器中(测试链接),效果看上去也还可以,因此就采用了这种方式。
后面阅读了tornado
的源码,也去了解了各种协程框架以及运行的原理。总感觉flask的这种同步方式编程不够好,同时对于这种运行在容器里的模式目前还缺乏了解。但至少现在对于tornado的运行原理有了一定的了解,如果用tornado写的话,是很可控的,而且可以保证运行是高效的。因此就决定把原来基于flask的项目用tornado重构了。
重构的过程
项目重构的过程中遇到了一些问题,也学习了一些东西,这里做一个简单的总结。
接入层
所有框架都要处理的一个接入层的事情就是:
- url-mapping
- 项目初始化
- 参数解析
对于restful风格的接口以及项目的初始化,每个框架都有自己的方式,在它们的文档中都演示得特别清楚,所以关于这些我就不展开了。
关于参数解析,这里并不是指简单地调用类似于get_argument
这样的方法去获取数据。而是 如何从不可靠的client端传来的数据中过滤掉服务器不关注的数据,同时对服务器关注的数据作一些更强的校验,这就是协议层的事情了。
使用谷歌的ProtocolBuffer
是一个不错的方案,它有很不错的数据压缩率,也支持目前大多数主流的开发语言。但在一些小型项目中,我还是更偏向于使用json
的方式,它显得更加灵活。但是对于json的话,如何作数据校验就是另外一个问题了。
在重构前,我是通过python中的装饰器
来实现的这个功能:
class SomeHandlerInFlask(Resource):
@util.deco({
'key_x': (str, 'form'),
'key_y': (int, 'form'),
'key_z': (str, 'url')
})
def post(self):
# logic code
pass
在装饰器中分别从不同的地方,form或者url中获取相应的参数。如果取不到,则直接报错,逻辑也不会进入到post函数中。
这是我基于flask这个框架自己总结出来的一套尚且还能看能用的参数解析方式,如果在每个函数中通过框架提供的get_argument
来逐一获取参数,则显得太丑,而且每个接口所需要的数据是什么也不够直观。不过这种方式我自己还不是特别满意,总感觉还是有点不太舒服,也说不清不舒服在哪里。那就干脆放弃它,使用别的方式吧。
后来我了解到了jsonschema这个东西,看了一下感觉与ProtocolBuffer很相似,只不过它是采用json的格式定义,正合我意(对于它我也有点吐槽,在数据库层有提到),每次数据进来就对数据和schema作一次validate操作,再进入业务逻辑层。
业务逻辑层
业务逻辑层的重构其实改动的代码并不多,把一些同步的操作改成异步的操作。就拿如何重构某个接口来说吧,重构前的代码可能是这样的:
代码语言:javascript复制def function_before_refactor(some_params):
result_1 = sync_call_1(some_params)
result_2 = sync_call_2(some_params)
# some other processes
return result
使用gen.coroutine
重构后:
from tornado import gen
@gen.coroutine
def function_after_refactor(some_params):
# if you don't want to refactor
# just call it as it always be
result_1 = sync_call_1(some_params)
result_2 = yield async_call_2(some_params)
# some other processes
raise gen.Return(result)
# python3及以上的版本不需要采用抛出异常的方式,直接return就可以了
# return result
考虑到函数名根本不用改,重构的过程非常容易:
- 函数用
gen.coroutine
包装成协程 - 已经重构成异步方式的函数调用时添加
yield
关键字即可 - 函数返回采用
raise gen.Return(result)
的方式(仅限于Python 2.7)
因为我目前采用的是python 2.7,所以在处理返回的时候要用抛出异常的方式,在这种方式下有一个点需要注意到,那就是与平常异常的处理的混用,不然会导致逻辑流执行混乱:
代码语言:javascript复制from tornado import gen
@gen.coroutine
def function_after_refactor(some_params):
try:
# some logic code
pass
except Exception as e:
if isinstance(e, gen.Return):
# return the value raised by logic
raise gen.Return(e.value)
# more exception process
数据库层
数据库采用的是mongodb
,在flask框架中采用了mongoengine作为数据库层的orm,对于这个python-mongodb的orm产品,我个人并不是很喜欢(可能是因为我习惯了mongoose的工作方式),这里面嵌套json的定义居然不能体现在schema中,需要分开定义两个schema,然后再作引入的操作。比如(代码只是用作演示,与项目无关):
class Comment(EmbeddedDocument):
content = StringField()
# more comment details
class Page(Document):
comments = ListField(EmbeddedDocumentField(Comment))
# more page details
而在mongoose中就直观多了:
代码语言:javascript复制var PageSchema = new Schema({
title : {type : String, required : true},
time : {type : Date, default : Date.now(), required : true},
comments : [{
content : {type : String}
// more comment details
}]
// more page details
});
扯远了,在tornado的框架中,再使用mongoengine就不合适了,毕竟有着异步和同步的区别。那有什么比较好的python-mongodb的异步orm框架呢?搜了下,有一个叫做motorengine的东西,orm的使用方式和mongoengine基本一样,但看它的star数实在不敢用呀。而且它处理异步的方式是使用回调,现在都是使用协程的年代了,想想还是算了吧。
最后找了个motor,感觉还不错,它有对目前大部分主流协程框架的支持,操作mongodb的方式与直接使用pymongo的方式差不多(毕竟都是基于pymongo的封装嘛),但是就是没有orm的验证层,那就自己再去另外搞一个简化的orm层吧。(mongokit的orm方式看上去还不错,但貌似对协程框架的支持一般)。这里暂时先懒惰一下,还是采用了jsonschema。每次保存前都validate一下对象是否符合schema的定义。如果没有类mongoose的python-mongodb异步框架,有时间就自己写一个吧~
这里顺带吐槽一下jsonschema,简直太琐碎了,一个很短的文档结构定义,它会描述成好几十行,我就不贴代码了,有兴趣的朋友可以戳这里http://jsonschema.net/玩玩。而且python中的jsonschema库还不支持对于default关键字的操作,参见这个issue。
测试
自己摸索的一种接口测试方案
python中的测试框架有很多,只要选择一个合适的能够很方便与项目集成就好。我个人还是很喜欢unittest
这个框架,小而精。我的这套测试方案也是基于unittest框架的。
# TestUserPostAccessComponents.py
class TestUserPostAccessComponents(unittest.TestCase):
@classmethod
def setUpClass(cls):
# 定义在其它地方,具体细节就不展示了
# 在setup中使用测试账号获取登陆态
# 并把各种中间用得到的信息放在TestUserPostAccess类上
setup(cls)
@classmethod
def tearDownClass(cls):
pass
def setUp(self):
pass
def tearDown(self):
pass
def test_1_user_1_user_2_add_friend(self):
pass
def test_2_user_1_user_2_del_friend(self):
pass
def test_3_user_1_add_public_user_post(self):
pass
# more other components
最顶层的测试文件:
代码语言:javascript复制# run_test.py
# 各种import
def user_basic_post_access_test():
tests = ['test_3_user_1_add_public_user_post',
'test_5_user_2_as_a_stranger_can_access_public_user_post',
'test_4_user_1_del_public_user_post',
'test_6_user_1_add_private_user_post',
'test_8_user_2_as_a_stranger_can_not_access_private_user_post',
'test_9_user_1_self_can_access_private_user_post',
'test_7_user_1_del_private_user_post']
return unittest.TestSuite(map(TestUserPostAccessComponents, tests))
def other_process_test():
tests = [
# compose a process by components by yourself
]
return unittest.TestSuite(map(OtherTestCaseComponents, tests))
runner = unittest.TextTestRunner(verbosity=2)
runner.run(user_basic_post_access_test())
runner.run(other_process_test())
这套测试是基于 BDD (行为驱动)的测试方式,针对每一个逻辑模块,定义一个components类,把所有子操作都定义成单独的测试单元。这里面的测试单元可以是完全无序的,把逻辑有序化组织成测试用例的过程会在最外面通过TestSuit
的方式组织起来。这里可能会有一些异议,因为有些人在使用这个测试类的时候是把它作为一个测试用例来组织的,当然这些都是不同的使用方式。
这套测试方案中的每个component都是api级别的测试,并不是函数级别的测试(集成测试与单元测试),每个TestSuit都是完整的一个业务流程。这样的好处在于 测试和项目完全解耦。测试代码不用关心项目的代码是同步还是异步的。就算项目重构了,测试完全无感知,只要api没变,就可以继续工作。
当然以上都是理想的状态,因为在刚开始写这些测试的时候我还没有总结到这些点,导致了一些耦合性的存在。比如说测试代码中import了项目中的某个函数去获取一些数据,用于检查某个component的更新操作是否成功。在重构的过程中,该函数被重构成了协程。这样一来,在测试代码中就不能采用原来一样的方式去调用了,也就是说测试代码受到了框架同步与异步的影响,下一节我们就来谈谈同步与异步的测试,以及对于这种问题的解决方案。
异步测试&同步测试
在tornado中,也提供了一套测试的功能,具体在tornado.testing
这个模块,看它源码其实可以发现它也是基于unittest
的一层封装。
我心里一直有一个问题:unittest的执行流程是同步的,既然这样,它是怎么去测一个由gen.coroutine
包装的协程的呢,毕竟后者是异步的。
直到看了源码,恍然大悟,原来是io_loop.run_sync
这个函数的功劳,具体实现在gen_test
这个装饰器中,摘一部分源码(对于tornado源码不熟的同学可以先去看看tornado中的ioloop模块的实现,看完会对这个部分有更深刻的理解):
def gen_test(func=None, timeout=None):
if timeout is None:
timeout = get_async_test_timeout()
def wrap(f):
# Stack up several decorators to allow us to access the generator
# object itself. In the innermost wrapper, we capture the generator
# and save it in an attribute of self. Next, we run the wrapped
# function through @gen.coroutine. Finally, the coroutine is
# wrapped again to make it synchronous with run_sync.
#
# This is a good case study arguing for either some sort of
# extensibility in the gen decorators or cancellation support.
@functools.wraps(f)
def pre_coroutine(self, *args, **kwargs):
result = f(self, *args, **kwargs)
if isinstance(result, types.GeneratorType):
self._test_generator = result
else:
self._test_generator = None
return result
coro = gen.coroutine(pre_coroutine)
@functools.wraps(coro)
def post_coroutine(self, *args, **kwargs):
try:
return self.io_loop.run_sync(
functools.partial(coro, self, *args, **kwargs),
timeout=timeout)
except TimeoutError as e:
# run_sync raises an error with an unhelpful traceback.
# If we throw it back into the generator the stack trace
# will be replaced by the point where the test is stopped.
self._test_generator.throw(e)
# In case the test contains an overly broad except clause,
# we may get back here. In this case re-raise the original
# exception, which is better than nothing.
raise
return post_coroutine
if func is not None:
# Used like:
# @gen_test
# def f(self):
# pass
return wrap(func)
else:
# Used like @gen_test(timeout=10)
return wrap
在源码中,先把某个测试单元封装成一个协程,然后获取当前线程的ioloop对象,把协程抛给他去执行,直到执行完毕。这样就完美地实现了异步到同步的过渡,满足unittest测试框架的同步需求。
在具体的使用中只需要继承tornado提供的AsyncTestCase
类就行了,注意这里不是unittest.TestCase
。看了源码也可以发现,前者就是继承自后者的。
# This test uses coroutine style.
class MyTestCase(AsyncTestCase):
@tornado.testing.gen_test
def test_http_fetch(self):
client = AsyncHTTPClient(self.io_loop)
response = yield client.fetch("http://www.tornadoweb.org")
# Test contents of response
self.assertIn("FriendFeed", response.body)
回到上一节的问题,有了这种方式,就可以很容易地解决同步异步的问题了。如果测试用例中某一个函数已经被项目重构成了协程,只需要做以下三步:
- 把测试components的类改成继承自
AsyncTestCase
- 该测试单元使用
gen_test
装饰(其它测试单元可以不用加,只需要改涉及到协程的测试单元就行) - 调用协程的地方添加
yield
关键字
测试代码如何适应项目的重构
- 如果是api测试 测试中尽量不要调用任何项目中的代码,它只专注于测试接口是否按照预期在工作,具体里面是怎么样的不需要关心。这样的话整套测试是完全独立于项目而存在的,即使项目重构,也可以不用作任何修改,无缝对接。
- 如果是单元测试 参考上一节的方案。
总结
重构是一个不断优化和学习的过程,在这个过程中我踩了一些坑,也爬出了一些坑,希望可以把我的这些总结分享给大家。欢迎大家跟我交流。对于文中的一些方案,也欢迎大家拍砖,欢迎有更多的做法可以一起探讨学习。另外,对于这个项目的重构,文章里面可能还少了一些更加直观的性能测试,后面我会加上去,孝敬各位爷~