Rust Concurrency
Rust is designed for high-performance programming, with no garbage collection and zero-cost abstractions. The type system guarantees freedom from data races.
1. Ownership Model
In Rust, each object has a single owner. When the owner goes out of scope, the object is destroyed (similar to std::unique_ptr in C++, but enforced by the compiler). For example:
```rust
struct Point {
    x: u32,
    y: u32,
}

fn show(p: Point) {
    println!("({}, {})", p.x, p.y);
} // p goes out of scope here, Point is destroyed

fn main() {
    let p = Point { x: 1, y: 2 };
    show(p);
    // p is no longer valid here
}
```
If we do not want show to take ownership of p, we can pass an immutable reference to p instead. In this case, show borrows p:
```rust
fn show(p: &Point) {
    println!("({}, {})", p.x, p.y);
} // p goes out of scope here, but Point is not destroyed

fn main() {
    let p = Point { x: 1, y: 2 };
    show(&p);
    show(&p); // p is still valid here
}
```
If we need to modify a point, we might try to write:
```rust
fn scale(p: &Point, factor: u32) {
    p.x *= factor; // ERROR: cannot modify through immutable reference
    p.y *= factor; // ERROR: cannot modify through immutable reference
}
```
If we wanted a mutable reference instead, we could do:
```rust
fn scale(p: &mut Point, factor: u32) {
    p.x *= factor; // OK
    p.y *= factor; // OK
}

fn main() {
    let mut p = Point { x: 1, y: 2 }; // Must be mutable to take a mutable reference
    scale(&mut p, 3);
    show(&p); // prints (3, 6)
}
```
2. Hello Threads
```rust
use std::thread; // Thread library

fn main() {
    let t1 = thread::spawn(|| {
        println!("Hello from thread 1");
    });
    let t2 = thread::spawn(|| {
        println!("Hello from thread 2");
    });
    t1.join().unwrap(); // Unwrap panics if the result is not `Ok`
    t2.join().unwrap(); // Unwrap panics if the result is not `Ok`
}
```
3. Summing a Vector
To sum a vector in parallel, we spawn two threads, each summing one half of the vector, then combine the results:
```rust
use std::thread;

fn main() {
    let max = 16_777_216;

    let mut data = Vec::<u32>::new();
    for _ in 0..max {
        data.push(1);
    }

    let mut result: u32 = 0;

    let t1 = thread::spawn(|| {
        let mut my_result: u32 = 0;
        for i in 0..(max / 2) {
            my_result += data[i];
        }
        result += my_result; // Read-write and write-write race on `result`
    });

    let t2 = thread::spawn(|| {
        let mut my_result: u32 = 0;
        for i in (max / 2)..max {
            my_result += data[i];
        }
        result += my_result; // Read-write and write-write race on `result`
    });

    t1.join().unwrap();
    t2.join().unwrap();

    println!("Result: {}", result);
}
```
In this case, the compiler will complain that the threads may outlive data and result, so we need to use the move keyword to move ownership of these variables into the thread closures.
We cannot use move for both threads, as this passes ownership to the first thread only; the sketch below shows the resulting error. To share ownership we can use an Arc (Atomic Reference Counted pointer), introduced in the following subsections.
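A minimal sketch of the failure, assuming the same data and max as in the example above: once data is moved into the first closure, the second closure can no longer capture it.

```rust
let t1 = thread::spawn(move || {
    let mut my_result: u32 = 0;
    for i in 0..(max / 2) {
        my_result += data[i]; // `data` is moved into this closure
    }
    my_result
});

let t2 = thread::spawn(move || {
    let mut my_result: u32 = 0;
    for i in (max / 2)..max {
        my_result += data[i]; // ERROR: use of moved value: `data`
    }
    my_result
});
```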
3.1 Rust Mutexes
In Rust, a mutex contains the data it protects: creating the mutex transfers ownership of the data to the mutex. After creating the mutex, we can no longer access the inner value without acquiring the lock. For example:
```rust
let data_mutex: Mutex<Vec<u32>> = Mutex::new(data);
data[0] = 42; // ERROR: cannot access data directly, it was moved into the mutex
{
    let lock_result: LockResult<MutexGuard<Vec<u32>>> = data_mutex.lock();
    let mut data_mutex_guard: MutexGuard<Vec<u32>> = lock_result.unwrap();

    let imm_data_ref: &Vec<u32> = &(*data_mutex_guard); // Deref to get a reference to the data
    imm_data_ref[0] = 42; // ERROR: cannot modify through immutable reference

    let mut_data_ref: &mut Vec<u32> = &mut (*data_mutex_guard);
    mut_data_ref[0] = 42; // OK: can modify through mutable reference
}

// Instead, we could simply write:
data_mutex.lock().unwrap()[0] = 42; // OK
```
3.2 Arc
An Arc<T> is an atomically reference counted object:
- An Arc<T> contains a non-owning reference to another object T and a reference counter.
- An Arc can be cloned to create a new pointer to the same object, incrementing the reference count (sketched below).
- When an Arc is dropped, the reference count is decremented. When the reference count reaches zero, the inner object is destroyed.
- Arc<T> manipulates the reference count using atomic operations, so it is thread-safe.
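A small illustrative sketch of the counting behaviour, using Arc::strong_count to read the current reference count:

```rust
use std::sync::Arc;

fn main() {
    let a = Arc::new(vec![1u32, 2, 3]);    // reference count = 1
    let b = Arc::clone(&a);                // reference count = 2
    println!("{}", Arc::strong_count(&a)); // prints 2
    drop(b);                               // reference count = 1
    println!("{}", Arc::strong_count(&a)); // prints 1
} // `a` is dropped here; the count reaches 0 and the Vec is destroyed
```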
3.3 Vector Sum
To share the data between the two threads, we wrap it in an Arc<Mutex<Vec<u32>>> and accumulate the final result in an atomic counter:
```rust
use std::sync::atomic::{AtomicU32, Ordering}; // Atomic types
use std::sync::Arc; // Atomic Reference Counted pointer
use std::sync::Mutex;
use std::thread;

fn main() {
    let max = 16_777_216;

    let mut data = Vec::<u32>::new();
    for _ in 0..max {
        data.push(1);
    }

    let data_mutex_arc_t1: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(data));
    let data_mutex_arc_t2: Arc<Mutex<Vec<u32>>> = data_mutex_arc_t1.clone();

    let result_arc: Arc<AtomicU32> = Arc::new(AtomicU32::new(0));
    let result_arc_t1 = result_arc.clone();
    let result_arc_t2 = result_arc.clone();

    let t1 = thread::spawn(move || {
        let mut my_result: u32 = 0;
        for i in 0..(max / 2) {
            my_result += data_mutex_arc_t1.lock().unwrap()[i];
        }
        result_arc_t1.fetch_add(my_result, Ordering::Relaxed); // Atomically add this thread's partial sum
    });

    let t2 = thread::spawn(move || {
        let mut my_result: u32 = 0;
        for i in (max / 2)..max {
            my_result += data_mutex_arc_t2.lock().unwrap()[i];
        }
        result_arc_t2.fetch_add(my_result, Ordering::Relaxed); // Atomically add this thread's partial sum
    });

    t1.join().unwrap();
    t2.join().unwrap();

    println!("Result: {}", result_arc.load(Ordering::Relaxed));
}
```
3.4 Interior Mutability
Interior mutability allows a value to be mutated through a shared reference. It is used in Mutex<T>, which internally contains unsafe code to permit this mutation, but exposes it only in a manner that avoids data races.
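A minimal sketch of the idea (independent of the vector-sum example): the mutex below is not declared mut, yet the value inside it can still be modified through a shared reference, because Mutex provides interior mutability while forcing every access to go through the lock.

```rust
use std::sync::Mutex;

fn increment(c: &Mutex<u32>) {
    *c.lock().unwrap() += 1; // Mutation through a shared reference, guarded by the lock
}

fn main() {
    let counter = Mutex::new(0u32); // Not declared `mut`
    increment(&counter);
    increment(&counter);
    println!("{}", counter.lock().unwrap()); // prints 2
}
```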