异步模式与最佳实践
问题
Rust 异步编程有哪些常见模式和注意事项?
答案
并发限制
use tokio::sync::Semaphore;
use std::sync::Arc;
async fn limited_concurrency(urls: Vec<String>) {
let semaphore = Arc::new(Semaphore::new(10)); // 最多 10 个并发
let mut handles = vec![];
for url in urls {
let permit = semaphore.clone().acquire_owned().await.unwrap();
handles.push(tokio::spawn(async move {
let result = fetch(&url).await;
drop(permit); // 释放许可
result
}));
}
for handle in handles {
let _ = handle.await;
}
}
优雅关闭
use tokio::signal;
use tokio::sync::broadcast;
async fn main_server() {
let (shutdown_tx, _) = broadcast::channel(1);
// 多个工作任务
for i in 0..4 {
let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
loop {
tokio::select! {
_ = do_work(i) => {},
_ = shutdown_rx.recv() => {
println!("Worker {} 收到关闭信号", i);
break;
}
}
}
});
}
// 等待 Ctrl+C
signal::ctrl_c().await.unwrap();
println!("收到关闭信号,通知所有 worker...");
let _ = shutdown_tx.send(());
// 等待清理
tokio::time::sleep(Duration::from_secs(1)).await;
}
Stream 流式处理
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
async fn stream_example() {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let mut stream = ReceiverStream::new(rx);
// 流式处理
while let Some(item) = stream.next().await {
process(item).await;
}
// 适配器
let results: Vec<_> = stream
.filter(|x| x.is_valid())
.map(|x| x.transform())
.take(10)
.collect()
.await;
}
取消安全(Cancellation Safety)
// ⚠️ 不是取消安全的
async fn read_exact(reader: &mut TcpStream, buf: &mut [u8]) {
let mut pos = 0;
while pos < buf.len() {
pos += reader.read(&mut buf[pos..]).await?;
// 如果在这里被 select! 取消,pos 的进度丢失
}
}
// ✅ 取消安全:用 tokio::io::AsyncReadExt::read_exact
// 或者将状态保存在外部
select! 中的取消安全
select! 会取消(drop)未胜出的分支。确保被取消的 Future 不会丢失数据或状态。tokio::sync::mpsc::Receiver::recv() 是取消安全的,但 read() 不一定是。
常见反模式
// ❌ 在 async 中阻塞
async fn bad() {
std::thread::sleep(Duration::from_secs(1)); // 阻塞线程!
std::fs::read_to_string("file.txt"); // 阻塞 IO!
}
// ✅ 正确做法
async fn good() {
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::fs::read_to_string("file.txt").await;
// 或用 spawn_blocking 处理同步代码
tokio::task::spawn_blocking(|| {
std::fs::read_to_string("large_file.txt")
}).await;
}
常见面试问题
Q1: 如何处理异步代码中的超时?
答案:
use tokio::time::timeout;
match timeout(Duration::from_secs(5), async_operation()).await {
Ok(result) => handle(result),
Err(_) => println!("操作超时"),
}
Q2: tokio::select! 和 futures::select! 有什么区别?
答案:
tokio::select!:在分支就绪时取消其他分支(drop),语法更简洁futures::select!:需要Unpin的 Future,使用fuse()防止重复 poll
实际项目中推荐用 tokio::select!。
Q3: 什么是背压(Backpressure)?
答案:
当生产者产生数据的速度超过消费者处理速度时,需要背压机制防止内存无限增长。在 Rust 中:
- 有界 Channel(
mpsc::channel(n))满了会阻塞发送者 - Semaphore 限制并发数
- Stream 的
buffer()控制缓冲大小
Q4: async 代码能用 ? 错误传播吗?
答案:
可以,async 函数和普通函数一样用 ?:
async fn fetch() -> Result<String, Box<dyn std::error::Error>> {
let response = reqwest::get("https://api.com").await?;
let text = response.text().await?;
Ok(text)
}