Python廖雪峰实战web开发(Day5-编写web框架)

2021-01-04 10:06:25 浏览数 (1)

参考链接: Python编写的网站拦截器

因为复杂的Web应用程序,光靠一个WSGI(Web Server Gateway Interface)函数来处理还是太底层了,我们需要在WSGI之上再抽象出Web框架(比如Aiohttp、Django、Flask等),从而进一步简化Web开发。 

在day1编写web app骨架因为要实现协程,所以运用的是aiohttpweb框架。那么现在为何又要重新编写一个新的web框架呢,这是因为从使用者的角度来说,aiohttp相对比较底层,想要使用框架时编写更少的代码,就只能在aiohttp框架上封装一个更高级的框架。 

 Web框架的设计是完全从使用者出发,目的是让框架使用者编写尽可能少的代码。 

因此我们希望框架使用者可以摒弃复杂的步骤,这次新创建的框架想要达到的预期效果是:只需编写函数(不然就要创建async def handle_url_xxx(request): ...这样的一大推东西),透过新建的Web框架就可以实现相同的效果。同时,这样编写简单的函数而非引入request和web.Response还有一个额外的好处,就是可以单独测试,否则,需要模拟一个request才能测试。 因为是以aiohttp框架为基础,要达到上述预期的效果,也是需要符合aiohttp框架要求,因此就需要考虑如何在request对象中,提取使用者编写的函数中需要用到的参数信息,以及如何将函数的返回值转化web.response对象并返回。 

1. 编写URL处理函数 

1.1 aiohttp编写URL处理处理函数 

day1的URL处理函数比较简单,因为day1的的URL处理函数没有真正意义上使用到request参数,但总体上差不多。 使用aiohttp框架,编写一个URL处理函数大概需要几步: 第一步,添加协程装饰器 

async def handle_url_xxx(request):

       ...

第二步,对request参数进行操作,以获取相应的参数 

url_param = request.match_info['key']

query_params = parse_qs(request.query_string)

第三步,就是构造Response对象并返回 

text = render('template', data)

return web.Response(text.encode('utf-8'))

而新创建的web框架希望可以封装以上一些步骤,在使用时,更加方便快捷。 

1.2 新建web框架编写URL处理函数 

1.2.1 @get 和 @post 

 Http定义了与服务器交互的不同方法,最基本的方法有4种,分别是GET,POST,PUT,DELETE。URL全称是资源描述符,我们可以这样认为:一个URL地址,它用于描述一个网络上的资源,而HTTP中的GET,POST,PUT,DELETE就对应着对这个资源的查,改,增,删4个操作。 建议: 1、get方式的安全性较Post方式要差些,包含机密信息的话,建议用Post数据提交方式; 2、在做数据查询时,建议用Get方式;而在做数据添加、修改或删除时,建议用Post方式; 

把一个函数映射为一个URL处理函数,可以先构造一个装饰器,用来存储、附带URL信息,这里使用了偏函数。 

#这里运用偏函数,一并建立URL处理函数的装饰器,用来存储GET、POST和URL路径信息

import functools

def Handler_decorator(path,*,method):

    def decorator(func):

        @functools.wraps(func)#更正函数签名

        def wrapper(*args,**kw):

            return func(*args,**kw)

        wrapper.__route__ = path #存储路径信息,注意这里属性名叫route

        wrapper.__method__ = method #存储方法信息

        return wrapper

    return decorator

get = functools.partial(Handler_decorator,method = 'GET')

post = functools.partial(Handler_decorator,method = 'POST')

1.2.3 定义RequestHandler 

 参考网站:关于inspect 

使用者编写的URL处理函数不一定是一个coroutine,因此我们用RequestHandler()来封装一个URL处理函数。RequestHandler是一个类,创建的时候定义了__call__()方法,因此可以将其实例视为函数。RequestHandler目的就是从URL函数中分析其需要接收的参数,从request中获取必要的参数,调用URL函数。(要完全符合aiohttp框架的要求,就需要把结果转换为web.Response对象,而这步骤并不是像教程所说在这阶段实现,而是在后面创建middleware的工厂函数时实现。) 

import inspect,asyncio

from web_app.APIError import APIError

from aiohttp import web

from urllib import parse

#运用inspect模块,创建几个函数用以获取URL处理函数与request参数之间的关系

def get_required_kw_args(fn): #收集没有默认值的命名关键字参数

    args = []

    params = inspect.signature(fn).parameters #inspect模块是用来分析模块,函数

    for name, param in params.items():

        if str(param.kind) == 'KEYWORD_ONLY' and param.default == inspect.Parameter.empty:

            args.append(name)

    return tuple(args)

