Rust异步编程之Future并发处理

2024-01-11 11:22:17 浏览数 (1)

上篇文章我们知道,RustFuture是异步执行,await时是阻塞在当前的异步任务task上,直到完成。

当多个异步任务执行时,如果只能都阻塞一个个执行,那就变成同步串行执行了,当然不是我们通常希望的并发处理方式,今天就来聊聊多个异步任务的一些并发处理方式。

文章目录

  • join
  • try_join
  • spawn
  • select
    • 顺序执行
    • precondition
    • 分支修改
    • cancel

join

多个异步任务执行时,如果希望全部执行完成后统一返回,可以让他们都并发去执行,等全部完成后再一起返回。join!宏就可以实现它。

代码语言:javascript复制
async fn async_fn1() -> u32 {
    1
}

async fn async_fn2() -> u32 {
    2
}

#[tokio::main]
async fn main() {
    let (first, second) = tokio::join!(async_fn1(), async_fn2());
    assert_eq!(first, 1);
    assert_eq!(second, 2);
}

try_join

如果其中有失败的话,也会返回失败的Err。如果想一有失败就立马返回,不等待其他任务完成,可以使用try_join!

代码语言:javascript复制
async fn async_fn1() -> Result<u32, &'static str> {
    Ok(1)
}

async fn async_fn2() -> Result<u32, &'static str> {
    Err("async_fn2 failed")
}

#[tokio::main]
async fn main() {
    let res = tokio::try_join!(async_fn1(), async_fn2());

    match res {
        Ok((first, second)) => {
            println!("first = {}, second = {}", first, second);
        }
        Err(err) => {
            println!("error: {}", err);
        }
    }
}

spawn

上边join虽然是让多个异步任务并发执行,但其实际还是在同一个task上异步执行,如果想让每个异步任务都在一个新的task独立执行,可以用spawn

异步任务spawn后会在后台立即开始运行,即便没有对其返回的JoinHandle进行await

这就有点像多线程里的spawn,只不过这里粒度不是线程,是task

代码语言:javascript复制
use futures::future::join_all;
use tokio::{join, task::JoinHandle};

async fn async_op(id: i32) -> String {
    let s = format!("Start task {}", id);
    println!("{}", s);
    s
}

#[tokio::main]
async fn main() {
    let ops = vec![1, 2, 3];
    let mut tasks: Vec<JoinHandle<String>> = ops
        .into_iter()
        .map(|op| tokio::spawn(async_op(op)))
        .collect();

    // option 1
    // let outputs = join!(
    //     tasks.pop().unwrap(),
    //     tasks.pop().unwrap(),
    //     tasks.pop().unwrap()
    // );

    // println!("{:?}", outputs);
    // tuple of results:
    // (Ok("Start task 3"), Ok("Start task 2"), Ok("Start task 1"))

    // option 2
    let outputs = join_all(tasks).await;
    println!("{:?}", outputs);
    // vector of results:
    // [Ok("Start task 1"), Ok("Start task 2"), Ok("Start task 3")]
}

select

如果是多个异步分支(branch)有一个完成就返回,并取消(drop来释放异步资源)其他异步分支的话,可以用select

代码语言:javascript复制
async fn async_fn1() {}

async fn async_fn2() {}

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = async_fn1() => {
            println!("async_fn1() completed first")
        }
        _ = async_fn2() => {
            println!("async_fn2() completed first")
        }
    };
}

顺序执行

这里select会对每个分支随机执行,顺序是不保证的。如果期望顺序执行,可以用biased

代码语言:javascript复制
#[tokio::main]
async fn main() {
    let mut count = 0u8;

    loop {
        tokio::select! {
            // 顺序执行
            biased;

            _ = async {}, if count < 1 => {
                count  = 1;
                assert_eq!(count, 1);
            }
            _ = async {}, if count < 2 => {
                count  = 1;
                assert_eq!(count, 2);
            }

            else => {
                break;
            }
        };
    }
}

precondition

上边例子中,分支使用了if precondition,如果当前select循环中运行到该分支,条件满足则执行;不满足的话会标记分支为失效(disabled)本次select中不会执行。

如果在loop中,下一次进入select循环会重新标记disabled状态

另外当前循环如果所以分支都被标记为disabled状态,就必须要有else分支,使select仍可运行。不然就会收到panic: all branches are disabled and there is no else branch.

分支修改

select的分支也可修改, 比如下边通过Pin::set来修改Pin住的异步任务。

代码语言:javascript复制
use tokio::select;

async fn action(input: Option<i32>) -> Option<String> {
    match input {
        Some(input) => Some(input.to_string()),
        None => return None,
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(128);

    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);

    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });

    loop {
        select! {
            res = &mut operation, if !done => {
                println!("Got = {:?}", res);
                done = true;

                if let Some(_) = res {
                    return;
                }
            }
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` is a method on `Pin`.
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

这里第一个分支的precondition是必须的,不然就会有可能出现多次执行一个已完成的异步任务,会panic: async fn resumed after completion

cancel

最后在聊聊分支取消。

select有分支完成时,其他分支会被取消。取消依托于Drop。当futuredrop,其也会停止被异步调度。

比如下边代码,当oneshot::Receiver被取消而Drop时,会向Sender发送close通知,以便于清理sender并中断其执行。

代码语言:javascript复制
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel::<u32>();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async move {
        tokio::select! {
            _ = tx1.closed() => {
                // `val = rx1` is canceled
                println!("tx1 closed");
            }
        }
    });
    tokio::spawn(async {
        let _ = tx2.send("two");
    });
    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);

        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

如果有用,点个 在看,让更多人看到

0 人点赞