最好理解的 rust 线程池实现

本贴最后更新于 420 天前,其中的信息可能已经时移世改

简单并发实现

网上超级多类似代码,我也是学他们的。我总结了一下并运用了一点自己的理解,使代码运行逻辑看起来更加直观和好理解。随口一提,rust 语言圣经提到“编译器驱动编程”,这个观点我觉得很有趣,意思就是只要知识储备足够,就能先手写出来想要的函数名字,根据函数名字下面的波浪线提示把整个实际功能写出来。不得不说第一次学 rust 真心觉得它是实际上最好的语言,优雅极速,这么美的语法老前端都看哭了。

代码

先看一下整体代码

use std::{
  thread, thread::JoinHandle,
  sync::{Arc,Mutex,mpsc::{channel, Sender}}
};

// 包装匿名函数类型
type Workfn = Box<dyn FnOnce()->() + Send + 'static>;
// 区分工作和停机消息
enum Msg {
  Work(Workfn),
  Down
}
// 使用Msg命名空间
use Msg::*;

// 主构造函数Concurrent
pub struct Concur {
  count: usize, // 线程数量
  sender: Sender<Msg>, // 异步发送器
  threads: Option<Vec<JoinHandle<()>>> // 带有 原子指针 异步接收器 的线程 列表
}
impl Concur {
  // 初始化函数
  pub fn new(count: usize)-> Concur {
    let mut threads = Vec::with_capacity(count);
    let (sender,receiver) = channel();
    let receiver = Arc::new(Mutex::new(receiver));
    for i in 0..count {
      let p_rec = Arc::clone(&receiver);
      threads.push(thread::spawn(move || loop {
        let f: Msg = p_rec.lock().unwrap().recv().unwrap();
        match f {
          Work(f)=>{f();println!("{} works",i)},
          Down=>{println!("{} down",i);break}
        };
      }));
    }
    Concur { count, sender, threads: Some (threads) }
  }
  // 实例的exec方法
  pub fn exec(&self,f:Workfn) {
    self.sender.send(Work(Box::new(f))).unwrap();
  }
}

// Concur实例生命结束时会由rust运行drop()
impl Drop for Concur {
  fn drop(&mut self) {
    // 发送停机消息
    for _ in 0..self.count {
      self.sender.send(Down).unwrap();
    }
    // 等待所有线程运行完毕
    for thread in self.threads.take().unwrap() {
      thread.join().unwrap();
    }
  }
}

使用

use proj::Concur;
fn main() {
  let cc = Concur::new(5); // 新建5线程的线程池
  cc.exec(Box::new(move||{
    // 要异步的代码
  }));
}

原理

初学 rust 会觉得上面代码莫名其妙。核心原理存在于 Concur::new() 函数,以下重点讲解,剩余部分注释均有讲解。

with_capacity

Vec::with_capacity 根据数量创建动态数组,方便编译器分配内存。

多线程如何拿到一个任务

使用 元组 解构定义异步 senderreceiver,注意 mpsc 是专门的异步消息传输库,只能有一个接收者但可以有多个发送者。可多线程实现需要使用多个接收者该怎么办?答案是 Mutex 互斥锁。简单理解就是把消息放进一个电话筒里,让几个线程调用 lock() 就相当于抢这玩意(所有线程阻塞住暂停运行),当一线程抢到(lock 得到了返回值从而恢复线程),开始等待电话另一头来消息(调用 recv(),是这个过程的第二次阻塞)。sender 发来消息时 recv() 得到返回值开始运行返回值里的函数 dyn FnOnce()+Send

上文中有两个难理解的地方:mutex 什么时候把锁(电话筒)还给别的线程?为啥一个闭包函数被称作 dyn FnOnce+Send 类型?

  1. rust 有一个特殊的 trait 叫作 Drop,只要为你的结构体实现这玩意就能在作用域结束时自动调用,一般用作清理内存和工作。作用域结束通常有两种情况(几乎没人讲过,会触发 Drop 的时机):

    • 当为变量 a 赋值了这个结构体(比如调用 A_Struct::New() 返回或者直接构造)时,这个变量离开所在的最小的{大括号范围}。
    • 当你获得了一个结构体,此时直接在其后面调用其方法,会立即触发 Drop
而mutex的机制是离开作用域就自动把锁还回去。浅显易懂。如果你把

mutex.unlock().unwrap() 赋给了一个变量,那么这个变量所在整个{大括号}结束前都是会一直霸占这个锁的。

  1. 学过别的编程语言一般都会有种一切皆对象的思维。rust 的字面量和各种对象实现的并不是对象或构造器这种东西本身,而是一种叫 trait 的东西。可以理解为 rust 一切(应该有例外)皆有特性。trait 也叫做泛型(为了实现类型而存在的基类)。而函数是一种特殊的泛型,有 Fn(),FnOnce(),FnMut() 三种类型,我不详细展开。至于它特殊就在于它作为泛型却拥有指定参数和返回值的能力(FnOnce(u8)-> &str 这种)。Send 就不细说了,单纯是因为编译器让你加,并没有什么内涵可说,可以理解为 内存所有权 单向传输 就要加这个特性。dyn 很有趣,只要在一串特性前面加上 dyn 就能把这串特性当作一个类型 type 使用。

为什么能有多个接收器

智能指针的定义的就不细讲了。在多线程编程中使用了一种叫作原子计数指针 Arc 的异步指针,实际上每个线程拿到的只是 (接收器的互斥锁) 的 原子指针,虽然实际编程中并没有直接使用 Arc 的计数功能,但并发时 Arc 确实很好用。

在每个线程生成前都会调用 Arc::clone 为 接收器的互斥锁 的 原子指针 创建一份指针拷贝(给指针创建指针的拷贝听着是不是挺绕的),然后传给线程内的函数。之所以函数体直接用了 loop 就是因为内部的阻塞,不需要担心死循环问题。

所以说,Arc::new(Mutex::new(rcvr)) 并不是什么复杂结构,而是为一个必须用到的互斥锁创建了多线程用的指针。

结语

多线程并发是服务器最常用的技术手段,在 rust 这种性能极高的语言中学习并发无异于画龙点睛。看看我的代码是不是很像赛尔号?

屏幕截图 20230928042710.png

最好的语言

单选 公开 永不结束 1 票
js
0% 0 票
java
0% 0 票
go
0% 0 票
rust
100% 1 票
c 系列
0% 0 票
python
0% 0 票

  • Rust

    Rust 是一门赋予每个人构建可靠且高效软件能力的语言。Rust 由 Mozilla 开发,最早发布于 2014 年 9 月。

    58 引用 • 22 回帖
  • 并发
    75 引用 • 73 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...