def get_named_kw_args(fn):  #获取命名关键字参数

    args = []

    params = inspect.signature(fn).parameters

    for name,param in params.items():

        if str(param.kind) == 'KEYWORD_ONLY':

            args.append(name)

    return tuple(args)

def has_named_kw_arg(fn): #判断有没有命名关键字参数

    params = inspect.signature(fn).parameters

    for name,param in params.items():

        if str(param.kind) == 'KEYWORD_ONLY':

            return True

def has_var_kw_arg(fn): #判断有没有关键字参数

    params = inspect.signature(fn).parameters

    for name,param in params.items():

        if str(param.kind) == 'VAR_KEYWORD':

            return True

def has_request_arg(fn): #判断是否含有名叫'request'参数,且该参数是否为最后一个参数

    params = inspect.signature(fn).parameters

    sig = inspect.signature(fn)

    found = False

    for name,param in params.items():

        if name == 'request':

            found = True

            continue #跳出当前循环,进入下一个循环

        if found and (str(param.kind) != 'VAR_POSITIONAL' and str(param.kind) != 'KEYWORD_ONLY' and str(param.kind != 'VAR_KEYWORD')):

            raise ValueError('request parameter must be the last named parameter in function: %s%s'%(fn.__name__,str(sig)))

    return found

#定义RequestHandler,正式向request参数获取URL处理函数所需的参数

class RequestHandler(object):

    def __init__(self,app,fn):#接受app参数

        self._app = app

        self._fn = fn

        self._required_kw_args = get_required_kw_args(fn)

        self._named_kw_args = get_named_kw_args(fn)

        self._has_named_kw_arg = has_named_kw_arg(fn)

        self._has_var_kw_arg = has_var_kw_arg(fn)

        self._has_request_arg = has_request_arg(fn)

    async def __call__(self,request): #__call__这里要构造协程

        kw = None

        if self._has_named_kw_arg or self._has_var_kw_arg:

            if request.method == 'POST': #判断客户端发来的方法是否为POST

                if not request.content_type: #查询有没提交数据的格式(EncType)

                    return web.HTTPBadRequest(text='Missing Content_Type.')#这里被廖大坑了,要有text

                ct = request.content_type.lower() #小写

                if ct.startswith('application/json'): #startswith

                    params = await request.json() #Read request body decoded as json.

                    if not isinstance(params,dict):

                        return web.HTTPBadRequest(text='JSON body must be object.')

                    kw = params

                elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):

                    params = await request.post() # reads POST parameters from request body.If method is not POST, PUT, PATCH, TRACE or DELETE or content_type is not empty or application/x-www-form-urlencoded or multipart/form-data returns empty multidict.

                    kw = dict(**params)

                else:

                    return web.HTTPBadRequest(text='Unsupported Content_Tpye: %s'%(request.content_type))

            if request.method == 'GET': 

                qs = request.query_string #The query string in the URL

                if qs:

                    kw = dict()

                    for k,v in parse.parse_qs(qs,True).items(): #Parse a query string given as a string argument.Data are returned as a dictionary. The dictionary keys are the unique query variable names and the values are lists of values for each name.

                        kw[k] = v[0]

        if kw is None:

            kw = dict(**request.match_info)

        else:

            if not self._has_var_kw_arg and self._named_kw_args: #当函数参数没有关键字参数时,移去request除命名关键字参数所有的参数信息

                copy = dict()

                for name in self._named_kw_args:

                    if name in kw:

                        copy[name] = kw[name]

                kw = copy

            for k,v in request.match_info.items(): #检查命名关键参数

                if k in kw:

                    logging.warning('Duplicate arg name in named arg and kw args: %s' % k)

                kw[k] = v

        if self._has_request_arg:

            kw['request'] = request

        if self._required_kw_args: #假如命名关键字参数(没有附加默认值),request没有提供相应的数值,报错

            for name in self._required_kw_args:

                if name not in kw:

                    return web.HTTPBadRequest(text='Missing argument: %s'%(name))

        logging.info('call with args: %s' % str(kw))

        try:

            r = await self._fn(**kw)

            return r

        except APIError as e: #APIError另外创建

            return dict(error=e.error, data=e.data, message=e.message)

在上述RequestHandler代码可以看出最后调用URL函数时,URL函数可能会返回一个名叫APIError的错误,那这个APIError又是什么来的呢,其实它的作用是用来返回诸如账号登录信息的错误,这会在day10编写用户注册API里面讲到,此时先按下面封装一些APIError吧: 

class APIError(Exception):

    '''

    基础的APIError,包含错误类型(必要),数据(可选),信息(可选)

    '''

    def __init__(self,error,data = '',message = ''):

        super(APIError,self).__init__(message)

        self.error = error

        self.data = data

        self.message = message

