线程
生成一个短命的线程
该示例使用了crossbeam箱子,为并发和并行编程,提供数据结构和函数。Scope::spawn
生成一个新的作用域线程,保证crossbeam::scope
函数的闭包(参数)返回之前终止,意味着,您可以从调用的函数中,引用数据。
此示例将数组拆分为一半,并在单独的线程中,执行工作。
extern crate crossbeam; fn main() { let arr = &[1, 25, -4, 10]; let max = find_max(arr); assert_eq!(max, Some(25)); } fn find_max(arr: &[i32]) -> Option<i32> { const THRESHOLD: usize = 2; if arr.len() <= THRESHOLD { return arr.iter().cloned().max(); } let mid = arr.len()/2; let (left, right) = arr.split_at(mid); crossbeam::scope(|s| { let thread_l = s.spawn(|_| find_max(left)); let thread_r = s.spawn(|_| find_max(right)); let min_l = thread_l.join().unwrap()?; let min_r = thread_r.join().unwrap()?; Some(min_l.max(min_r)) }).unwrap() }
保持全球可变状态
使用lazy_static声明全局状态。lazy_static创建一个全局可用static ref
,这需要一个Mutex
,来决定可变许可(也参见RwLock
)。受Mutex
包裹,能确保多个线程,不能同时访问(FRUIT
)状态,从而防止竞争条件。必须获取一个MutexGuard
,才能读取或改变存储在Mutex
中的值。
# #[macro_use] # extern crate error_chain; #[macro_use] extern crate lazy_static; use std::sync::Mutex; # # error_chain!{ } lazy_static! { static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new()); } fn insert(fruit: &str) -> Result<()> { let mut db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?; db.push(fruit.to_string()); Ok(()) } fn run() -> Result<()> { insert("apple")?; insert("orange")?; insert("peach")?; { let db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?; db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item)); } insert("grape")?; Ok(()) } # # quick_main!(run);
并发计算,所有 iso 文件的 SHA1 和
本示例:对当前目录中,具有 ISO 扩展名的每个文件,合计它们的 SHA1。线程池生成的线程数,等于系统(CPU)核心数,这个能通过num_cpus::get
获取。 Walkdir::new
迭代当前目录,并调用execute
执行读取和计算 SHA1 哈希的操作。
extern crate walkdir; extern crate ring; extern crate num_cpus; extern crate threadpool; use walkdir::WalkDir; use std::fs::File; use std::io::{BufReader, Read, Error}; use std::path::Path; use threadpool::ThreadPool; use std::sync::mpsc::channel; use ring::digest::{Context, Digest, SHA1}; # // Verify the iso extension # fn is_iso(entry: &Path) -> bool { # match entry.extension() { # Some(e) if e.to_string_lossy().to_lowercase() == "iso" => true, # _ => false, # } # } fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> { let mut buf_reader = BufReader::new(File::open(&filepath)?); let mut context = Context::new(&SHA1); let mut buffer = [0; 1024]; loop { let count = buf_reader.read(&mut buffer)?; if count == 0 { break; } context.update(&buffer[..count]); } Ok((context.finish(), filepath)) } fn main() -> Result<(), Error> { let pool = ThreadPool::new(num_cpus::get()); let (tx, rx) = channel(); for entry in WalkDir::new("/home/user/Downloads") .follow_links(true) .into_iter() .filter_map(|e| e.ok()) .filter(|e| !e.path().is_dir() && is_iso(e.path())) { let path = entry.path().to_owned(); let tx = tx.clone(); pool.execute(move || { let digest = compute_digest(path); tx.send(digest).expect("Could not send data!"); }); } drop(tx); for t in rx.iter() { let (sha, path) = t?; println!("{:?} {:?}", sha, path); } Ok(()) }
将绘制分形工作,分派到线程池
本示例:绘制[朱利亚集合]的一个分形,生成一个图像,会用到分布式计算的线程池。
通过ImageBuffer::new
,为给定宽度和高度的输出图像,分配内存。Rgb::from_channels
计算 RGB 像素值。创建ThreadPool
线程数,等于num_cpus::get
核心数。ThreadPool::execute
收到的每个像素,都作为一个单独的工作。
mpsc::channel
收到工作,还有Receiver::recv
会检索它们。ImageBuffer::put_pixel
使用数据,设置像素颜色。ImageBuffer::save
就将图像写入output.png
。
# #[macro_use] # extern crate error_chain; extern crate threadpool; extern crate num; extern crate num_cpus; extern crate image; use std::sync::mpsc::{channel, RecvError}; use threadpool::ThreadPool; use num::complex::Complex; use image::{ImageBuffer, Pixel, Rgb}; # # error_chain! { # foreign_links { # MpscRecv(RecvError); # Io(std::io::Error); # } # } # # // Function converting intensity values to RGB # // Based on http://www.efg2.com/Lab/ScienceAndEngineering/Spectra.htm # fn wavelength_to_rgb(wavelength: u32) -> Rgb<u8> { # let wave = wavelength as f32; # # let (r, g, b) = match wavelength { # 380...439 => ((440. - wave)/(440. - 380.), 0.0, 1.0), # 440...489 => (0.0, (wave - 440.)/(490. - 440.), 1.0), # 490...509 => (0.0, 1.0, (510. - wave)/(510. - 490.)), # 510...579 => ((wave - 510.)/(580. - 510.), 1.0, 0.0), # 580...644 => (1.0, (645. - wave)/(645. - 580.), 0.0), # 645...780 => (1.0, 0.0, 0.0), # _ => (0.0, 0.0, 0.0), # }; # # let factor = match wavelength { # 380...419 => 0.3 + 0.7 * (wave - 380.)/(420. - 380.), # 701...780 => 0.3 + 0.7 * (780. - wave)/(780. - 700.), # _ => 1.0, # }; # # let (r, g, b) = (normalize(r, factor), normalize(g, factor), normalize(b, factor)); # Rgb::from_channels(r, g, b, 0) # } # # // Maps Julia set distance estimation to intensity values # fn julia(c: Complex<f32>, x: u32, y: u32, width: u32, height: u32, max_iter: u32) -> u32 { # let width = width as f32; # let height = height as f32; # # let mut z = Complex { # // scale and translate the point to image coordinates # re: 3.0 * (x as f32 - 0.5 * width)/width, # im: 2.0 * (y as f32 - 0.5 * height)/height, # }; # # let mut i = 0; # for t in 0..max_iter { # if z.norm() >= 2.0 { # break; # } # z = z * z + c; # i = t; # } # i # } # # // Normalizes color intensity values within RGB range # fn normalize(color: f32, factor: f32) -> u8 { # ((color * factor).powf(0.8) * 255.) as u8 # } fn run() -> Result<()> { let (width, height) = (1920, 1080); let mut img = ImageBuffer::new(width, height); let iterations = 300; let c = Complex::new(-0.8, 0.156); let pool = ThreadPool::new(num_cpus::get()); let (tx, rx) = channel(); for y in 0..height { let tx = tx.clone(); pool.execute(move || for x in 0..width { let i = julia(c, x, y, width, height, iterations); let pixel = wavelength_to_rgb(380 + i * 400/iterations); tx.send((x, y, pixel)).expect("Could not send data!"); }); } for _ in 0..(width * height) { let (x, y, pixel) = rx.recv()?; img.put_pixel(x, y, pixel); } let _ = img.save("output.png")?; Ok(()) } # # quick_main!(run);