Skip to content

31.5 rayon

C#语言有一个很厉害的PLinq扩展,可以轻松地将linq语句并行化。比如:


rust
string[] words = new[] { "Welcome", "to", "Beijing","OK","Hua","Ying","Ni" ,"2008"};
var lazyBeeQuery = from word in words.AsParallel() select word;
lazyBeeQuery.ForAll<string>(word => { Console.WriteLine(word); });

在新的 C++17 中,标准库也支持了一些并行算法:


rust
std::experimental::parallel::for_each(
                        std::experimental::parallel::par, // 并行执行
                        v.begin(), v.end(), functor);

在 Rust 中,迭代器基本已经与 linq 的功能差不多。那我们能不能做个类似的扩展,让普通迭代器轻松变成并行迭代器呢?Rayon 的设计目标就是这个。

Rayon 是 Rust 核心组成员 Nicholas Matsakis 开发的一个并行迭代器项目。它可以把一个按顺序执行的任务轻松变成并行执行。它非常轻量级,效率极高,而且使用非常简单。而且它保证了无数据竞争的线程安全。

Rayon 的 API 主要有两种:

  • 并行迭代器——对一个可迭代的序列调用 par_iter 方法,就可以产生一个并行迭代器;

  • join 函数——它可以把一个递归的分治算法问题变成并行执行。

照例,我们用示例来说明它的基本用法。比如,我们想对一个整数数组执行平方和计算,可以这样做:


rust
use rayon::prelude::*;
fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter()  // iter() 换成 par_iter()
        .map(|&i| i * i)
        .sum()
}

这个问题是可以并行计算的,每个元素的平方操作互不干扰,如果能让它们在不同线程计算,最后再一起求和,可以提高执行效率。用 Rayon 来解决这个问题很简单,只需将单线程情况下的 iter()方法改为 par_iter()即可。Rayon 会在后台启动一个线程池,自动分配任务,将多个元素的 map 操作分配到不同的线程中并行执行,最后把所有的执行结果汇总再相加。

类似的,这个迭代器也有 mut 版本。假如我们想并行修改某个数组,可以这样做:


rust
use rayon::prelude::*;
fn increment_all(input: &mut [i32]) {
    input.par_iter_mut()
        .for_each(|p| *p += 1);
}

Rayon 的另外一种使用方式是调用 join 函数。这个函数特别适合于分治算法。一个典型的例子是写一个快速排序算法:


rust
fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
    let pivot = v.len() - 1;
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}

fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
    if v.len() <= 1 {
        return;
    }

    let mid = partition(v);
    let (lo, hi) = v.split_at_mut(mid);
    rayon::join(|| quick_sort(lo), || quick_sort(hi));
}

fn main() {
    let mut v = vec![10,9,8,7,6,5,4,3,2,1];
    quick_sort(&mut v);
    println!("{:?}", v);
}

在快速排序算法中,我们可以先把数组切分为两部分,然后分别再对这两部分执行快速排序。在这里,我们使用了 rayon::join 函数。

需要注意的是,并行迭代器和 join 函数并不是简单地新建线程,然后在两个线程上分别执行。它内部实际上使用了“work steal”策略。它后台的线程是由一个线程池管理的,join 函数只是把这两个闭包作为两个任务分发出去了,并不保证这两个闭包一定会并行执行或者串行执行。如果现在有空闲线程,那么空闲线程就会执行这个任务。总之,哪个线程有空,就在哪个线程上执行,它不会让某些线程早早结束而让某些任务在其他线程里面等待。所以,它的开销是非常小的。Rayon 这个库在性能测试的 benchmark 上的表现也是非常不错的,具体数据大家可以查看官方网站,或者自行测试。

同时,我还要强调一点,Rayon 同样保证了“线程安全”。比如,我们如果想在两个任务中同时修改一个数组,编译器会阻止我们:


rust
fn increment_all(slice: &mut [i32]) {
    rayon::join(|| process(slice), || process(slice));
}

我们应该能猜想到,这里的 API 肯定用到了 Send、Sync 之类的约束,就跟标准库中的 spawn 函数一样。因此第三方库也一样能享受到“线程安全”的优点。

有关这个库的使用方法以及其内部实现原理,在 Nicholas Matsakis 的博客有详细描述,本书篇幅有限就不再展开了。从这个库我们可以看到,Rust 为各种并行开发的模式提供了无限的可能性。虽然标准库在这方面提供的直接选择不多,但并没有阻碍我们实现各种各样的第三方库。Rust 在并行开发方面同时实现了执行效率高、安全性好、扩展性好的特点。

Released under the MIT License