class APIValueError(APIError):

    '''

    Indicate the input value has error or invalid. The data specifies the error field of input form.

    表明输入数据有问题,data说明输入的错误字段

    '''

    def __init__(self,field,message = ''):

        super(APIValueError,self).__init__('Value: invalid',field,message)

class APIResourceNotfoundError(APIError):

    '''

    Indicate the resource was not found. The data specifies the resource name.

    表明找不到资源,data说明资源名字

    '''

    def __init__(self,field,message = ''):

        super(APIResourceNotFoundError,self).__init__('Value: Notfound',field,message)

class APIPermissionError(APIError):

    '''

    Indicate the api has no permission.

    接口没有权限

    '''

    def __init__(self,message = ''):

        super(APIPermissionError,self).__init__('Permission: forbidden','Permission',message)

2. 编写add_route函数以及add_static函数 

 参考网站: 

 关于_import_关于rfind关于add_static关于Jinja2 

由于新建的web框架时基于aiohttp框架,所以需要再编写一个add_route函数,用来注册一个URL处理函数,主要起验证函数是否有包含URL的响应方法与路径信息,以及将函数变为协程。 以下是代码: 

import inspect,asyncio

#编写一个add_route函数,用来注册一个URL处理函数

def add_route(app,fn):

    method = getattr(fn,'__method__',None)

    path = getattr(fn,'__route__',None)

    if method is None or path is None:

        return ValueError('@get or @post not defined in %s.'%str(fn))

    if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn): #判断是否为协程且生成器,不是使用isinstance

        fn = asyncio.coroutine(fn)

    logging.info('add route %s %s => %s(%s)'%(method,path,fn.__name__,','.join(inspect.signature(fn).parameters.keys())))

    app.router.add_route(method,path,RequestHandler(app,fn))#别忘了RequestHandler的参数有两个

通常add_route()注册会调用很多次,而为了框架使用者更加方便,可以编写了一个可以批量注册的函数,预期效果是:只需向这个函数提供要批量注册函数的文件路径,新编写的函数就会筛选,注册文件内所有符合注册条件的函数。 代码如下: 

#直接导入文件,批量注册一个URL处理函数

def add_routes(app,module_name):

    n = module_name.rfind('.')

    if n == -1:

        mod = __import__(module_name,globals(),locals())

    else:

        name = module_name[n 1:]

        mod = getattr(__import__(module_name[:n],globals(),locals(),[name],0),name)#第一个参数为文件路径参数,不能掺夹函数名,类名

    for attr in dir(mod):

        if attr.startswith('_'):

            continue

        fn = getattr(mod,attr)

        if callable(fn): 

            method = getattr(fn,'__method__',None) 

            path = getattr(fn,'__route__',None)

            if path and method: #这里要查询path以及method是否存在而不是等待add_route函数查询,因为那里错误就要报错了

                add_route(app,fn)

然后添加静态文件夹的路径: 

import os,logging

#添加静态文件夹的路径

def add_static(add):

    path = os.path.join(os.path.dirname(os.path.abspath(__file__)),'static')#输出当前文件夹中'static'的路径

    app.router.add_static('/static/',path)#prefix (str) – URL path prefix for handled static files

    logging.info('add static %s => %s'%('/static/',path))

添加完静态文件还需要初始化jinja2模板: 

from jinja2 import Environment, FileSystemLoader

from datetime import datetime

import json, time

import logging

#初始化jinja2,以便其他函数使用jinja2模板

def init_jinja2(app, **kw):

    logging.info('init jinja2...')

    options = dict(

        autoescape = kw.get('autoescape', True),

        block_start_string = kw.get('block_start_string', '{%'),

        block_end_string = kw.get('block_end_string', '%}'),

        variable_start_string = kw.get('variable_start_string', '{{'),

        variable_end_string = kw.get('variable_end_string', '}}'),

        auto_reload = kw.get('auto_reload', True)

    )

    path = kw.get('path', None)

    if path is None:

        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')

    logging.info('set jinja2 template path: %s' % path)

    env = Environment(loader=FileSystemLoader(path), **options)

    filters = kw.get('filters', None)

    if filters is not None:

        for name, f in filters.items():

            env.filters[name] = f

    app['__templating__'] = env

