跳到主要内容

消息传递(Message Passing)

消息传递(Message Passing)是 Rust 推荐的并发编程方式。通过在线程间发送消息来共享数据,而不是共享内存。这遵循了 Go 语言的座右铭:"不要通过共享内存来通讯;而是通过通讯来共享内存"。

通道 (Channels)

Rust 标准库提供了通道来实现消息传递。通道有两部分:发送者(transmitter)和接收者(receiver)。

基本通道使用

use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});

let received = rx.recv().unwrap();
println!("Got: {}", received);
}

发送多个值

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("Got: {}", received);
}
}

多个生产者

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("Got: {}", received);
}
}

同步通道

同步通道会阻塞发送者,直到接收者准备好接收消息:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::sync_channel(0); // 容量为 0,完全同步

let handle = thread::spawn(move || {
println!("发送前");
tx.send("hello").unwrap();
println!("发送后");
});

thread::sleep(Duration::from_secs(2));
println!("接收: {}", rx.recv().unwrap());

handle.join().unwrap();
}

有缓冲的同步通道

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::sync_channel(2); // 容量为 2

thread::spawn(move || {
tx.send(1).unwrap();
println!("发送了 1");

tx.send(2).unwrap();
println!("发送了 2");

tx.send(3).unwrap(); // 这里会阻塞,直到有空间
println!("发送了 3");
});

thread::sleep(Duration::from_secs(1));
println!("接收: {}", rx.recv().unwrap());

thread::sleep(Duration::from_secs(1));
println!("接收: {}", rx.recv().unwrap());

thread::sleep(Duration::from_secs(1));
println!("接收: {}", rx.recv().unwrap());
}

非阻塞操作

try_send 和 try_recv

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::sync_channel(1);

// 非阻塞发送
match tx.try_send("first") {
Ok(()) => println!("发送成功"),
Err(e) => println!("发送失败: {:?}", e),
}

match tx.try_send("second") {
Ok(()) => println!("发送成功"),
Err(e) => println!("发送失败: {:?}", e), // 会失败,因为通道已满
}

// 非阻塞接收
match rx.try_recv() {
Ok(msg) => println!("接收到: {}", msg),
Err(e) => println!("接收失败: {:?}", e),
}

match rx.try_recv() {
Ok(msg) => println!("接收到: {}", msg),
Err(e) => println!("接收失败: {:?}", e), // 会失败,因为通道为空
}
}

带超时的接收

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
thread::sleep(Duration::from_secs(2));
tx.send("delayed message").unwrap();
});

// 等待 1 秒
match rx.recv_timeout(Duration::from_secs(1)) {
Ok(msg) => println!("接收到: {}", msg),
Err(e) => println!("超时: {:?}", e),
}

// 等待 2 秒
match rx.recv_timeout(Duration::from_secs(2)) {
Ok(msg) => println!("接收到: {}", msg),
Err(e) => println!("超时: {:?}", e),
}
}

实际应用示例

任务分发系统

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug)]
enum Task {
Process(i32),
Shutdown,
}

#[derive(Debug)]
enum Result {
Processed(i32, i32), // (input, result)
WorkerShutdown(usize),
}

fn worker(id: usize, task_rx: mpsc::Receiver<Task>, result_tx: mpsc::Sender<Result>) {
loop {
match task_rx.recv() {
Ok(Task::Process(data)) => {
println!("工作者 {} 处理数据: {}", id, data);
thread::sleep(Duration::from_millis(100)); // 模拟工作
let result = data * data; // 简单的处理
result_tx.send(Result::Processed(data, result)).unwrap();
}
Ok(Task::Shutdown) => {
println!("工作者 {} 关闭", id);
result_tx.send(Result::WorkerShutdown(id)).unwrap();
break;
}
Err(_) => {
println!("工作者 {} 通道关闭", id);
break;
}
}
}
}

fn main() {
let (task_tx, task_rx) = mpsc::channel();
let (result_tx, result_rx) = mpsc::channel();

// 创建多个工作者
let num_workers = 3;
for i in 0..num_workers {
let task_rx = task_rx.clone();
let result_tx = result_tx.clone();
thread::spawn(move || {
worker(i, task_rx, result_tx);
});
}

// 发送任务
for i in 1..=10 {
task_tx.send(Task::Process(i)).unwrap();
}

// 发送关闭信号
for _ in 0..num_workers {
task_tx.send(Task::Shutdown).unwrap();
}

drop(task_tx); // 关闭任务通道
drop(result_tx); // 关闭结果通道

// 收集结果
let mut processed_count = 0;
let mut shutdown_count = 0;

for result in result_rx {
match result {
Result::Processed(input, output) => {
println!("结果: {} -> {}", input, output);
processed_count += 1;
}
Result::WorkerShutdown(id) => {
println!("工作者 {} 已关闭", id);
shutdown_count += 1;
if shutdown_count == num_workers {
break;
}
}
}
}

println!("处理了 {} 个任务", processed_count);
}

事件系统

use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, Clone)]
enum Event {
UserClick { x: i32, y: i32 },
KeyPress { key: char },
Timer { id: u32 },
Quit,
}

struct EventSystem {
event_tx: mpsc::Sender<Event>,
}

impl EventSystem {
fn new() -> (Self, mpsc::Receiver<Event>) {
let (tx, rx) = mpsc::channel();
(EventSystem { event_tx: tx }, rx)
}

fn send_event(&self, event: Event) {
self.event_tx.send(event).unwrap();
}

fn start_timer(&self, id: u32, duration: Duration) {
let tx = self.event_tx.clone();
thread::spawn(move || {
thread::sleep(duration);
tx.send(Event::Timer { id }).unwrap();
});
}
}

