Pin<Box<dyn Future<>>>解析

2023-11-24 16:21:00 浏览数 (1)

Tower 是一个专注于对网络编程进行抽象的框架,最核心的抽象为 Service trait。Service::call 接受一个 request 进行处理,成功则返回 response,否则返回 error。

代码语言:javascript复制
fn call(Request) -> Result<Response, Error>

Service trait 的定义

我们希望 Service 是异步编程风格,也就是 call 为 async 方法:

代码语言:javascript复制
async fn call(Request) -> Result<Response, Error>

我们就可以在 Service::call 上 await

代码语言:javascript复制
service.call(request).await

然而当前 Rust 不支持 async trait 方法。

我们可以让 call 作为普通方法,返回一个实现了 Future 的类型:

代码语言:javascript复制
trait Service<Request> {
    type Response;
    type Error;
    // ERROR: `impl Trait` not allowed outside of function and inherent
    // method return types
    fn call(&mut self, req: Request) -> impl Future<Output = Result<Self::Response, Self::Error>>;
}

还是不行,目前 Rust 也不支持在 Trait 中使用 impl Trait 做返回值,上一篇文章<impl Trait 的使用>分析过原因。

Tower 在定义 Service 时,使用了关联类型 type Future ,其实是将问题留给了 Service 的实现者,由用户选择 type Future 的实际类型:

代码语言:javascript复制
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    fn call(&mut self, req: Request) -> Self::Future;
}

实现自己的 Future 类型

我们在实现 Service 时,仍然需要为type Future 设置具体的类型。

既然没法让 call 直接返回 impl Future,一种方法是定义自己的 Future 类型,例如:

代码语言:javascript复制
pub struct HttpRequest {
    url: String,
}
pub struct HttpResponse {
    code: u32,
}

pub struct ResponseFuture {
    request: HttpRequest,
}

impl Future for ResponseFuture {
    type Output = Result<HttpResponse, Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        println!("process url:{}", &self.as_ref().get_ref().request.url);
        Poll::Ready(Ok(HttpResponse { code: 200 }))
    }
}

在实现 Service 时,关联类型 type Future 设置为我们手工实现的 Future:

代码语言:javascript复制
struct RequestHandler;

impl Service<HttpRequest> for RequestHandler {
    type Response = HttpResponse;

    type Error = Error;

    type Future = ResponseFuture;

    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: HttpRequest) -> Self::Future {
        ResponseFuture { request: req }
    }
}

然后就可以像正常的 Future 一样,使用 Service :

代码语言:javascript复制
#[tokio::main]
async fn main() {
    let mut service = RequestHandler {};
    match service
        .call(HttpRequest {
            url: "/user/mazhen".to_owned(),
        })
        .await
    {
        Ok(r) => println!("Response code: {}", r.code),
        Err(e) => println!("process failed. {:?}", e),
    }
}

使用 async blocks

手工实现 Future 会比较麻烦,我们一般都是用 async fn/async blocks语法糖生成 Future,那么这时 Service::call 返回什么类型呢?

代码语言:javascript复制
struct RequestHandler;

impl Service<HttpRequest> for RequestHandler {
    ...
    type Future = ???

    fn call(&mut self, req: HttpRequest) -> Self::Future {
        async move {
            println!("process url {:?}", &req.url);
            Ok(HttpResponse { code: 200 })
        }
    }
}

既然不能返回 impl Trait ,可以让 call 返回 trait object,用 trait object 统一返回值的类型。

代码语言:javascript复制
impl Service<HttpRequest> for RequestHandler {
    ...
    type Future = Box<dyn Future<Output = Result<Self::Response, Self::Error>>>;

    fn call(&mut self, req: HttpRequest) -> Self::Future {
        Box::new(async move {
            println!("process url {:?}", &req.url);
            Ok(HttpResponse { code: 200 })
        })
    }
}

这时候会报错,说 dyn Future 没有实现 Unpin

代码语言:javascript复制
|
51  |     type Future = Box<dyn Future<Output = Result<Self::Response, Self::Error>>>;
    |                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `(dyn std::future::Future<Output = Result<HttpResponse, std::io::Error>>   'static)`
    |

