跳到主要内容

异步模式与最佳实践

问题

Rust 异步编程有哪些常见模式和注意事项?

答案

并发限制

use tokio::sync::Semaphore;
use std::sync::Arc;

async fn limited_concurrency(urls: Vec<String>) {
let semaphore = Arc::new(Semaphore::new(10)); // 最多 10 个并发
let mut handles = vec![];

for url in urls {
let permit = semaphore.clone().acquire_owned().await.unwrap();
handles.push(tokio::spawn(async move {
let result = fetch(&url).await;
drop(permit); // 释放许可
result
}));
}

for handle in handles {
let _ = handle.await;
}
}

优雅关闭

use tokio::signal;
use tokio::sync::broadcast;

async fn main_server() {
let (shutdown_tx, _) = broadcast::channel(1);

// 多个工作任务
for i in 0..4 {
let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
loop {
tokio::select! {
_ = do_work(i) => {},
_ = shutdown_rx.recv() => {
println!("Worker {} 收到关闭信号", i);
break;
}
}
}
});
}

// 等待 Ctrl+C
signal::ctrl_c().await.unwrap();
println!("收到关闭信号,通知所有 worker...");
let _ = shutdown_tx.send(());
// 等待清理
tokio::time::sleep(Duration::from_secs(1)).await;
}

Stream 流式处理

use tokio_stream::{StreamExt, wrappers::ReceiverStream};

async fn stream_example() {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let mut stream = ReceiverStream::new(rx);

// 流式处理
while let Some(item) = stream.next().await {
process(item).await;
}

// 适配器
let results: Vec<_> = stream
.filter(|x| x.is_valid())
.map(|x| x.transform())
.take(10)
.collect()
.await;
}

取消安全(Cancellation Safety)

// ⚠️ 不是取消安全的
async fn read_exact(reader: &mut TcpStream, buf: &mut [u8]) {
let mut pos = 0;
while pos < buf.len() {
pos += reader.read(&mut buf[pos..]).await?;
// 如果在这里被 select! 取消,pos 的进度丢失
}
}

// ✅ 取消安全:用 tokio::io::AsyncReadExt::read_exact
// 或者将状态保存在外部
select! 中的取消安全

select! 会取消(drop)未胜出的分支。确保被取消的 Future 不会丢失数据或状态。tokio::sync::mpsc::Receiver::recv() 是取消安全的,但 read() 不一定是。

常见反模式

// ❌ 在 async 中阻塞
async fn bad() {
std::thread::sleep(Duration::from_secs(1)); // 阻塞线程!
std::fs::read_to_string("file.txt"); // 阻塞 IO!
}

// ✅ 正确做法
async fn good() {
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::fs::read_to_string("file.txt").await;
// 或用 spawn_blocking 处理同步代码
tokio::task::spawn_blocking(|| {
std::fs::read_to_string("large_file.txt")
}).await;
}

常见面试问题

Q1: 如何处理异步代码中的超时?

答案

use tokio::time::timeout;

match timeout(Duration::from_secs(5), async_operation()).await {
Ok(result) => handle(result),
Err(_) => println!("操作超时"),
}

Q2: tokio::select!futures::select! 有什么区别?

答案

  • tokio::select!:在分支就绪时取消其他分支(drop),语法更简洁
  • futures::select!:需要 Unpin 的 Future,使用 fuse() 防止重复 poll

实际项目中推荐用 tokio::select!

Q3: 什么是背压(Backpressure)?

答案

当生产者产生数据的速度超过消费者处理速度时,需要背压机制防止内存无限增长。在 Rust 中:

  • 有界 Channel(mpsc::channel(n))满了会阻塞发送者
  • Semaphore 限制并发数
  • Stream 的 buffer() 控制缓冲大小

Q4: async 代码能用 ? 错误传播吗?

答案

可以,async 函数和普通函数一样用 ?

async fn fetch() -> Result<String, Box<dyn std::error::Error>> {
let response = reqwest::get("https://api.com").await?;
let text = response.text().await?;
Ok(text)
}

相关链接