Rayon魔法:使Rust并行编程变得轻而易举

2024-01-17 16:08:13 浏览数 (1)

Rayon库是一个数据并行化(data-parallelism)的 Rust库。在并行编程里是一个很有趣的存在, 且非常的容易上手。它可以很轻松地将同步计算流程转化为并行计算。而且基本能保证编译通过就不会有data race

文章目录

  • 同步转并行
  • 背后的魔法
  • join
  • par_bridge

同步转并行

假设有个如下的求和的同步代码

代码语言:javascript复制
fn main() {
   let sum: i32 = (0..100)
        .into_iter()
        .map(|i| {
            // Simulate some computation
            sleep(Duration::from_nanos(1));
            i
        })
        .sum();
    assert_eq!(sum, 4950);
}

想要转成并行,只需要into_iter变成into_par_iter

Rayon会将同步的遍历转成并行的遍历,而且保证返回的顺序是一致的,瞬间并行是不是!

代码语言:javascript复制
use rayon::prelude::*;
fn main() {
    let sum: i32 = (0..100)
        .into_par_iter() // 这里
        .map(|i| {
            // Simulate some computation
            sleep(Duration::from_nanos(1));
            i
        })
        .sum();
    assert_eq!(sum, 4950);
}

divan在 10 核的 M1 pro 上测试结果如下,一行改变让代码速度提升了不少。

Benchmark

Fastest

Slowest

Median

Mean

Samples

Iterations

iter

549.2 µs

1.244 ms

687.4 µs

738.5 µs

100

100

par_iter

195 µs

488.1 µs

315.1 µs

321.9 µs

100

100

背后的魔法

这个并行遍历是怎么处理的呢?

Rayon利用一个可伸缩线程池来执行并行任务,默认情况下,线程池的大小与系统的逻辑核心数量相匹配。

在进行并行任务时,Rayon将当前任务拆分成多个子任务(依据线程池大小),并尽可能地将它们分配给空闲的线程以执行,每个线程有自己的本地任务队列。

如果当前有空闲线程,但已分配的任务仍在等待其线程完成当前任务,空闲线程将尝试执行work stealing,从其他线程任务队列中中窃取一些任务来执行,以确保最大程度地利用 CPU 资源。

最终,将并行任务的结果进行两两合并,将线程结果全部汇总以完成整个并行计算过程。

这里任务拆分和work stealing就是将并行任务分而治之的精髓。

join

其底层很多使用了join, 将两个任务并行执行,并等待任务结果一起返回:

代码语言:javascript复制
use rayon::prelude::*;

fn main() {
    let v1 = vec![1, 2, 3, 4, 5];
    let v2 = vec![6, 7, 8, 9, 10];

    let (sum1, sum2) = rayon::join(
        || v1.par_iter().sum::<i32>(),
        || v2.par_iter().sum::<i32>()
    );

    println!("sum1: {}, sum2: {}", sum1, sum2);
}

par_bridge

常规能很容易并行化拆分的par_iter就可以了,但是如果遇到不容易并行化的(有阻塞等待等),如channel或者文件、网络 IO 的操作, 则可以用par_bridge

性能会有些损耗,因为其执行的方式是每次获取下一个可遍历的内容,分发到线程池内可用线程上执行,同时也不保证结果返回的顺序。

代码语言:javascript复制
use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;
use std::sync::mpsc::channel;

fn main() {
    let rx = {
        let (tx, rx) = channel();

        (1..=3).into_iter().for_each(|i| {
            let _ = tx.send(i);
        });

        rx
    };

    let mut output: Vec<i32> = rx.into_iter().par_bridge().collect();
    output.sort_unstable(); // 重新保证顺序

    assert_eq!(&*output, &[1, 2, 3]);
}

总之,对于串行化遍历任务,一般都可以用Rayon转化为并行处理,当然也要看有没有转化的必要,常规简单遍历自然是不需要并行化的,毕竟线程和任务并行调度也是有开销的。

想了解更多,推荐看看Rayon: data parallelism in Rust[1]

参考资料

[1]

Rayon: data parallelism in Rust: https://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/

如果有用,点个 在看,让更多人看到

外链不能跳转

0 人点赞