无畏并发
并发相关概念
Concurrent(并发):程序的不同部分之间独立的执行。
Parallel(并行):程序的不同部分同时运行。
Rust 无畏并发:允许你编写没有细微 Bug 的代码,并在不引入新 Bug 的情况下易于重构。
- 本文中的
并发
泛指 Concurrent 和 Parallel。
- 本文中的
线程
- 进程与线程的概念:略。
实现线程的方式
通过调用 OS 的 API 来创建线程:1:1 模型。
- 需要较小的运行时。
语言自己实现的线程(绿色线程):M:N 模型。
- 需要更大的运行时。
Rust 标准库仅提供 1:1 模型。
通过 spawn 创建新线程
- 通过
thread::spawn
函数可以创建新线程:- 参数:一个闭包即线程所运行的代码。
// output:
// hi number 1 from the main thread!
// hi number 1 from the spawn thread!
// hi number 2 from the main thread!
// hi number 2 from the spawn thread!
// hi number 3 from the main thread!
// hi number 3 from the spawn thread!
// hi number 4 from the main thread!
// hi number 4 from the spawn thread!
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawn thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
- 主线程结束后其他创建的线程也会停止运行,可以通过 join Handle 来等待所有线程的完成。
thread::spawn
函数的返回值类型是 JoinHandle。- JoinHandle 持有值的所有权:
- 调用其 join 方法,可以等待其对应的线程完成。
- join 方法:调用 handle 的 join 方法会阻止当前运行的线程的执行,直到 handle 所表示的这些线程终结。
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawn thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap(); // 这样就可以保证创建的线程执行完成
}
使用 move 闭包
- move 闭包通常和
thread::spawn
函数一起使用,它允许你使用其他线程的数据。 - 创建线程时,把值的所有权从一个线程转移到另一个线程。
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});
// drop(v);
// ^ value used here after move
handle.join().unwrap();
}
消息传递
- 一种很流行且能保证安全并发的技术就是:消息传递。
- 线程(或 Actor)通过彼此发送消息(数据)来进行通信。
- Go 语言的一句名言:不要用共享内存来通信,要用通信来共享内存。
- Rust:Channel 及通信方式。
Channel
- Channel 包含:发送端、接收端。
- 调用发送端的方法,发送数据。
- 接收端会检查和接收到达的数据。
- 如果发送端、接收端中任意一端被丢弃了,那么 Channel 就关闭了。
创建 Channel
使用
mpsc::channel
函数来创建 Channel。mpsc 表示 multiple producer, single consumer(多个生产之、一个消费者)。
返回一个 tuple:里面的元素分别是发送端、接收端。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
发送端的 send 方法:
- 参数:想要发送的数据。
- 返回:
Result<T, E>
,如果有问题(例如接收端已经被丢弃),就返回一个错误。
接收端的 recv 方法:阻塞当前线程的执行,直到 Channel 中有值被送来。
- 一旦有值收到,就返回
Result<T, E>
。 - 当发送端关闭,就会收到一个错误。
- 一旦有值收到,就返回
try_recv
方法:不会阻塞,会立即返回Result<T, E>
。- 通常使用循环调用来检查其结果。
Channel 和所有权转移
- 所有权在消息传递中非常重要:能帮你编写安全、并发的代码。
- 在上述的代码中加入如下打印,会发现 val 的所有权已经转移。
/*
error: argument never used
--> src/main.rs:10:29
|
10 | println!("val is ", val);
| --------- ^^^ argument never used
| |
| formatting specifier missing
error: could not compile `messagepass` due to previous error
*/
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is ", val);
});
- 发送多个值,看到接收者在等待。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
- 通过克隆创建多个发送者。
/*
output:
Got: 1:hi
Got: hi
Got: from
Got: 1:from
Got: the
Got: 1:the
Got: thread
Got: 1:thread
*/
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1:hi"),
String::from("1:from"),
String::from("1:the"),
String::from("1:thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
使用共享来实现并发
- Rust 支持通过共享状态来实现并发。
- Channel 类似单所有权:一旦将值的所有权转移至 Channel,就无法使用它了。
- 共享内存并发类似多所有权:多个线程可以同时访问同一块内存。
使用 Mutex 来控制线程竞争
在使用数据之前,必须尝试获取锁(lock)。
使用完 mutex 所保护的数据,必须对数据进行解锁,以便其他线程可以获取锁。
Mutex<T>
的 API:通过
Mutex::new(数据)
来创建Mutex<T>
,其是一个智能指针。访问数据前,通过 lock 方法来获取锁:
- 会阻塞当前线程。
- lock 可能会失败。
- 返回的是 MutexGuard(智能指针,实现了 Deref 和 Drop)。
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 6; } println! {"m = {:?}", m}; }
多线程共享 Mutex。
- 需要多线程多重所有权:由于前面所讲的
Rc<T>
只适用于单线程传递,多线程间需要使用Arc<T>
来进行原子引用计数。 Arc<T>
与Rc<T>
类似,它可以用于并发情景,A(atomic)即原子的。
- 需要多线程多重所有权:由于前面所讲的
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter); // 使用 Arc::clone 复制 counter 传递给不同的线程
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
RefCell<T>
/Rc<T>
vs Mutex<T>
/Arc<T>
Mutex<T>
提供了内部可变性,和 Cell 家族是一样的。- 我们使用
RefCell<T>
来改变Rc<T>
里面的内容。 - 我们使用
Mutex<T>
来改变Arc<T>
里面的内容。 - 注意
Mutex<T>
有死锁的风险。
通过 Send 和 Sync Trait 来扩展并发
- Rust 语言的并发特性比较少,前文所有的并发特性都是来自标准库(而不是语言本身)。
- 无需局限于标准库的并发,可以自己实现并发。
- 在 Rust 语言中有两个并发概念:
std::marker::Sync
和std::marker::Send
这两个 trait。
Send:允许线程间转移所有权
- 实现
Send trait
的类型可在线程间转移所有权。 - Rust 中几乎所有的类型都实现了 Send。
- 但是
Rc<T>
没有实现 Send,它只用于单线程情景。
- 但是
- 任何完全由 Send 类型组成的类型也被标记为 Send。
- 除了原始指针外,几乎所有的基础类型都是 Send。
Sync:允许从多线程同时访问
- 实现
Sync trait
的类型可以安全的被多个线程引用。 - 也就是说:如果 T 是 Sync,那么 &T 就是 Send。
- 引用可以被安全的送往另一个线程。
- 基础类型都是 Sync。
- 完全由 Sync 组成的类型也是 Sync。
- 但,
Rc<T>
不是 Sync 的。 RefCell<T>
和Cell<T>
家族也不是 Sync 的。Mutex<T>
实现了 Sync。
- 但,
手动来实现 Send 和 Sync 是很不安全的
- 由于需要使用到 Rust 一些特殊的不安全语法,很难保证自己实现的方法是正确且稳定的。