并行任务

并行,变换数组的元素

rayon-badge cat-concurrency-badge

该示例使用了rayon箱子,这是 Rust 的数据并行库。rayon提供了par_iter_mut方法,给任何的并行可迭代数据类型使用。这是一个类似迭代器的链,(潜在地)并行执行。

extern crate rayon;

use rayon::prelude::*;

fn main() {
    let mut arr = [0, 7, 9, 11];
    arr.par_iter_mut().for_each(|p| *p -= 1);
    println!("{:?}", arr);
}

并行测试,集合的任何或所有元素是否与给定断言匹配

rayon-badge cat-concurrency-badge

这个例子演示了如何使用rayon::anyrayon::all方法,是与std::anystd::all相对应的并行方法。rayon::any并行检查迭代器的任何元素,是否与断言匹配,并在找到一个元素后立即返回。rayon::all并行检查迭代器的所有元素,是否与断言匹配,并在找到非匹配元素后,立即返回。

extern crate rayon;

use rayon::prelude::*;

fn main() {
    let mut vec = vec![2, 4, 6, 8];

    assert!(!vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(!vec.par_iter().any(|n| *n > 8 ));
    assert!(vec.par_iter().all(|n| *n <= 8 ));

    vec.push(9);

    assert!(vec.par_iter().any(|n| (*n % 2) != 0));
    assert!(!vec.par_iter().all(|n| (*n % 2) == 0));
    assert!(vec.par_iter().any(|n| *n > 8 ));
    assert!(!vec.par_iter().all(|n| *n <= 8 )); 
}

使用给定断言,并行搜索项目

rayon-badge cat-concurrency-badge

这个例子使用rayon::find_anypar_iter获得一个,通过并行搜索,满足给定闭包中断言的元素 vector。

如果,有多个元素满足rayon::find_any闭包参数中,定义的断言,rayon返回找到的第一个,但不一定是(顺序上的)第一个。

另请注意,闭包的参数是对一个引用的一个引用(&&x)。请查阅std::find的讨论,了解更多细节。

extern crate rayon;

use rayon::prelude::*;

fn main() {
    let v = vec![6, 2, 1, 9, 3, 8, 11];

    let f1 = v.par_iter().find_any(|&&x| x == 9);
    let f2 = v.par_iter().find_any(|&&x| x % 2 == 0 && x > 6);
    let f3 = v.par_iter().find_any(|&&x| x > 8);

    assert_eq!(f1, Some(&9));
    assert_eq!(f2, Some(&8));
    assert!(f3 > Some(&8));
}

并行,排序 vector

rayon-badge rand-badge cat-concurrency-badge

此示例:并行排序字符串 vector。

分配一个空字符串的 vector。par_iter_mut().for_each会并发填充随机值。虽然存在多种选择,可以对可枚举数据类型进行排序,par_sort_unstable通常比稳定排序算法快。

extern crate rand;
extern crate rayon;

use rand::{Rng, thread_rng};
use rand::distributions::Alphanumeric;
use rayon::prelude::*;

fn main() {
  let mut vec = vec![String::new(); 100_000];
  vec.par_iter_mut().for_each(|p| {
    let mut rng = thread_rng();
    *p = (0..5).map(|_| rng.sample(&Alphanumeric)).collect()
  });
  vec.par_sort_unstable();
}

Map-reduce 并行

rayon-badge cat-concurrency-badge

这个例子使用rayon::filterrayon::map,和rayon::reduce,计算Person对象的平均年龄,哪个年龄超过 30 。

rayon::filter返回满足给定断言的集合,其中的元素。rayon::map对每个元素执行一个操作,创建一个新的迭代,然后rayon::reduce:执行带有一个当前元素,和前一个(执行结果)的操作。还显示了使用rayon::sum,与本例中的 reduce 操作,具有相同的结果。

extern crate rayon;

use rayon::prelude::*;

struct Person {
    age: u32,
}

fn main() {
    let v: Vec<Person> = vec![
        Person { age: 23 },
        Person { age: 19 },
        Person { age: 42 },
        Person { age: 17 },
        Person { age: 17 },
        Person { age: 31 },
        Person { age: 30 },
    ];

    let num_over_30 = v.par_iter().filter(|&x| x.age > 30).count() as f32;
    let sum_over_30 = v.par_iter()
        .map(|x| x.age)
        .filter(|&x| x > 30)
        .reduce(|| 0, |x, y| x + y);

    let alt_sum_30: u32 = v.par_iter()
        .map(|x| x.age)
        .filter(|&x| x > 30)
        .sum();

    let avg_over_30 = sum_over_30 as f32/num_over_30;
    let alt_avg_over_30 = alt_sum_30 as f32/ num_over_30;

    assert!((avg_over_30 - alt_avg_over_30).abs() < std::f32::EPSILON);
    println!("The average age of people older than 30 is {}", avg_over_30);
}

并行,生成 jpg 缩略图

rayon-badge glob-badge image-badge cat-concurrency-badge cat-filesystem-badge

此示例:帮当前目录中的所有.jpg 文件生成缩略图,然后将其保存在名为thumbnails的新文件夹中。

glob::glob_with在当前目录中,查找 jpeg 文件。rayon使用par_iter,并发调整图像大小,每次都调用DynamicImage::resize

# #[macro_use]
# extern crate error_chain;
extern crate glob;
extern crate image;
extern crate rayon;

use std::path::Path;
use std::fs::create_dir_all;

# use error_chain::ChainedError;
use glob::{glob_with, MatchOptions};
use image::{FilterType, ImageError};
use rayon::prelude::*;

# error_chain! {
#     foreign_links {
#         Image(ImageError);
#         Io(std::io::Error);
#         Glob(glob::PatternError);
#     }
# }

fn run() -> Result<()> {
    let options: MatchOptions = Default::default();
    let files: Vec<_> = glob_with("*.jpg", &options)?
        .filter_map(|x| x.ok())
        .collect();

    if files.len() == 0 {
        bail!("No .jpg files found in current directory");
    }

    let thumb_dir = "thumbnails";
    create_dir_all(thumb_dir)?;

    println!("Saving {} thumbnails into '{}'...", files.len(), thumb_dir);

    let image_failures: Vec<_> = files
        .par_iter()
        .map(|path| {
            make_thumbnail(path, thumb_dir, 300)
                .map_err(|e| e.chain_err(|| path.display().to_string()))
        })
        .filter_map(|x| x.err())
        .collect();

    image_failures.iter().for_each(|x| println!("{}", x.display_chain()));

    println!("{} thumbnails saved successfully", files.len() - image_failures.len());
    Ok(())
}

fn make_thumbnail<PA, PB>(original: PA, thumb_dir: PB, longest_edge: u32) -> Result<()>
where
    PA: AsRef<Path>,
    PB: AsRef<Path>,
{
    let img = image::open(original.as_ref())?;
    let file_path = thumb_dir.as_ref().join(original);

    Ok(img.resize(longest_edge, longest_edge, FilterType::Nearest)
        .save(file_path)?)
}
#
# quick_main!(run);