(十四)Rust_无畏并发


无畏并发

并发相关概念

  • Concurrent(并发):程序的不同部分之间独立的执行。

  • Parallel(并行):程序的不同部分同时运行。

  • Rust 无畏并发:允许你编写没有细微 Bug 的代码,并在不引入新 Bug 的情况下易于重构。

    • 本文中的 并发 泛指 Concurrent 和 Parallel。

线程

  • 进程与线程的概念:略。

实现线程的方式

  • 通过调用 OS 的 API 来创建线程:1:1 模型。

    • 需要较小的运行时。
  • 语言自己实现的线程(绿色线程):M:N 模型。

    • 需要更大的运行时。
  • Rust 标准库仅提供 1:1 模型。

通过 spawn 创建新线程

  • 通过 thread::spawn 函数可以创建新线程:
    • 参数:一个闭包即线程所运行的代码。
// output:
// hi number 1 from the main thread!
// hi number 1 from the spawn thread!
// hi number 2 from the main thread!
// hi number 2 from the spawn thread!
// hi number 3 from the main thread!
// hi number 3 from the spawn thread!
// hi number 4 from the main thread!
// hi number 4 from the spawn thread!
use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawn thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
  • 主线程结束后其他创建的线程也会停止运行,可以通过 join Handle 来等待所有线程的完成。
    • thread::spawn 函数的返回值类型是 JoinHandle。
    • JoinHandle 持有值的所有权:
      • 调用其 join 方法,可以等待其对应的线程完成。
      • join 方法:调用 handle 的 join 方法会阻止当前运行的线程的执行,直到 handle 所表示的这些线程终结。
use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawn thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
    handle.join().unwrap(); // 这样就可以保证创建的线程执行完成
}

使用 move 闭包

  • move 闭包通常和 thread::spawn 函数一起使用,它允许你使用其他线程的数据。
  • 创建线程时,把值的所有权从一个线程转移到另一个线程。
use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    // drop(v);
    //      ^ value used here after move

    handle.join().unwrap();
}

消息传递

  • 一种很流行且能保证安全并发的技术就是:消息传递。
    • 线程(或 Actor)通过彼此发送消息(数据)来进行通信。
  • Go 语言的一句名言:不要用共享内存来通信,要用通信来共享内存。
  • Rust:Channel 及通信方式。

Channel

  • Channel 包含:发送端、接收端。
    • 调用发送端的方法,发送数据。
    • 接收端会检查和接收到达的数据。
    • 如果发送端、接收端中任意一端被丢弃了,那么 Channel 就关闭了。

创建 Channel

  • 使用 mpsc::channel 函数来创建 Channel。

    • mpsc 表示 multiple producer, single consumer(多个生产之、一个消费者)。

    • 返回一个 tuple:里面的元素分别是发送端、接收端。

      use std::sync::mpsc;
      use std::thread;
      
      fn main() {
          let (tx, rx) = mpsc::channel();
      
          thread::spawn(move || {
              let val = String::from("hi");
              tx.send(val).unwrap();
          });
      
          let received = rx.recv().unwrap();
          println!("Got: {}", received);
      }
      
  • 发送端的 send 方法:

    • 参数:想要发送的数据。
    • 返回:Result<T, E>,如果有问题(例如接收端已经被丢弃),就返回一个错误。
  • 接收端的 recv 方法:阻塞当前线程的执行,直到 Channel 中有值被送来。

    • 一旦有值收到,就返回 Result<T, E>
    • 当发送端关闭,就会收到一个错误。
  • try_recv 方法:不会阻塞,会立即返回 Result<T, E>

    • 通常使用循环调用来检查其结果。

Channel 和所有权转移

  • 所有权在消息传递中非常重要:能帮你编写安全、并发的代码。
    • 在上述的代码中加入如下打印,会发现 val 的所有权已经转移。