编译错误提示的没有实现 Unpin trait 是什么意思?那要先从 Pin 说起。

一句话解释Pin

Pin 本质上解决的问题是保证 Pin<P<T>> 中的 T 不会被 move,除非 T 满足 T: Unpin。

什么是move

所有权转移的这个过程就是 move,例如:

代码语言:javascript复制
let s1 = "Hello, world".to_owned();
let s2 = s1;

s2 浅copy s1 的内容,同时 String 的所有权转移给了 s2。

通过std::mem::swap()方法交换了两个可变借用 &mut 的内容,也发生了move。

为什么要Pin

自引用结构体,move了以后会出问题。

所以需要 Pin,不能move。

怎么 Pin 住的

保证 T 不会被move,需要避免两种情况:

  • 不能暴露 T ,否则赋值、方法调用等都会move
  • 不能暴露 &mut T,开发者可以调用 std::mem::swap()std::mem::replace() 这类方法来 move 掉 T

Pin<P<T>>没有暴露T,而且没法让你获得 &mut T,所以就 Pin 住了T。但注意有个前提条件:T 没有实现 Unpin

谁没有实现 Unpin

Unpin 是一个auto trait,编译器默认会给所有类型实现 Unpin。唯独有几个例外,他们实现的是 !Unpin

  • PhantomPinned
代码语言:javascript复制
/// A marker type which does not implement `Unpin`.
///
/// If a type contains a `PhantomPinned`, it will not implement `Unpin` by default.
#[stable(feature = "pin", since = "1.33.0")]
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PhantomPinned;

#[stable(feature = "pin", since = "1.33.0")]
impl !Unpin for PhantomPinned {}

  • 如果你使用了 PhantomPinned,你的类型自动实现 !Unpin
代码语言:javascript复制
use std::marker::PhantomPinned;

#[derive(Debug)]
struct SelfReference {
    name: String,
    // 在初始化后指向 name
    name_ptr: *const String,
    // PhantomPinned 占位符
    _marker: PhantomPinned,
}

  • 编译器为 async fn 生成的匿名结构体实现的是 !Unpin

async fn

async fn 是语法糖,在编译时,编译器使用Generator为 async fn 生成匿名结构体,这个结构体实现了 Future

这个匿名结构体是自引用的。原因是,如果 async fn 内有多个 await,执行到 await 可能因为资源没准备好而让出 CPU 暂停执行,随后该 Future 可能被调度到其他线程接着执行。所以这个匿名结构体需要保存跨 await 的数据,形成了自引用结构。

由于为 async fn 生成的结构体是自引用的,所以这个结构体实现了 !Unpin,表示它不能被 move。

这也是为什么不能给Future::poll 直接传 &mut Self 的原因:生成的匿名结构体不能被move,而拿到 &mut Self就可以使用 swap 或 replace之类的方法进行move,这样不安全,所以必须使用 Pin<&mut Self>

Future 都是 !Unpin 的吗

不一定。

async fn 语法糖生成的实现了 Future 的匿名结构,内部包含自引用,它会明确实现 !Unpin,不能 move。

但如果你自己实现的 Future,内部没有自引用,它就不是 !Unpin,当然可以 move。

也就是说,Future!Unpin 是两个 trait,虽然它们经常联系在一起,但并不是实现了 Future 的类型都必须同时实现 !Unpin,没有包含自引用的 Future当然可以安全的 move 了。

实现了 Unpin 的 Future

如果是可以 move 的 Future,也就是实现了 Unpin 的 Future,在调用 Future::poll 的时候,要求传入 Pin<&mut Self>,会不会有什么问题呢?

首先,如果 T:Unpin,那么 Pin<&mut T> 就完全等同于 &mut T。换句话说,Unpin 意味着这个类型可以被移动,即使是在 Pin 住的情况下,所以 Pin 对这样的类型没有影响。因为 Pin 是智能指针,它实现了 Deref/DerefMut,只要满足 T: Unpin,你就能拿到&mut T

