“ 本文节选自「Rust 生态蜜蜂」。Rust 生态蜜蜂是觉学社公众号开启的一个付费合集。生态蜜蜂,顾名思义,是从 Rust 生态的中,汲取养分,供我们成长。计划从2022年7月第二周开始,到2022年12月最后一周结束。预计至少二十篇周刊,外加一篇Rust年度报告总结抢读版。本专栏可以在公众号菜单「生态蜜蜂」中直接进入。欢迎大家订阅!如果有需要,每个订阅者都可以私信我你的电子邮件,我也会把 Markdown 文件发送给你。
背景:结构化并发
Scoped Thread 对应的是一种叫做结构化并发(Structured Concurrency)概念的实现。
结构化并发是一个近几年来才被提出的新概念,旨在通过对并发的执行流进行封装,使得它们能够有确定的入口和出口,确保所有派生“线程”在出口之前完成,从而提高并发编程的确定性、质量和开发效率。
结构化并发的思想来源于结构化编程。在编程史上古年间,大家写代码都是像“面条”一样,没有结构。goto
满天飞,直到 Dijkstra 专门写了一篇著名的名为 「Go To语句被认为有害(Goto statement considered harmful)」的论文,他在文章中批评了当时 goto
语句的过度使用,并提出结构化编程概念,采用子程序、块结构、for
循环以及while
循环等结构,来取代传统的 goto
。从此以后,代码的世界才有了结构,为大型软件的出现奠定了基础。
对于多线程并发来说,线程一旦运行,就像“脱缰野马”一样,不受控制。当然可以使用 join
或锁等机制来实现同步,但是这完全依赖开发者自身的水平,很容易出错。结构化并发想实现的就是,让并发的若干个子线程和父线程之间存在一种结构:让语言本身保证当父线程的作用域结束时,子线程一定已经运行完毕。如果还有任意一个子线程没跑完,父线程都不会结束。某种意义上,子线程像是一个父线程的一个局部变量。
泄漏启示录:Rust 实现结构化并发的历史
在 Rust 1.0 之前,Rust 标准库中自带来结构化并发的实现,即 Scoped Thread。但是就在 1.0 稳定前一个月,有人发现了一个不健全(unsound)问题[1]:通过标准库中 Scoped Thread 和 Rc<T>
一起配合使用可以在 Safe Rust 中构造出 UB。这一历史事件被称为 泄漏启示录[2]。
具体来说,Scoped Thread 的工作机制是,在启动一个Scoped Thread 时会返回一个 Guard 对象。当 Guard 对象被析构时,它会等待线程完成。这将保证子线程不会超过本地变量所在的当前栈帧。然后,有人就发现,可以通过 Rc<T>
可以构造一个循环引用,让引用的计数保持在零以上,就永远不可能执行析构函数drop,这样就会导致内存泄露。所以,通过 Rc<T>
就可以构造出让 Scoped Thread 的 Guard 对象永远都不会析构。这样,当作用域当前栈帧调用结束以后,子线程就能读取局部变量的值,造成 UB。
这个 Bug,不是 Rust 语言天生不健全,而是一种形式的不健全(导致泄漏的能力)转变为更糟糕的不健全形式(导致崩溃的能力)的一种方式。这个问题还直接造就了std::mem::forget
由 Unsafe 被重新定义为 Safe。因为这个问题打破了 Rust 语言对析构函数一定会运行的假设。在 1.0 之前,内部的内存泄露被看作是一种 Unsafe,所以std::mem::forget
之前是 Unsafe。
经过团队的紧急讨论,在 1.0 之前,还是把 Scoped Thread 的特性移除了。后来通过第三方库 crossbeam::Scope
来安全地提供这个功能。但是官方还是希望标准库可以实现这个功能,于是在此事件四年之后又增加了 [RFC 3151] (https://rust-lang.github.io/rfcs/3151-scoped-threads.html#motivation) 来重新设计 Scoped Thread 。
时隔七年,历经 63 个版本迭代,Scoped Thread 现在终于重回标准库了!
1.63 版 Scoped Thread 特性介绍
终于说回正题了。Rust 1.63 Scoped Thread 相关文档地址:https://doc.rust-lang.org/nightly/std/thread/fn.scope.html[3]。目前还只能在 Nightly Rust 下使用,Rust 1.63 稳定版 将于 8 月 11 日发布。
先来看看标准库中普通线程的限制:
代码语言:javascript复制let greeting = String::from("Hello world!");
let handle1 = thread::spawn({
let greeting = greeting.clone();
move || {
println!("thread #1 says: {}", greeting);
}
});
let handle2 = thread::spawn(move || {
println!("thread #2 says: {}", greeting);
});
handle1.join().unwrap();
handle2.join().unwrap();
标准库中通用的线程thread::spawn
因为存在 F: 'static
这样的限制,所以无法在子线程中借用主线程作用域中的局部变量。所以只能使用 move
关键字将主线程的局部变量移动到子线程中。
相比之下, Scoped Thread 就可以打破这个限制:
代码语言:javascript复制#![feature(scoped_threads)]
use std::thread;
fn main(){
let mut a = vec![1, 2, 3];
let mut x = 0;
thread::scope(|s| {
s.spawn(|| {
println!("hello from the first scoped thread");
// We can borrow `a` here.
dbg!(&a);
});
s.spawn(|| {
println!("hello from the second scoped thread");
// We can even mutably borrow `x` here,
// because no other threads are using it.
x = a[0] a[2];
});
println!("hello from the main thread");
});
// After the scope, we can modify and access our variables again:
a.push(4);
assert_eq!(x, a.len());
}
这样子线程中就可以直接借用 主线程当前作用域中的变量了,而不需要 join
子线程。这就在 Rust 中实现了结构化并发。
标准库中支持 Scoped Thread 有优点也有缺点。
优点:
- 这是一个常用且很实用的工具。
- 标准库提供一个统一的可靠实现,相比于个人自己实现更靠谱。
- 相比于使用
thread::spawn
,不会有泄漏的风险。
缺点就是会使标准库变大。
新的 Scoped Thread 经过重新设计避免了 1.0 之前的安全问题,用闭包来代替 Guard 的方式确保子线程可以自动 join
。并且新的 Scoped Thread 和 crossbeam::Scope
的实现完全不同。Scoped Thread 更高效,没有无限的内存使用。API 和 crossbeam::Scope
也不一样,Scoped Thread 是可捕获的 Scope 对象而不是线程的 Scope 参数,没有 Result 返回类型,以及更简单的恐慌处理。
1.63 版 Scoped Thread 的实现机制
新的 Scoped Thread 函数scope
函数签名如下:
pub fn scope<'env, F, T>(f: F) -> T
where
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
看得出来,该函数只能传入 FnOnce(&'scope Scope<'scope, 'env>) -> T
闭包。
其中'scope
生命周期代表作用域本身的生命周期,一旦这个生命周期结束,所有的作用域线程就会被 Join
。这个生命周期在 scope 函数内、f闭包之前开始,在 f闭包结束后返回,等作用域所有线程 Join
后结束,但是在 scope返回之前。
'env
生命周期表示作用域线程借用的任何内容的生命周期。
闭包中的 Scope
对象是一个结构体:
/// A scope to spawn scoped threads in.
///
/// See [`scope`] for details.
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub struct Scope<'scope, 'env: 'scope> {
data: Arc<ScopeData>,
/// Invariance over 'scope, to make sure 'scope cannot shrink,
/// which is necessary for soundness.
///
/// Without invariance, this would compile fine but be unsound:
///
/// ```compile_fail,E0373
/// std::thread::scope(|s| {
/// s.spawn(|| {
/// let a = String::from("abcd");
/// s.spawn(|| println!("{a:?}")); // might run after `a` is dropped
/// });
/// });
/// ```
scope: PhantomData<&'scope mut &'scope ()>,
env: PhantomData<&'env mut &'env ()>,
}
这个 Scope
结构体定义了 'env
和'scope
生命周期的关系。因为 'env
是代表被作用域子线程借用的东西的生命周期,所以它的存活期不能比主线程的 'scope
生命周期短,所以是 'env: 'scope
的关系。这意味着任何超过这个调用的东西,比如在 scope 之前定义的局部变量,都可以被作用域线程借用。
所以,Scope
结构体中,通过 scope:PhantomData<&'scope mut &'scope ()>
和 env: PhantomData<&'env mut &'env ()>
这样的定义,为'env
和 'scope
设定了不变性(Invariance)[4],以便编译器可以识别生命周期收缩的情况。比如上面代码中注释示例:
std::thread::scope(|s| { // --------------------- '1 lifetime
s.spawn(|| {
let a = String::from("abcd"); // ----------------- '2 lifetime
s.spawn(|| println!("{a:?}")); // might run after `a` is dropped
});
});
这段代码中使用了 嵌套scope 线程,会发生编译错误。
因为 s
的生命周期实例是 '1
,在第一层 scope 线程中定义的 a
生命周期为 '2
,在嵌套的scope线程中,闭包产生了 a
的借用&a
,闭包的生命周期实例是 '3
。此时,生命周期的关系是 '3 < '2 < '1
。
而 '3
是 'env
类型的生命周期,'2
是 'scope
生命周期,上面的 Scope 对象生命周期参数定义是 'env: 'scope
,即 'env > 'scope
。所以这里违反了不变性,编译错误。
除了通过生命周期参数来让编译器安全检查保证 Scoped Thread 的引用正确性。内部还通过 Arc<ScopeData>
对运行的线程和panic的线程进行记录。当运行的线程不等于 0
时,主线程就一直 park
阻塞,直到运行的线程为0
。
延伸阅读
内存泄漏是否被认为违反了内存安全?[5]
`mem::forget` 是 unsafe 的,但完全可以用 safe 代码实现相同效果[6]
tokio RFC 实现异步实现结构化并发:tokio::task::scope [7]
参考资料
[1]
不健全(unsound)问题: https://github.com/rust-lang/rust/issues/24292
[2]
泄漏启示录: http://cglab.ca/~abeinges/blah/everyone-poops/
[3]
https://doc.rust-lang.org/nightly/std/thread/fn.scope.html: https://doc.rust-lang.org/nightly/std/thread/fn.scope.html
[4]
不变性(Invariance): https://doc.rust-lang.org/nomicon/phantom-data.html
[5]
内存泄漏是否被认为违反了内存安全?: https://internals.rust-lang.org/t/are-memory-leaks-considered-to-violate-memory-safety/1674
[6]
mem::forget
是 unsafe 的,但完全可以用 safe 代码实现相同效果: https://github.com/rust-lang/rust/issues/24456
[7]
tokio RFC 实现异步实现结构化并发:tokio::task::scope : https://github.com/tokio-rs/tokio/issues/2592