线程

生成一个短命的线程

crossbeam-badge cat-concurrency-badge

该示例使用了crossbeam箱子,为并发和并行编程,提供数据结构和函数。Scope::spawn生成一个新的作用域线程,保证crossbeam::scope函数的闭包(参数)返回之前终止,意味着,您可以从调用的函数中,引用数据。

此示例将数组拆分为一半,并在单独的线程中,执行工作。

extern crate crossbeam;

fn main() {
    let arr = &[1, 25, -4, 10];
    let max = find_max(arr);
    assert_eq!(max, Some(25));
}

fn find_max(arr: &[i32]) -> Option<i32> {
    const THRESHOLD: usize = 2;

    if arr.len() <= THRESHOLD {
        return arr.iter().cloned().max();
    }

    let mid = arr.len()/2;
    let (left, right) = arr.split_at(mid);

    crossbeam::scope(|s| {
        let thread_l = s.spawn(|_| find_max(left));
        let thread_r = s.spawn(|_| find_max(right));

        let min_l = thread_l.join().unwrap()?;
        let min_r = thread_r.join().unwrap()?;

        Some(min_l.max(min_r))
    }).unwrap()
}

保持全球可变状态

lazy_static-badge cat-rust-patterns-badge

使用lazy_static声明全局状态。lazy_static创建一个全局可用static ref,这需要一个Mutex,来决定可变许可(也参见RwLock)。受Mutex包裹,能确保多个线程,不能同时访问(FRUIT)状态,从而防止竞争条件。必须获取一个MutexGuard,才能读取或改变存储在Mutex中的值。

# #[macro_use]
# extern crate error_chain;
#[macro_use]
extern crate lazy_static;

use std::sync::Mutex;
#
# error_chain!{ }

lazy_static! {
    static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());
}

fn insert(fruit: &str) -> Result<()> {
    let mut db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;
    db.push(fruit.to_string());
    Ok(())
}

fn run() -> Result<()> {
    insert("apple")?;
    insert("orange")?;
    insert("peach")?;
    {
        let db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;

        db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item));
    }
    insert("grape")?;
    Ok(())
}
#
# quick_main!(run);

并发计算,所有 iso 文件的 SHA1 和

threadpool-badge num_cpus-badge walkdir-badge ring-badge cat-concurrency-badgecat-filesystem-badge

本示例:对当前目录中,具有 ISO 扩展名的每个文件,合计它们的 SHA1。线程池生成的线程数,等于系统(CPU)核心数,这个能通过num_cpus::get获取。 Walkdir::new迭代当前目录,并调用execute执行读取和计算 SHA1 哈希的操作。

extern crate walkdir;
extern crate ring;
extern crate num_cpus;
extern crate threadpool;

use walkdir::WalkDir;
use std::fs::File;
use std::io::{BufReader, Read, Error};
use std::path::Path;
use threadpool::ThreadPool;
use std::sync::mpsc::channel;
use ring::digest::{Context, Digest, SHA1};

# // Verify the iso extension
# fn is_iso(entry: &Path) -> bool {
#     match entry.extension() {
#         Some(e) if e.to_string_lossy().to_lowercase() == "iso" => true,
#         _ => false,
#     }
# }

fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> {
    let mut buf_reader = BufReader::new(File::open(&filepath)?);
    let mut context = Context::new(&SHA1);
    let mut buffer = [0; 1024];

    loop {
        let count = buf_reader.read(&mut buffer)?;
        if count == 0 {
            break;
        }
        context.update(&buffer[..count]);
    }

    Ok((context.finish(), filepath))
}

fn main() -> Result<(), Error> {
    let pool = ThreadPool::new(num_cpus::get());

    let (tx, rx) = channel();

    for entry in WalkDir::new("/home/user/Downloads")
        .follow_links(true)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| !e.path().is_dir() && is_iso(e.path())) {
            let path = entry.path().to_owned();
            let tx = tx.clone();
            pool.execute(move || {
                let digest = compute_digest(path);
                tx.send(digest).expect("Could not send data!");
            });
        }

    drop(tx);
    for t in rx.iter() {
        let (sha, path) = t?;
        println!("{:?} {:?}", sha, path);
    }
    Ok(())
}