fn handle_event(event: Event) {
match event {
Event::UserClick { x, y } => {
println!("用户点击位置: ({}, {})", x, y);
}
Event::KeyPress { key } => {
println!("按键: {}", key);
}
Event::Timer { id } => {
println!("定时器 {} 触发", id);
}
Event::Quit => {
println!("退出事件");
}
}
}

fn main() {
let (event_system, event_rx) = EventSystem::new();

// 模拟事件生成器
let event_system_clone = EventSystem {
event_tx: event_system.event_tx.clone(),
};

thread::spawn(move || {
// 模拟用户交互
event_system_clone.send_event(Event::UserClick { x: 100, y: 200 });
thread::sleep(Duration::from_millis(500));

event_system_clone.send_event(Event::KeyPress { key: 'a' });
thread::sleep(Duration::from_millis(500));

event_system_clone.send_event(Event::KeyPress { key: 'b' });
thread::sleep(Duration::from_millis(500));

event_system_clone.send_event(Event::Quit);
});

// 启动定时器
event_system.start_timer(1, Duration::from_millis(300));
event_system.start_timer(2, Duration::from_millis(800));

// 事件循环
let start_time = Instant::now();
for event in event_rx {
println!("[{:.2}s] ", start_time.elapsed().as_secs_f32());
handle_event(event.clone());

if matches!(event, Event::Quit) {
break;
}
}
}

生产者-消费者缓冲区

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

struct BufferedChannel<T> {
sender: mpsc::Sender<T>,
}

impl<T> BufferedChannel<T>
where
T: Send + 'static,
{
fn new<F>(buffer_size: usize, mut processor: F) -> Self
where
F: FnMut(T) + Send + 'static,
{
let (tx, rx) = mpsc::sync_channel(buffer_size);

thread::spawn(move || {
for item in rx {
processor(item);
}
});

BufferedChannel { sender: tx }
}

fn send(&self, item: T) -> Result<(), mpsc::SendError<T>> {
self.sender.send(item)
}

fn try_send(&self, item: T) -> Result<(), mpsc::TrySendError<T>> {
self.sender.try_send(item)
}
}

fn main() {
// 创建一个缓冲通道,处理字符串
let channel = BufferedChannel::new(5, |msg: String| {
println!("处理消息: {}", msg);
thread::sleep(Duration::from_millis(100)); // 模拟处理时间
});

// 快速发送消息
for i in 1..=10 {
let msg = format!("消息 {}", i);
match channel.try_send(msg.clone()) {
Ok(()) => println!("发送成功: {}", msg),
Err(mpsc::TrySendError::Full(_)) => {
println!("缓冲区满,等待...");
channel.send(msg).unwrap(); // 阻塞发送
}
Err(mpsc::TrySendError::Disconnected(_)) => {
println!("通道已关闭");
break;
}
}
}

// 等待处理完成
thread::sleep(Duration::from_secs(2));
}

选择操作 (Select)

虽然标准库没有提供 select,但可以使用第三方库如 crossbeam-channel

// 注意:这需要添加 crossbeam-channel 依赖
// [dependencies]
// crossbeam-channel = "0.5"

/*
use crossbeam_channel::{select, unbounded};
use std::thread;
use std::time::Duration;

fn main() {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();

thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
tx1.send("来自通道1的消息").unwrap();
});

thread::spawn(move || {
thread::sleep(Duration::from_millis(300));
tx2.send("来自通道2的消息").unwrap();
});

select! {
recv(rx1) -> msg => println!("通道1: {:?}", msg),
recv(rx2) -> msg => println!("通道2: {:?}", msg),
}
}
*/

错误处理

通道关闭处理

use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

let handle = thread::spawn(move || {
for i in 1..=3 {
if tx.send(i).is_err() {
println!("接收者已关闭");
break;
}
}
});

// 只接收一个消息就关闭接收者
if let Ok(msg) = rx.recv() {
println!("接收到: {}", msg);
}
drop(rx); // 显式关闭接收者

handle.join().unwrap();
}

最佳实践

  1. 优先使用消息传递:而不是共享状态
  2. 合理选择通道类型:异步 vs 同步,缓冲大小
  3. 处理通道关闭:优雅地处理发送/接收错误
  4. 避免死锁:确保有足够的接收者
  5. 使用类型安全的消息:定义清晰的消息类型
use std::sync::mpsc;
use std::thread;

// 好的实践示例
#[derive(Debug)]
enum Message {
Work(String),
Terminate,
}

fn worker_thread(id: usize, rx: mpsc::Receiver<Message>) {
loop {
match rx.recv() {
Ok(Message::Work(data)) => {
println!("工作者 {} 处理: {}", id, data);
// 处理工作...
}
Ok(Message::Terminate) => {
println!("工作者 {} 终止", id);
break;
}
Err(_) => {
println!("工作者 {} 通道关闭", id);
break;
}
}
}
}

fn main() {
let (tx, rx) = mpsc::channel();

let handle = thread::spawn(move || {
worker_thread(1, rx);
});

// 发送工作
tx.send(Message::Work("任务1".to_string())).unwrap();
tx.send(Message::Work("任务2".to_string())).unwrap();

// 发送终止信号
tx.send(Message::Terminate).unwrap();

handle.join().unwrap();
}

消息传递是 Rust 并发编程的核心模式,它提供了安全、高效的线程间通信方式。通过合理使用通道,可以构建出健壮的并发系统。