Skip to content

线程安全案例

线程安全概念理解

除了内存安全受到大家的关注外,Rust的无畏并发也引起了大家的关注。这里利用一个示例展示Rust的线程安全。

首先,简单梳理一下并发编程的相关概念。

并发编程意味着程序的不同部分相互独立的执行;并行编程意味着程序的不同部分同时执行。执行程序在一个进程中运行,操作系统负责管理多个进程。在程序内部,可以有多个同时运行的独立部分,运行这些独立的部分就是线程。

通常情况下,在并发的时候遇到的问题大概两点:

  • 数据竞争。线程不应同时修改数据。当这种情况发生时,可能会破坏数据,导致崩溃。例如,两个线程试图同时修改同一个字符串。
  • 死锁。线程不能以可能导致死锁的方式锁定资源,即线程 1 获得资源 B 的锁定并阻塞资源 A,而线程 2 获得资源 A 的锁定并阻塞资源 B。两个线程都被永远锁定,等待永远不会释放的资源。

针对以上两个问题的解决办法一般是:

保护共享数据。

不要在另一个线程中写入数据的同时读取数据,两个线程也不应该同时写入数据。

防止这种情况的常用方法是:

  • 使用互斥锁来保护对数据的访问。
  • 使用读写锁。与互斥锁类似,它允许一个线程锁定线程以写入数据,但它允许多个线程具有读取访问权限,前提是没有任何内容正在写入。对于读取比修改更频繁的数据,这比仅使用互斥锁要有效得多。

Rust 中只允许同时存在一个可变引用或者多个不可变引用,不允许可变引用和不可变引用同时存在并且一个引用永远也不会比它的所有者存活得更久,避免了数据竞争和通过引用而错误的访问到不存在的数据。

如果在内存安全中,这确保了资源被正确释放,但是当在线程安全中,这意味着一次只有一个线程可以修改一个变量。此外,我们知道没有其他线程会尝试引用过期的借用——借用会强制共享或写入,但绝不会两者兼而有之。如果另一个线程需要修改资源,那么我们可以通过将变量移动到新线程来转移所有权。

避免死锁

避免死锁的最好方法是永远只获得对一件事的锁定,并在完成后立即释放它。但是,如果必须锁定不止一件事,请确保所有线程之间的锁定顺序是一致的。

Rust 虽然在数据竞争中减轻了一些并发风险,但是现在还不能在编译期间检测死锁问题。

关于死锁问题,我们举一个简单的示例就可以知道。

在下面的例子中,我们构建两个线程,t1t2,分别对v1v2加锁,然后两个线程获取对方已经加锁的数据,就会因为死锁导致程序无法运行。

rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
    // 原子引用计数可以安全的用于并发环境。创建互斥锁
    let v1 = Arc::new(Mutex::new(0));
    let v2 = Arc::new(Mutex::new(0));

    // clone互斥锁
    let v11 = Arc::clone(&v1);
    let v21 = Arc::clone(&v2);
    
    // 线程1
    let t1 = thread::spawn(move ||
        {
            println!("t1 锁 v1");
            let _v1_guard = (*v11).lock().unwrap();
            println!("t1 获得 v1");

            thread::sleep(Duration::from_secs(5));

            println!("t1 锁 v2");
            let _v2_guard = (*v21).lock().unwrap();
            println!("t1 获得 v2");
        }
    );

    let v12 = Arc::clone(&v1);
    let v22 = Arc::clone(&v2);
    
    // 线程2
    let t2 = thread::spawn(move ||
        {
            println!("t2 锁 v2");
            let _v1_guard = (*v22).lock().unwrap();
            println!("t2 获得 v2");

            thread::sleep(Duration::from_secs(5));

            println!("t2 锁 v1");
            let _v2_guard = (*v12).lock().unwrap();
            println!("t2 获得 v1");
        }
    );

    t1.join().unwrap();
    t2.join().unwrap();
}

运行结果:

rust
t2 锁 v2
t2 获得 v2
t1 锁 v1
t1 获得 v1
t2 锁 v1
t1 锁 v2

类似的情况也会在消息传递时发生。请见下面的例子,消息在传递时发生了阻塞:

rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();

    thread::spawn(move || {
        println!("Waiting for item 1");
        rx1.recv().unwrap();

        println!("Sending item 2");
        tx2.send(()).unwrap();
    });

    println!("Waiting for item 2");
    rx2.recv().unwrap();

    println!("Sending item 1");
    tx1.send(()).unwrap();
}

运行结果:

rust
Waiting for item 2
Waiting for item 1

构建并发web服务

为了使大家更加容易的理解 Rust 的并发使用,我们本地构建一个并发的web server

整个 Web 服务的实现流程如下图所示:

预期想要实现的效果是在浏览器中输入 IP:port 后获得预想中的页面显示,如图所示:

首先,在工作目录中新建一个项目:

cargo new web_hello

然后在main.rs中选择使用 TCP 协议监听本机端口的连接。

./web_hello/src/main.rs:

rust
use std::net::TcpListener;
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        // stream为监听浏览器后收到的请求信息
        // 这里再对请求进行处理
        // 相关代码实现
        // handle_connection(stream)
    }

    println!("Shutting down.");
}

监听到请求消息后,程序会发起一个任务处理该请求消息。 如果收到的请求符合要求那么就将正确的页面返回,如果请求路径错误那就返回404页面(hello.html和404.html编码内容后面给出)。

./web_hello/src/main.rs:

rust
fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    // 这里将对应的页面信息按照TCP的要求返回
    // 对应代码实现
}

单线程的大致思路就是这样,服务运行之后,浏览器向服务地址发送服务请求,服务器收到请求,判断并返回指定的页面内容。

这样的问题在于单线程无法承载太多的请求,当服务增多时,会导致响应速度太慢。

为了改善服务,我们使用多线程来提高请求过多时导致的响应速度太慢的问题。建立一个线程池,存放多个线程。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向对队列索取另一个请求。

此外,建立结构体Worker用于传递线程,连接两个线程的最佳方式就是利用通道。

./web_hello/src/lib.rs:

rust
// 使用通道向线程发送请求
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

// Worker 结构体负责从 ThreadPool 中将代码传递给线程
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

// Terminate是会导致线程退出循环并终止的
enum Message {
    NewJob(Job),
    Terminate,
}

// Job 结构体来存放用于向通道中发送的闭包,是将在通道中发出的类型。
type Job = Box<dyn FnOnce() + Send + 'static>;

对于线程池,我们定义一个有固定数量的线程池,new函数会返回一个包含通道和线程数组的结构体。execute函数会将任务从通道的发送端发出。

rust
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

在上面创建线程池的时候,传递的闭包仍然还只是引用了通道的接收端。相反我们需要闭包一直循环,向通道的接收端请求任务,并在得到任务时执行他们。

rust
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

上面的代码中,首先在receiver上调用了lock来获取互斥器。如果锁定了互斥器,接着调用 recv从通道中接收Job

调用recv会阻塞当前线程,所以如果还没有任务,其会等待直到有可用的任务。Mutex<T>确保一次只有一个Worker线程尝试请求任务。


在上面的逐步拆解中,完成了一个 Web 服务的大致框架思路。下面,给出服务的完整代码。

./web_hello/src/main.rs

rust
use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

./web_hello/src/lib.rs

rust
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

./web_hello/hello.html:

html
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
  </body>
</html>

./web_hello/404.html:

html
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Hello!</title>
  </head>
  <body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
  </body>
</html>

测试效果

然后在./web_hello目录下执行命令:

cargo run

在浏览器中打开链接:http://127.0.0.1:7878/,反复多次刷新后显示结果如下所示:

PS D:\Rust\temp\hello> cargo run
warning: unused config key `http.sslVerify` in `C:\Users\l00556901\.cargo\config`
   Compiling hello v0.1.0 (D:\Rust\temp\hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.01s
     Running `target\debug\hello.exe`
Worker 1 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 3 got a job; executing.
Worker 1 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 3 got a job; executing.
Worker 1 got a job; executing.
...........

浏览器显示:

【参考链接】

参考文献:

1.https://kaisery.github.io/trpl-zh-cn/ch16-00-concurrency.html

2.https://doc.rust-lang.org/nomicon/concurrency.html

3.https://kaisery.github.io/trpl-zh-cn/ch10-00-generics.html

4.https://kaisery.github.io/trpl-zh-cn/ch04-00-understanding-ownership.html

5.https://www.cnblogs.com/praying/p/13912955.html