将绘制分形工作,分派到线程池

threadpool-badge num-badge num_cpus-badge image-badge cat-concurrency-badgecat-science-badgecat-rendering-badge

本示例:绘制[朱利亚集合]的一个分形,生成一个图像,会用到分布式计算的线程池。

通过ImageBuffer::new,为给定宽度和高度的输出图像,分配内存。Rgb::from_channels计算 RGB 像素值。创建ThreadPool线程数,等于num_cpus::get核心数。ThreadPool::execute收到的每个像素,都作为一个单独的工作。

mpsc::channel收到工作,还有Receiver::recv会检索它们。ImageBuffer::put_pixel使用数据,设置像素颜色。ImageBuffer::save就将图像写入output.png

# #[macro_use]
# extern crate error_chain;
extern crate threadpool;
extern crate num;
extern crate num_cpus;
extern crate image;

use std::sync::mpsc::{channel, RecvError};
use threadpool::ThreadPool;
use num::complex::Complex;
use image::{ImageBuffer, Pixel, Rgb};
#
# error_chain! {
#     foreign_links {
#         MpscRecv(RecvError);
#         Io(std::io::Error);
#     }
# }
#
# // Function converting intensity values to RGB
# // Based on http://www.efg2.com/Lab/ScienceAndEngineering/Spectra.htm
# fn wavelength_to_rgb(wavelength: u32) -> Rgb<u8> {
#     let wave = wavelength as f32;
#
#     let (r, g, b) = match wavelength {
#         380...439 => ((440. - wave)/(440. - 380.), 0.0, 1.0),
#         440...489 => (0.0, (wave - 440.)/(490. - 440.), 1.0),
#         490...509 => (0.0, 1.0, (510. - wave)/(510. - 490.)),
#         510...579 => ((wave - 510.)/(580. - 510.), 1.0, 0.0),
#         580...644 => (1.0, (645. - wave)/(645. - 580.), 0.0),
#         645...780 => (1.0, 0.0, 0.0),
#         _ => (0.0, 0.0, 0.0),
#     };
#
#     let factor = match wavelength {
#         380...419 => 0.3 + 0.7 * (wave - 380.)/(420. - 380.),
#         701...780 => 0.3 + 0.7 * (780. - wave)/(780. - 700.),
#         _ => 1.0,
#     };
#
#     let (r, g, b) = (normalize(r, factor), normalize(g, factor), normalize(b, factor));
#     Rgb::from_channels(r, g, b, 0)
# }
#
# // Maps Julia set distance estimation to intensity values
# fn julia(c: Complex<f32>, x: u32, y: u32, width: u32, height: u32, max_iter: u32) -> u32 {
#     let width = width as f32;
#     let height = height as f32;
#
#     let mut z = Complex {
#         // scale and translate the point to image coordinates
#         re: 3.0 * (x as f32 - 0.5 * width)/width,
#         im: 2.0 * (y as f32 - 0.5 * height)/height,
#     };
#
#     let mut i = 0;
#     for t in 0..max_iter {
#         if z.norm() >= 2.0 {
#             break;
#         }
#         z = z * z + c;
#         i = t;
#     }
#     i
# }
#
# // Normalizes color intensity values within RGB range
# fn normalize(color: f32, factor: f32) -> u8 {
#     ((color * factor).powf(0.8) * 255.) as u8
# }

fn run() -> Result<()> {
    let (width, height) = (1920, 1080);
    let mut img = ImageBuffer::new(width, height);
    let iterations = 300;

    let c = Complex::new(-0.8, 0.156);

    let pool = ThreadPool::new(num_cpus::get());
    let (tx, rx) = channel();

    for y in 0..height {
        let tx = tx.clone();
        pool.execute(move || for x in 0..width {
                         let i = julia(c, x, y, width, height, iterations);
                         let pixel = wavelength_to_rgb(380 + i * 400/iterations);
                         tx.send((x, y, pixel)).expect("Could not send data!");
                     });
    }

    for _ in 0..(width * height) {
        let (x, y, pixel) = rx.recv()?;
        img.put_pixel(x, y, pixel);
    }
    let _ = img.save("output.png")?;
    Ok(())
}
#
# quick_main!(run);