代码语言:javascript复制
#[stable(feature = "pin", since = "1.33.0")]
impl<P: DerefMut<Target: Unpin>> DerefMut for Pin<P> {
    fn deref_mut(&mut self) -> &mut P::Target {
        Pin::get_mut(Pin::as_mut(self))
    }
}

其次,为了用户方便,FutureExt 提供了 poll_unpin,让你直接在 UnpinFuturepoll

代码语言:javascript复制
pub trait FutureExt: Future {
    /// A convenience for calling `Future::poll` on `Unpin` future types.
    fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
    where
        Self: Unpin,
    {
        Pin::new(self).poll(cx)
    }
}

所以,如果你的 FutureUnpin,那么即使Future::poll 要求传入的是 Pin<&mut Self>,对你也没有任何影响。

Box<dyn Futrue<>> 的问题

回到上面的问题,我们想让 Service::call 返回 trait object,也就是 Box<dyn Futrue<>>,会编译不过,为什么呢?

因为标准库为 Box 实现了 Future,但要求 Box 包装的 Future 必须同时实现了 Unpin :

代码语言:javascript复制
impl<F, A> Future for Box<F, A>
where
    F: Future   Unpin   ?Sized,
    A: Allocator   'static,

前面已经讲过,async 解语法糖生成的 Future 没有实现 Unpin,所以Box::new(async{...}) 不满足类型约束,它没有实现 Future,不能在它上面await。

Pin 实现了 Future

前面讲过,在 Future 上 poll 的时候,不能直接传入&mut Self,而需要传入 Pin<&mut Self>,需要这样调用 Future::poll(Pin::new(&mut future), ctx)。如果 Pin 实现了 Future ,我们就可以直接这样 poll 了: Pin::new(&mut future).poll(ctx)

标准库也确实为 Pin 实现了 Future:

代码语言:javascript复制
impl<P> Future for Pin<P>
where
    P: DerefMut,
    <P as Deref>::Target: Future,

我们来看对 P 的约束,P 可解引用为 Future,也就是说,P是 Future 的引用 &mut future,或者是智能指针 Box<dyn Future> 都可以满足约束。因为 Pin<Box<dyn Future<>>> 实现了 Future,我们可以用它作为 Service::call 返回值类型。

Box 提供了 pin 方法,让用户构建 Pin<Box<T>>

代码语言:javascript复制
pub fn pin(x: T) -> Pin<Box<T, Global>>

使用 Box::pin,RequestHanlder 可以这么修改:

代码语言:javascript复制
impl Service<HttpRequest> for RequestHandler {
    ...
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn call(&mut self, req: HttpRequest) -> Self::Future {
        Box::pin(async move {
            println!("process url {:?}", &req.url);
            Ok(HttpResponse { code: 200 })
        })
    }
}

这回终于可以了。事实上,在异步场景下,我们经常会看到使用 Box::pin 去包装 async block

Pin<Box<dyn Future<>>>

Pin<Box<dyn Future<>>> 除了实现了Future,也实现了 Unpin。

因为 Pin 实现了 Unpin,只要 P 是 Unpin 的:

代码语言:javascript复制
impl<P> Unpin for Pin<P>
where
    P: Unpin

而 Box 正好是 Unpin 的:

代码语言:javascript复制
impl<T, A> Unpin for Box<T, A>
where
    A: Allocator   'static,
    T: ?Sized,

因此 Pin<Box<T>>是 Unpin 的。可以这么理解,Pin 钉住了 T,但 Pin 本身是 Unpin的,可以安全的 move。

很多异步方法需要你的 Future 同时实现了 Unpin ,例如tokio::select!(),而 async fn 返回的 Future 显然不满足 Unpin,这个时候仍然可以用 Box::pin把 Future pin 住,得到的Pin<Box<dyn Future<...>>> 同时实现了 Future 和 Unpin,满足你的要求。

简单总结,在异步编程场景,我们经常会用Box::pin 包装 async block,获得同时实现了 Future 和 Unpin 的Pin<Box<dyn Future<>>>

0 人点赞