跳到主要内容

设计日志收集器

问题

如何用 Rust 设计一个高吞吐的日志收集系统?

答案

架构设计

异步日志收集器

use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt;
use tokio::fs::OpenOptions;
use std::time::Duration;

#[derive(Debug)]
pub struct LogEntry {
pub level: LogLevel,
pub message: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
}

#[derive(Debug)]
pub enum LogLevel { Info, Warn, Error }

pub struct LogCollector {
sender: mpsc::Sender<LogEntry>,
}

impl LogCollector {
pub fn new(output_path: &str) -> Self {
let (tx, rx) = mpsc::channel::<LogEntry>(10_000);
let path = output_path.to_string();

// 后台写入 task
tokio::spawn(async move {
Self::writer_loop(rx, &path).await;
});

Self { sender: tx }
}

pub async fn log(&self, level: LogLevel, message: String) {
let entry = LogEntry {
level,
message,
timestamp: chrono::Utc::now(),
};
// 非阻塞发送,channel 满了丢弃(日志不阻塞业务)
let _ = self.sender.try_send(entry);
}

/// 后台批量写入循环
async fn writer_loop(mut rx: mpsc::Receiver<LogEntry>, path: &str) {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await
.unwrap();

let mut buffer = Vec::with_capacity(100);

loop {
// 批量收集:等待第一条或超时
tokio::select! {
Some(entry) = rx.recv() => {
buffer.push(entry);
// 尽量多取(非阻塞)
while buffer.len() < 100 {
match rx.try_recv() {
Ok(entry) => buffer.push(entry),
Err(_) => break,
}
}
}
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
}

// 批量写入
if !buffer.is_empty() {
let mut output = String::new();
for entry in buffer.drain(..) {
output.push_str(&format!(
"[{}] {:?}: {}\n",
entry.timestamp.format("%Y-%m-%d %H:%M:%S"),
entry.level,
entry.message
));
}
let _ = file.write_all(output.as_bytes()).await;
let _ = file.flush().await;
}
}
}
}

关键设计决策

决策选择原因
传输mpsc::channel多个生产者发送到一个写入线程
背压try_send(丢弃)日志不应阻塞业务逻辑
写入批量 flush减少系统调用次数
文件轮转按日期/大小防止单文件过大
格式JSON / 结构化便于后续分析

常见面试问题

Q1: 日志系统如何做到不影响业务性能?

答案

  1. 异步写入:日志 API 只是往 channel 发消息,不等待写入完成
  2. try_send:channel 满时丢弃日志,不阻塞业务
  3. 批量刷盘:聚合多条日志后一次写入,减少 IO 系统调用
  4. 独立线程:写入在单独的 task 中,不竞争业务线程

相关链接