用 uWSGI 来跑 asyncio

2020-01-03 13:10:08 浏览数 (1)

最近关注的有点杂,所以也挺久没更新博客了。这一篇主要讨论这些技术:WSGI、py3k、werkzeug、asyncio、uWSGI、nginx。

WSGI

先从最简单的开始说—— WSGI。根据定义,这是一种 web 服务器接口规范,源自 Python,后被其他语言借用而成为一个较为通用的接口。以 Python 为例,这个接口非常的简单:

代码语言:javascript复制
def application(environ, start_response):
    ...

也就是说,WSGI 接口接受两个参数,一个是包含了请求内容等信息的字典 environ,另外就是一个用以开启 HTTP 响应的 Python 函数对象;而接口返回的内容则是 HTTP 响应的内容。这样,你好世界版的样例程序就是这样的了:

代码语言:javascript复制
def application(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    yield "Hello world!n"

稍微复杂一点的例子就是 Django 了。通过调用 get_wsgi_application(),您可以得到一个 WSGI 接口函数的实现,通过它您可以访问整个 Django 站点。

WSGI 接口及其实现我们就介绍完了。那,谁来调用这个函数呢?WSGI 的调用者叫做 WSGI 容器。

WSGI 容器有很多不同的实现,但他们有着共同的特点:

  1. 监听一个 HTTP 端口,提供 web 服务
  2. 内部会把前端的 HTTP 请求包装成对 WSGI 接口的调用
  3. 可以通过设置来指定 WSGI 接口的实现

说白了,WSGI 容器就是 web 服务器,只不过能够调用指定的 Python 函数来提供服务罢了。比如说,加装了 mod_wsgi 模块的 Apache HTTP 服务器 就是一种 WSGI 容器,经过合理的配置,我们就可以在 Apache 的进程里用上述 Python 代码来提供你好世界的服务。除了 Apache,gevent、Tornado、gunicorn、cherrypy 这些 Python web 服务器也都是 WSGI 容器。

于是呢,我们就能从实践中看到接口设计的解耦合作用:人们经常说,gunicorn Django,其实就是 gunicorn 通过 WSGI 接口整合了 Django 的应用。这时,Django 负责业务逻辑渲染页面,gunicorn 负责解析 HTTP 协议然后调用 Django 的 WSGI 接口。

WSGIuWSGIuwsgi

uWSGI 是一种 WSGI 容器,但它并不直接提供基于 HTTP 协议的 web 服务,而是通过一种叫做 uwsgi 的协议来提供 web 服务——没错,只是大小写的区别,以至于它的作者也承认这是一个很糟糕的名字。所以通常情况下,如果您选择使用 uWSGI 来部署您的 WSGI 应用,您还需选择一款支持 uwsgi 协议的 web 服务器——比如 nginx。

就以 nginx 为例,您总共需要配置执行两个服务器程序:nginx 和 uWSGI。nginx 的配置很简单,通过 uwsgi_pass 指令将请求交给后端的 uwsgi 服务器来处理就好了:

代码语言:javascript复制
        location / {
            root html;
            uwsgi_pass unix:///tmp/uwsgi.sock;
            include uwsgi_params;
        }

而执行 uWSGI 也非常简单,告诉它应该监听的地址,以及应该调用的 WSGI 实现就好了:

代码语言:javascript复制
$ uwsgi -s /tmp/uwsgi.sock -w myweb.myapp

这里的 myweb.myapp 应该是一个可以 import 的 Python 模块,其中应该有一个叫做 application 的函数,这样 uWSGI 就可以调用这个函数来提供 uwsgi 服务了。

不同于其他的 WSGI 容器,uWSGI 是在独立的进程中运行的,不受 web 服务器的影响和限制,所以有较大空间可以灵活配置,比如说可以配置同步还是异步啦、多少个进程或线程啦等等,甚至可以选择主循环引擎、异步切换引擎——比如说 asyncio 的主循环引擎和基于 greenlet 的异步切换引擎。

uWSGI 和 asyncio

uWSGI 从 2.0.4 开始,实验性地支持 asyncio,也就是说,uWSGI 可以启动一个 asyncio 的主循环,然后在它里面(通过 call_later)来调用 WSGI 接口的 application 函数。这样,我们就可以实现异步……了么?

还不行。

因为 uWSGI 没有一个基于回调函数的设计,所以如果我们无法使 application 函数立即返回最终结果,而是返回一个 Future 对象的话,uWSGI 是拿它没有办法的,请求就永远无法得到响应了。

解决办法就是使用 uWSGI 提供的异步切换引擎,比如说 greenlet —— uWSGI 会在单独的微线程中来执行每一个 application 函数。我们先引用官方的一个例子来看看吧:

代码语言:javascript复制
import asyncio
import greenlet

def two_seconds_elapsed(me):
    print("Hello 2 seconds elapsed")
    # back to WSGI  callable
    me.switch()

def application(environ, start_response):
    start_response('200 OK', [('Content-Type','text/html')])
    myself = greenlet.getcurrent()
    asyncio.get_event_loop().call_later(2, two_seconds_elapsed, myself)
    # back to event loop
    myself.parent.switch()
    return [b"Hello World"]

小程序很简单,对于任何请求,我们希望在 2 秒钟的异步等待之后返回你好世界。

做法很简单,在启动了异步调用(call_later(2, ...))之后,当我们需要等待异步结果的时候,通过 greenlet 将执行权交出去,切换至主循环所在的微线程去,也就是 myself.parent.switch()。这样在这 2 秒钟的异步等待中,别的微线程就可以有机会被执行到。等到 2 秒钟过去了,主循环会去调用 two_seconds_elapsed(),通过 me.switch(),我们又回到了最初的 application() 调用中,myself.parent.switch() 的调用在这时返回结果,继而返回响应你好世界。

咦,greenlet 代表的不是隐式的异步切换么?怎么这里跟显式的 asyncio 混在了一起呢?为什么不直接用 asyncio 自己的异步切换方式——coroutine 呢?

因为 WSGI 的标准——我认为——不支持显式的异步切换,uWSGI 官方给出的解释是asyncio 的 coroutine 会把 WSGI 破坏的一塌糊涂……我都不想提了。虽然也有人提出了异步的 WSGI 设计,但终究无法成为主流。其实,这里使用 greenlet 并不影响我们实现一个纯粹的 asyncio 异步 WSGI 服务器,只要写一个简单的函数包装一下就好了。

Werkzeug

在桥接 uWSGI 之前呢,请允许我先介绍一下 Werkzeug。

Werkzeug 是一个工具集,专门用于在 Python 中搭建各种 WSGI 服务器。以 WSGI 的接口 application(environ, start_response) 为起点,Werkzeug 提供了请求/响应的封装、请求的分发、基本的 session 和 cookie 的使用等多种绿色实用功能,小巧方便,经久耐用,令人爱不释手,是搭建各种 web 框架的必备工具箱。用一个例子来说明:

代码语言:javascript复制
from werkzeug.wrappers import Request, Response

def application(environ, start_response):
    request = Request(environ)
    response = Response("Hello %s!" % request.args.get('name', 'World!'))
    return response(environ, start_response)

为了让我们的实例程序尽可能的简单,我们这里就只用 Werkzeug 来写一个简单的 web 服务器。基于前面这个简单到不能再简单的例子呢,我们增加一点功能——请求分发功能——总不能在一个函数里面处理所有的请求吧。

分发请求呢,就得有分发规则。我们的分发规则长这样:

代码语言:javascript复制
from werkzeug.routing import Map, Rule

url_map = Map([
    Rule('/', endpoint=IndexResource),
    Rule('/user', endpoint=UserResource),
])

就是所有到 / 的请求由 IndexResource 来处理,/user 则由 UserResource 来处理。比如说:

代码语言:javascript复制
from werkzeug.wrappers import Response

class IndexResource:
    def get(self, request):
        return Response("Hello world!")

为了实现这个功能,我们需要改一下我们的 application 函数了:

代码语言:javascript复制
def application(environ, start_response):
    urls = url_map.bind_to_environ(environ)
    try:
        endpoint, args = urls.match()
    except HTTPException as e:
        return e(environ, start_response)
    else:
        request = Request(environ)
        method = getattr(endpoint(), request.method.lower(), None)
        if not method:
            return NotFound('not implemented')(environ, start_response)

        resp = method(request)
        return resp(environ, start_response)

简单来说呢,就是从 url_map 里匹配一个 Resource 出来,建一个对象,然后调用其 get 或者 post 函数,然后将其返回的 Response 对象作为响应返回。

放在一起

最后,我们需要把所有的东西放在一起。目的呢,是为了能让 Resource 变成异步的,像这样:

代码语言:javascript复制
import asyncio
from werkzeug.wrappers import Response

class IndexResource:
    @asyncio.coroutine
    def get(self, request):
        yield from asyncio.sleep(1)
        return Response("Hello world!")

如果是这样的话,前面代码里面的这两句就要改了:

代码语言:javascript复制
        resp = method(request)
        return resp(environ, start_response)

因为 resp 对象可能是个 coroutine 对象,需要异步等待之后才能得到 Response 对象。因此,配合前面关于 greenlet 的经验,我们加上一个 if

代码语言:javascript复制
        resp = method(request)
        if isinstance(resp, asyncio.Future) or inspect.isgenerator(resp):
            myself = greenlet.getcurrent()
            future = asyncio.Future()
            asyncio.Task(_wrapper(myself, future, resp))
            myself.parent.switch()
            resp = future.result()
        return resp(environ, start_response)

这里用到的 _wrapper 函数定义如下:

代码语言:javascript复制
@asyncio.coroutine
def _wrapper(me, future, coro):
    try:
        resp = yield from coro
    except Exception as e:
        future.set_exception(e)
    else:
        future.set_result(resp)
    finally:
        me.switch()

大功告成!解释依时间顺序如下:

  1. resp 是个 coroutine?没问题,我们放在另一个 coroutine 里(_wrapper)把它 yield from 了就好了
  2. 结果怎么取回来?用 asyncio.Future 对象搞定!创建一个,备用
  3. _wrapper 必须得立即执行,所以用 asyncio.Task 包一下,跑起
  4. 异步切换!asyncio.Task 保证了主循环会尽快调用 _wrapper
  5. _wrapper 里,我们会把异步调用 Resource.getpost 的最终结果设置到 future 对象中
  6. 然后切换回原来的微线程
  7. 这时,我们就可以通过 future.result() 来得到最终结果——或者将底层的异常重新抛出

最后再把完整的 uwsgi.py 文件贴一下:

代码语言:javascript复制
import asyncio
import greenlet
import inspect
from werkzeug.exceptions import HTTPException
from werkzeug.exceptions import NotFound
from werkzeug.routing import Map, Rule
from werkzeug.wrappers import Request
# from ...... import IndexResource, UserResource


url_map = Map([
    Rule('/', endpoint=IndexResource),
    Rule('/user', endpoint=UserResource),
])


@asyncio.coroutine
def _wrapper(me, future, coro):
    try:
        resp = yield from coro
    except Exception as e:
        future.set_exception(e)
    else:
        future.set_result(resp)
    finally:
        me.switch()


def application(environ, start_response):
    urls = url_map.bind_to_environ(environ)
    try:
        endpoint, args = urls.match()
    except HTTPException as e:
        return e(environ, start_response)
    else:
        request = Request(environ)
        method = getattr(endpoint(), request.method.lower(), None)
        if not method:
            return NotFound('not implemented')(environ, start_response)

        # noinspection PyCallingNonCallable
        resp = method(request)
        if isinstance(resp, asyncio.Future) or inspect.isgenerator(resp):
            myself = greenlet.getcurrent()
            future = asyncio.Future()
            asyncio.Task(_wrapper(myself, future, resp))
            myself.parent.switch()
            resp = future.result()
        return resp(environ, start_response)

编译运行 uWSGI

哈哈,把最基本的一项留在了最后。默认的 uWSGI 貌似并不包含 asyncio 和 greenlet 的支持,所以我们得亲自编译一份。通常呢,使用 virtualenv 是一个好的习惯:

代码语言:javascript复制
$ source $PATH_TO_YOUR_VENV/bin/activate
(venv) $ pip install greenlet
....
(venv) $ CFLAGS="-I$PATH_TO_YOUR_VENV/include/python3.4m/" UWSGI_PROFILE="asyncio" pip install uwsgi
....
(venv) $ uwsgi --asyncio 512 -s /tmp/uwsgi.sock --greenlet -w your_package.uwsgi

其中呢,CFLAGS 指定 greenlet 的头文件目前是必须的(路径可能有出入),除非你的 greenlet 安装在系统库中。如果你用的是 Python 3.3,还需 pip install asyncio

最后一句 uwsgi 的调用,指定了 --asyncio 作为主循环引擎,开 512 个微线程(在一个操作系统线程里)来处理请求(所以最大并发量是 512),然后指定了 --greenlet 作为异步切换引擎。其它参数前面已经说过了,最后那个 uwsgi 就是前述 uwsgi.py 的模块全名。

关于 wsgi_input 的问题

WSGI 接口中提供了这么一个参数:environ['wsgi.input'],或者如果用 Werkzeug 就是 request.input_stream(通常被包装成 request.stream 来用),它是一个包含了请求主体内容的类文件对象。在前述的服务器组合中,这个对象自然是 uWSGI 服务器来提供了。那么在异步的环境中,它的 read() 函数会不会阻塞主线程呢?它又能不能跟 asyncio 实现完美的配合呢?

0 人点赞