Rust之tower如何构建请求中间件

2024-02-26 09:46:49 浏览数 (3)

提到Rust请求中间件, 就不能不提tower

tower是一个请求协议无关的的中间件定义类库,主要定义了ServiceLayer两个trait来帮助实现可重用的请求处理中间件。

今天拿聊聊它如何巧妙构建起中间件。

初始请求

假设我们有一个请求handler, 用hyper官方的hello world例子代码如下:

代码语言:javascript复制
use http_body_util::Full;
use hyper::{
    body::{Bytes, Incoming},
    server::conn::http1,
    Request, Response,
};
use hyper_util::rt::TokioIo;
use std::{convert::Infallible, net::SocketAddr};
use tokio::net::TcpListener;

async fn handler(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error   Send   Sync>> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        tokio::spawn(async move {
            // 请求在这里转成了Service
            let svc = hyper::service::service_fn(handler);
            if let Err(err) = http1::Builder::new().serve_connection(io, svc).await {
                eprintln!("server error: {}", err);
            }
        });
    }
}

这里service_fnhandler转成了Service,也就是server启动时要的是一个实现了Service trait的请求处理函数,这是后边构建中间件的基础。

注意,在 hyper 发布 v1 之后,这里的Service准确说不是tower的Service trait,但理念是一样,我们后边在讲他们接口的不同

这时如果想在处理上边加上LoggerTimeout两个流程来分别记录请求日志和超时约束, 能很灵活的按如下方式组织

tower-service

代码语言:javascript复制
let svc = hyper::service::service_fn(handler);
// 增加两个layer中间件
let svc = ServiceBuilder::new()
    .layer_fn(Logger::new)
    .layer_fn(|s| Timeout::new(s, std::time::Duration::from_secs(5)))
    .service(svc);
// 先忽略下边为了接口转换,后边在展开这里
// let svc = TowerToHyperService::new(svc);

Service trait

这样处理,就像是一个service链,一个service处理完,再调用下一个service

所以tower定义了如下Service trait:

代码语言:javascript复制
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
    // 请求是否可以处理
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    // 处理逻辑
    fn call(&mut self, req: Request) -> Self::Future;
}

这里poll_ready是决定是否可以执行请求处理call前的判断。

call拿到请求,返回一个异步处理的结果,这样当请求执行耗时时不阻塞其他请求的处理。

说个题外话,你可能会好奇为什么这里要返回一个Future而不是用async

这是因为之前Rust不支持trait中定义异步函数。不过Rust 1.75开始支持了,如果后边换成下边的实现就不奇怪了

代码语言:javascript复制
trait Service<Request> {
    type Response;
    type Error;
    async fn call(&mut self, req: Request) -> Result<Self::Response, Self::Error>;
}

实现 middleware

Logger middleware实现来看如何构建起service

注释及代码如下:

代码语言:javascript复制
use tower::Service;

#[derive(Debug, Clone)]
pub struct Logger<S> {
    inner: S,
}
impl<S> Logger<S> {
    pub fn new(inner: S) -> Self {
        Logger { inner }
    }
}
type Req = hyper::Request<Incoming>;
impl<S> Service<Req> for Logger<S>
where
    S: Service<Req>   Clone,
{
    // Logger拿到的也是一个Service,返回类型也没有变化,直接指定即可
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        // 直接可以处理,无需额外满足条件
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        println!("processing request: {} {}", req.method(), req.uri().path());
        // 处理完调用下一个Service
        self.inner.call(req)
    }
}

这样添加Logger可以这么添加

代码语言:javascript复制
let svc = Logger::new(svc);

再加个Timeout

代码语言:javascript复制
let svc = Timeout::new(svc, std::time::Duration::from_secs(5));

不够优雅,要是链式操作就好了,这时就到Layer trait显身手了

Layer trait

代码语言:javascript复制
pub trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

只要实现以上Layer trait

代码语言:javascript复制
impl<S> Layer<S> for Logger<S> {
    type Service = Logger<S>;
    fn layer(&self, inner: S) -> Self::Service {
        Logger { inner }
    }
}

然后就能用ServiceBuilder构建service

代码语言:javascript复制
tower::ServiceBuilder::new()
    .layer(LoggerLayer)
    .layer_fn(|s| Timeout::new(s, std::time::Duration::from_secs(5)))
    .service(svc);

当然layer_fn也可以直接将函数转为实现Layer trait

最重要的是顺序是按调用顺序。

hyper Service trait

hyper之前依赖了tower Service,但 v1 稳定版发布前替换成了自己的Service

一方面是tower还没有稳定版本

另一方面为了简化请求处理:

  • 移除了poll_ready
  • call也不再需要&mut self,即不再考虑通过其修改请求,如果需要的话可以加Arc<Mutex<_>>state
代码语言:javascript复制
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn call(&self, req: Request) -> Self::Future;
}

这也就是为什么如果你想直接复用tower Serivice(如Timeout等)需要TowerToHyperService转一下:

代码语言:javascript复制
let svc = TowerToHyperService::new(svc);

另外hyper作为比较底层的请求库,很多web框架(Axum, Actix web等)都依赖他。也就也支持了tower, 使得tower实现的中间件就更容易复用了。

总的来说,tower能用Service trait构建一个请求中间件的规范,确实很神奇。从目前实现反推似乎很简单,但其实设计过程中还是有很多考虑的。推荐看看官方的这篇inventing-the-service-trait[1]

想了解中间件实现过程的话也推荐看看 David Pedersen 的Rust live codingTower deep dive[2] (看不了的同学可以 B 站找找...)

参考资料

[1]

inventing-the-service-trait: https://tokio.rs/blog/2021-05-14-inventing-the-service-trait

[2]

Tower deep dive: https://www.youtube.com/watch?v=16sU1q8OeeI&t=4227s

0 人点赞