跳到主要内容

设计任务调度器

问题

如何用 Rust 设计一个支持定时任务和延迟任务的调度器?

答案

时间轮 + 优先队列架构

核心实现

use std::collections::BinaryHeap;
use std::cmp::Reverse;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{Instant, Duration};

type Task = Box<dyn FnOnce() -> futures::future::BoxFuture<'static, ()> + Send>;

struct ScheduledTask {
run_at: Instant,
task: Task,
}

impl PartialEq for ScheduledTask {
fn eq(&self, other: &Self) -> bool {
self.run_at == other.run_at
}
}
impl Eq for ScheduledTask {}
impl PartialOrd for ScheduledTask {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ScheduledTask {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// 反转顺序,让最小的在堆顶
other.run_at.cmp(&self.run_at)
}
}

pub struct Scheduler {
tasks: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
notify: tokio::sync::Notify,
}

impl Scheduler {
pub fn new() -> Self {
Self {
tasks: Arc::new(Mutex::new(BinaryHeap::new())),
notify: tokio::sync::Notify::new(),
}
}

/// 调度延迟任务
pub async fn schedule_after<F, Fut>(&self, delay: Duration, f: F)
where
F: FnOnce() -> Fut + Send + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
let task = ScheduledTask {
run_at: Instant::now() + delay,
task: Box::new(move || Box::pin(f())),
};
self.tasks.lock().await.push(task);
self.notify.notify_one(); // 通知调度循环
}

/// 调度循环
pub async fn run(&self) {
loop {
let next_wake = {
let mut tasks = self.tasks.lock().await;
if let Some(task) = tasks.peek() {
let now = Instant::now();
if task.run_at <= now {
// 到期,执行任务
let task = tasks.pop().unwrap();
tokio::spawn((task.task)());
continue;
}
Some(task.run_at - now)
} else {
None
}
};

match next_wake {
Some(duration) => {
tokio::select! {
_ = tokio::time::sleep(duration) => {},
_ = self.notify.notified() => {},
}
}
None => {
self.notify.notified().await;
}
}
}
}
}

使用示例

let scheduler = Arc::new(Scheduler::new());

// 5 秒后执行
scheduler.schedule_after(Duration::from_secs(5), || async {
println!("5 seconds later!");
}).await;

// 启动调度器
scheduler.run().await;

常见面试问题

Q1: 时间轮 vs 最小堆?

答案

方案添加任务取下一个任务适用场景
最小堆O(log n)O(log n)任务数量中等
时间轮O(1)O(1)大量定时任务
层级时间轮O(1)O(1)跨度大的定时器

Tokio 内部使用层级时间轮管理所有 sleep/timeout

相关链接