def datetime_filter(t):

    delta = int(time.time() - t)

    if delta < 60:

        return u'1分钟前'

    if delta < 3600:

        return u'%s分钟前' % (delta // 60)

    if delta < 86400:

        return u'%s小时前' % (delta // 3600)

    if delta < 604800:

        return u'%s天前' % (delta // 86400)

    dt = datetime.fromtimestamp(t)

    return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)

以上的datetime_filter函数实质是一个拦截器,具体作用在可以看Day8。 

3. 编写middleware 

 参考网站: 

 关于middleware关于response 

一轮过后,如何将函数返回值转化为web.response对象呢? 这里引入aiohttp框架的web.Application()中的middleware参数。 middleware是一种拦截器,一个URL在被某个函数处理前,可以经过一系列的middleware的处理。一个middleware可以改变URL的输入、输出,甚至可以决定不继续处理而直接返回。middleware的用处就在于把通用的功能从每个URL处理函数中拿出来,集中放到一个地方。 在我看来,middleware的感觉有点像装饰器,这与上面编写的RequestHandler有点类似。 有官方文档可以知道,当创建web.appliction的时候,可以设置middleware参数,而middleware的设置是通过创建一些middleware factory(协程函数)。这些middleware factory接受一个app实例,一个handler两个参数,并返回一个新的handler。 

例如,一个记录URL日志的logger可以作为middle factory简单定义如下: 

import logging

async def logger_factory(app,handler):#协程,两个参数

    async def logger_middleware(request):#协程,request作为参数

        logging.info('Request: %s %s'%(request.method,request.path))#日志

        return await handler(request)#返回

    return logger_middleware

接下来就编写转化得到response对象的middleware factory。 

from aiohttp import web

import logging

import json

#函数返回值转化为`web.response`对象

async def response_factory(app,handler):

    async def response_middleware(request):

        logging.info('Response handler...')

        r = await handler(request)

        if isinstance(r, web.StreamResponse):

            return r

        if isinstance(r, bytes):

            resp = web.Response(body=r)

            resp.content_type = 'application/octet-stream'

            return resp

        if isinstance(r,str):

            if r.startswith('redirect:'): #重定向

                return web.HTTPFound(r[9:]) #转入别的网站

            resp =  web.Response(body=r.encode('utf-8'))

            resp.content_type = 'text/html;charsest=utf-8'

            return resp

        if isinstance(r,dict):

            template = r.get('__template__')

            if template is None: #序列化JSON那章,传递数据

                resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8')) #https://docs.python.org/2/library/json.html#basic-usage

                return resp

            else: #jinja2模板

                resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8'))

                resp.content_type = 'text/html;charset=utf-8'

                return resp

        if isinstance(r, int) and r >= 100 and r < 600:

            return web.Response(r)

        if isinstance(r, tuple) and len(r) == 2:

            t, m = r

            if isinstance(t, int) and t >= 100 and t < 600:

                return web.Response(t, str(m))

        # default,错误

        resp = web.Response(body=str(r).encode('utf-8'))

        resp.content_type = 'text/plain;charset=utf-8'

        return resp

    return response_middleware

值得注意得是在参考廖老师的源代码时,意外的发现了一个名叫data_factory的函数,其中思维是我目前远远不能达到的,如果使用其作为middleware参数,那么定义RequestHandler时就不用那么麻烦咯,但不知道老师教程不使用的原因是什么,这里贴一位大神,使用data_factory作为middleware参数编写的关于frame,关于factory代码。 

4. 测试运行 

最后,当然就要测试一下看能不能跑得动了,一下是代码: 

import asyncio

from web_app.webframe import get,post

#编写用于测试的URL处理函数

@get('/')

async def handler_url_blog(request):

    body='<h1>Awesome</h1>'

    return body

@get('/greeting')

async def handler_url_greeting(*,name,request):

    body='<h1>Awesome: /greeting %s</h1>'%name

    return body

编写以上代码另存名为webframe_test_handler放在web_app文件夹上。再编写以下代码用于生成页面进行此时: 

from aiohttp import web

import asyncio

from web_app.webframe import add_routes,add_static

from web_app.middleware_factories import init_jinja2,datetime_filter,logger_factory,response_factory

import logging; logging.basicConfig(level=logging.INFO)

#编写web框架测试

async def init(loop):

    app = web.Application(loop=loop,middlewares=[logger_factory,response_factory])

    init_jinja2(app,filters=dict(datetime=datetime_filter),path = r"E:learningpythonweb_apptemplates")#初始化Jinja2,这里值得注意是设置文件路径的path参数

    add_routes(app,'web_app.webframe_test_handler')

    add_static(app)

    srv = await loop.create_server(app.make_handler(),'127.0.0.1',9000)

    logging.info('Server started at http://127.0.0.1:9000...')

    return srv

loop = asyncio.get_event_loop()

loop.run_until_complete(init(loop))

loop.run_forever()

最后访问一下http://127.0.0.1:9000网页,awesome~

0 人点赞