简单并发实现
网上超级多类似代码,我也是学他们的。我总结了一下并运用了一点自己的理解,使代码运行逻辑看起来更加直观和好理解。随口一提,rust 语言圣经提到“编译器驱动编程”,这个观点我觉得很有趣,意思就是只要知识储备足够,就能先手写出来想要的函数名字,根据函数名字下面的波浪线提示把整个实际功能写出来。不得不说第一次学 rust 真心觉得它是实际上最好的语言,优雅极速,这么美的语法老前端都看哭了。
代码
先看一下整体代码
use std::{
thread, thread::JoinHandle,
sync::{Arc,Mutex,mpsc::{channel, Sender}}
};
// 包装匿名函数类型
type Workfn = Box<dyn FnOnce()->() + Send + 'static>;
// 区分工作和停机消息
enum Msg {
Work(Workfn),
Down
}
// 使用Msg命名空间
use Msg::*;
// 主构造函数Concurrent
pub struct Concur {
count: usize, // 线程数量
sender: Sender<Msg>, // 异步发送器
threads: Option<Vec<JoinHandle<()>>> // 带有 原子指针 异步接收器 的线程 列表
}
impl Concur {
// 初始化函数
pub fn new(count: usize)-> Concur {
let mut threads = Vec::with_capacity(count);
let (sender,receiver) = channel();
let receiver = Arc::new(Mutex::new(receiver));
for i in 0..count {
let p_rec = Arc::clone(&receiver);
threads.push(thread::spawn(move || loop {
let f: Msg = p_rec.lock().unwrap().recv().unwrap();
match f {
Work(f)=>{f();println!("{} works",i)},
Down=>{println!("{} down",i);break}
};
}));
}
Concur { count, sender, threads: Some (threads) }
}
// 实例的exec方法
pub fn exec(&self,f:Workfn) {
self.sender.send(Work(Box::new(f))).unwrap();
}
}
// Concur实例生命结束时会由rust运行drop()
impl Drop for Concur {
fn drop(&mut self) {
// 发送停机消息
for _ in 0..self.count {
self.sender.send(Down).unwrap();
}
// 等待所有线程运行完毕
for thread in self.threads.take().unwrap() {
thread.join().unwrap();
}
}
}
使用
use proj::Concur;
fn main() {
let cc = Concur::new(5); // 新建5线程的线程池
cc.exec(Box::new(move||{
// 要异步的代码
}));
}
原理
初学 rust 会觉得上面代码莫名其妙。核心原理存在于 Concur::new()
函数,以下重点讲解,剩余部分注释均有讲解。
with_capacity
Vec::with_capacity
根据数量创建动态数组,方便编译器分配内存。
多线程如何拿到一个任务
使用 元组
解构定义异步 sender
和 receiver
,注意 mpsc 是专门的异步消息传输库,只能有一个接收者但可以有多个发送者。可多线程实现需要使用多个接收者该怎么办?答案是 Mutex
互斥锁。简单理解就是把消息放进一个电话筒里,让几个线程调用 lock()
就相当于抢这玩意(所有线程阻塞住暂停运行),当一线程抢到(lock
得到了返回值从而恢复线程),开始等待电话另一头来消息(调用 recv()
,是这个过程的第二次阻塞)。sender
发来消息时 recv()
得到返回值开始运行返回值里的函数 dyn FnOnce()+Send
。
上文中有两个难理解的地方:mutex 什么时候把锁(电话筒)还给别的线程?为啥一个闭包函数被称作 dyn FnOnce+Send
类型?
-
rust 有一个特殊的
trait
叫作Drop
,只要为你的结构体实现这玩意就能在作用域结束时自动调用,一般用作清理内存和工作。作用域结束通常有两种情况(几乎没人讲过,会触发Drop
的时机):- 当为变量
a
赋值了这个结构体(比如调用A_Struct::New()
返回或者直接构造)时,这个变量离开所在的最小的{大括号范围}。 - 当你获得了一个结构体,此时直接在其后面调用其方法,会立即触发
Drop
。
- 当为变量
而mutex的机制是离开作用域就自动把锁还回去。浅显易懂。如果你把
mutex.unlock().unwrap()
赋给了一个变量,那么这个变量所在整个{大括号}结束前都是会一直霸占这个锁的。
- 学过别的编程语言一般都会有种一切皆对象的思维。rust 的字面量和各种对象实现的并不是对象或构造器这种东西本身,而是一种叫
trait
的东西。可以理解为 rust 一切(应该有例外)皆有特性。trait
也叫做泛型(为了实现类型而存在的基类)。而函数是一种特殊的泛型,有Fn()
,FnOnce()
,FnMut()
三种类型,我不详细展开。至于它特殊就在于它作为泛型却拥有指定参数和返回值的能力(FnOnce(u8)-> &str
这种)。Send
就不细说了,单纯是因为编译器让你加,并没有什么内涵可说,可以理解为 内存所有权 单向传输 就要加这个特性。dyn
很有趣,只要在一串特性前面加上dyn
就能把这串特性当作一个类型type
使用。
为什么能有多个接收器
智能指针的定义的就不细讲了。在多线程编程中使用了一种叫作原子计数指针 Arc
的异步指针,实际上每个线程拿到的只是 (接收器的互斥锁) 的 原子指针
,虽然实际编程中并没有直接使用 Arc
的计数功能,但并发时 Arc
确实很好用。
在每个线程生成前都会调用 Arc::clone
为 接收器的互斥锁 的 原子指针 创建一份指针拷贝(给指针创建指针的拷贝听着是不是挺绕的),然后传给线程内的函数。之所以函数体直接用了 loop 就是因为内部的阻塞,不需要担心死循环问题。
所以说,Arc::new(Mutex::new(rcvr))
并不是什么复杂结构,而是为一个必须用到的互斥锁创建了多线程用的指针。
结语
多线程并发是服务器最常用的技术手段,在 rust 这种性能极高的语言中学习并发无异于画龙点睛。看看我的代码是不是很像赛尔号?
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于