/*
error: argument never used
  --> src/main.rs:10:29
   |
10 |         println!("val is ", val);
   |                  ---------  ^^^ argument never used
   |                  |
   |                  formatting specifier missing

error: could not compile `messagepass` due to previous error
*/
thread::spawn(move || {
    let val = String::from("hi");
    tx.send(val).unwrap();
    println!("val is ", val);
});
  • 发送多个值,看到接收者在等待。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}
  • 通过克隆创建多个发送者。
/*
output:
    Got: 1:hi
    Got: hi
    Got: from
    Got: 1:from
    Got: the
    Got: 1:the
    Got: thread
    Got: 1:thread
*/
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let vals = vec![
            String::from("1:hi"),
            String::from("1:from"),
            String::from("1:the"),
            String::from("1:thread"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_millis(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(1));
        }
    });
    for received in rx {
        println!("Got: {}", received);
    }
}

使用共享来实现并发

  • Rust 支持通过共享状态来实现并发。
  • Channel 类似单所有权:一旦将值的所有权转移至 Channel,就无法使用它了。
  • 共享内存并发类似多所有权:多个线程可以同时访问同一块内存。

使用 Mutex 来控制线程竞争

  • 在使用数据之前,必须尝试获取锁(lock)。

  • 使用完 mutex 所保护的数据,必须对数据进行解锁,以便其他线程可以获取锁。

  • Mutex<T> 的 API:

    • 通过 Mutex::new(数据) 来创建 Mutex<T>,其是一个智能指针。

    • 访问数据前,通过 lock 方法来获取锁:

      • 会阻塞当前线程。
      • lock 可能会失败。
      • 返回的是 MutexGuard(智能指针,实现了 Deref 和 Drop)。
      use std::sync::Mutex;
      
      fn main() {
          let m = Mutex::new(5);
          {
              let mut num = m.lock().unwrap();
              *num = 6;
          }
          println! {"m = {:?}", m};
      }
      
  • 多线程共享 Mutex。

    • 需要多线程多重所有权:由于前面所讲的 Rc<T> 只适用于单线程传递,多线程间需要使用 Arc<T> 来进行原子引用计数。
    • Arc<T>Rc<T> 类似,它可以用于并发情景,A(atomic)即原子的。
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter); // 使用 Arc::clone 复制 counter 传递给不同的线程
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
}

RefCell<T>/Rc<T> vs Mutex<T>/Arc<T>

  • Mutex<T> 提供了内部可变性,和 Cell 家族是一样的。
  • 我们使用 RefCell<T> 来改变 Rc<T> 里面的内容。
  • 我们使用 Mutex<T> 来改变 Arc<T> 里面的内容。
  • 注意 Mutex<T> 有死锁的风险。

通过 Send 和 Sync Trait 来扩展并发

  • Rust 语言的并发特性比较少,前文所有的并发特性都是来自标准库(而不是语言本身)。
  • 无需局限于标准库的并发,可以自己实现并发。
  • 在 Rust 语言中有两个并发概念:
    • std::marker::Syncstd::marker::Send 这两个 trait。

Send:允许线程间转移所有权

  • 实现 Send trait 的类型可在线程间转移所有权。
  • Rust 中几乎所有的类型都实现了 Send。
    • 但是 Rc<T> 没有实现 Send,它只用于单线程情景。
  • 任何完全由 Send 类型组成的类型也被标记为 Send。
  • 除了原始指针外,几乎所有的基础类型都是 Send。

Sync:允许从多线程同时访问

  • 实现 Sync trait 的类型可以安全的被多个线程引用。
  • 也就是说:如果 T 是 Sync,那么 &T 就是 Send。
    • 引用可以被安全的送往另一个线程。
  • 基础类型都是 Sync。
  • 完全由 Sync 组成的类型也是 Sync。
    • 但,Rc<T> 不是 Sync 的。
    • RefCell<T>Cell<T> 家族也不是 Sync 的。
    • Mutex<T> 实现了 Sync。

手动来实现 Send 和 Sync 是很不安全的

  • 由于需要使用到 Rust 一些特殊的不安全语法,很难保证自己实现的方法是正确且稳定的。

文章作者: Layton
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Layton !
  目录