上篇文章我们知道,Rust
的Future
是异步执行,await
时是阻塞在当前的异步任务task
上,直到完成。
当多个异步任务执行时,如果只能都阻塞一个个执行,那就变成同步串行执行了,当然不是我们通常希望的并发处理方式,今天就来聊聊多个异步任务的一些并发处理方式。
文章目录
- join
- try_join
- spawn
- select
- 顺序执行
- precondition
- 分支修改
- cancel
join
多个异步任务执行时,如果希望全部执行完成后统一返回,可以让他们都并发去执行,等全部完成后再一起返回。join!
宏就可以实现它。
async fn async_fn1() -> u32 {
1
}
async fn async_fn2() -> u32 {
2
}
#[tokio::main]
async fn main() {
let (first, second) = tokio::join!(async_fn1(), async_fn2());
assert_eq!(first, 1);
assert_eq!(second, 2);
}
try_join
如果其中有失败的话,也会返回失败的Err
。如果想一有失败就立马返回,不等待其他任务完成,可以使用try_join!
。
async fn async_fn1() -> Result<u32, &'static str> {
Ok(1)
}
async fn async_fn2() -> Result<u32, &'static str> {
Err("async_fn2 failed")
}
#[tokio::main]
async fn main() {
let res = tokio::try_join!(async_fn1(), async_fn2());
match res {
Ok((first, second)) => {
println!("first = {}, second = {}", first, second);
}
Err(err) => {
println!("error: {}", err);
}
}
}
spawn
上边join
虽然是让多个异步任务并发执行,但其实际还是在同一个task
上异步执行,如果想让每个异步任务都在一个新的task
上独立执行,可以用spawn
。
异步任务spawn
后会在后台立即开始运行,即便没有对其返回的JoinHandle
进行await
这就有点像多线程里的spawn
,只不过这里粒度不是线程,是task
。
use futures::future::join_all;
use tokio::{join, task::JoinHandle};
async fn async_op(id: i32) -> String {
let s = format!("Start task {}", id);
println!("{}", s);
s
}
#[tokio::main]
async fn main() {
let ops = vec![1, 2, 3];
let mut tasks: Vec<JoinHandle<String>> = ops
.into_iter()
.map(|op| tokio::spawn(async_op(op)))
.collect();
// option 1
// let outputs = join!(
// tasks.pop().unwrap(),
// tasks.pop().unwrap(),
// tasks.pop().unwrap()
// );
// println!("{:?}", outputs);
// tuple of results:
// (Ok("Start task 3"), Ok("Start task 2"), Ok("Start task 1"))
// option 2
let outputs = join_all(tasks).await;
println!("{:?}", outputs);
// vector of results:
// [Ok("Start task 1"), Ok("Start task 2"), Ok("Start task 3")]
}
select
如果是多个异步分支(branch
)有一个完成就返回,并取消(drop
来释放异步资源)其他异步分支的话,可以用select
async fn async_fn1() {}
async fn async_fn2() {}
#[tokio::main]
async fn main() {
tokio::select! {
_ = async_fn1() => {
println!("async_fn1() completed first")
}
_ = async_fn2() => {
println!("async_fn2() completed first")
}
};
}
顺序执行
这里select
会对每个分支随机执行,顺序是不保证的。如果期望顺序执行,可以用biased
#[tokio::main]
async fn main() {
let mut count = 0u8;
loop {
tokio::select! {
// 顺序执行
biased;
_ = async {}, if count < 1 => {
count = 1;
assert_eq!(count, 1);
}
_ = async {}, if count < 2 => {
count = 1;
assert_eq!(count, 2);
}
else => {
break;
}
};
}
}
precondition
上边例子中,分支使用了if precondition
,如果当前select
循环中运行到该分支,条件满足则执行;不满足的话会标记分支为失效(disabled
)本次select
中不会执行。
如果在loop
中,下一次进入select
循环会重新标记disabled
状态
另外当前循环如果所以分支都被标记为disabled
状态,就必须要有else
分支,使select
仍可运行。不然就会收到panic
: all branches are disabled and there is no else branch
.
分支修改
select
的分支也可修改, 比如下边通过Pin::set
来修改Pin
住的异步任务。
use tokio::select;
async fn action(input: Option<i32>) -> Option<String> {
match input {
Some(input) => Some(input.to_string()),
None => return None,
}
}
#[tokio::main]
async fn main() {
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
let mut done = false;
let operation = action(None);
tokio::pin!(operation);
tokio::spawn(async move {
let _ = tx.send(1).await;
let _ = tx.send(3).await;
let _ = tx.send(2).await;
});
loop {
select! {
res = &mut operation, if !done => {
println!("Got = {:?}", res);
done = true;
if let Some(_) = res {
return;
}
}
Some(v) = rx.recv() => {
if v % 2 == 0 {
// `.set` is a method on `Pin`.
operation.set(action(Some(v)));
done = false;
}
}
}
}
}
这里第一个分支的precondition
是必须的,不然就会有可能出现多次执行一个已完成的异步任务,会panic
: async fn resumed after completion
。
cancel
最后在聊聊分支取消。
当select
有分支完成时,其他分支会被取消。取消依托于Drop
。当future
被drop
,其也会停止被异步调度。
比如下边代码,当oneshot::Receiver
被取消而Drop
时,会向Sender
发送close
通知,以便于清理sender
并中断其执行。
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (mut tx1, rx1) = oneshot::channel::<u32>();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async move {
tokio::select! {
_ = tx1.closed() => {
// `val = rx1` is canceled
println!("tx1 closed");
}
}
});
tokio::spawn(async {
let _ = tx2.send("two");
});
tokio::select! {
val = rx1 => {
println!("rx1 completed first with {:?}", val);
}
val = rx2 => {
println!("rx2 completed first with {:?}", val);
}
}
}
如果有用,点个 在看,让更多人看到