Designing a Scheduled Task System

Question

How do you design a scheduled task (timed job) system in Python, and how do APScheduler and Celery Beat differ?

Answer

APScheduler

scheduler/apscheduler_demo.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.jobstores.redis import RedisJobStore

scheduler = AsyncIOScheduler(
    jobstores={
        "default": RedisJobStore(host="localhost", port=6379, db=2),
    },
    job_defaults={
        "coalesce": True,           # collapse multiple missed runs into a single run
        "max_instances": 1,         # at most one concurrent instance per job
        "misfire_grace_time": 60,   # still run if up to 60 seconds late
    },
)

# Cron trigger
@scheduler.scheduled_job(CronTrigger(hour=2, minute=0))
async def daily_cleanup():
    """Purge expired records at 2:00 AM every day."""
    await cleanup_expired_records()

# Interval trigger
@scheduler.scheduled_job(IntervalTrigger(minutes=5))
async def health_check():
    """Health-check all services every 5 minutes."""
    await check_all_services()

# Add jobs dynamically at runtime
def add_job(func, trigger, job_id: str, **kwargs):
    scheduler.add_job(
        func,
        trigger=trigger,
        id=job_id,
        replace_existing=True,
        **kwargs,
    )

scheduler.start()

Celery Beat

scheduler/celery_beat.py
from celery import Celery
from celery.schedules import crontab

app = Celery("scheduler", broker="redis://localhost:6379/0")

app.conf.beat_schedule = {
    "report-every-morning": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=8, minute=0),
        "args": ("daily",),
    },
    "sync-every-hour": {
        "task": "tasks.sync_data",
        "schedule": 3600.0,  # plain floats are interpreted as seconds
    },
}

# Start the beat scheduler: celery -A scheduler beat --loglevel=info
# Start a worker:           celery -A scheduler worker --loglevel=info

Custom Scheduler

scheduler/custom.py
import asyncio
import heapq
import time
from dataclasses import dataclass, field
from typing import Awaitable, Callable

@dataclass(order=True)
class ScheduledTask:
    next_run: float                              # sole ordering key for the heap
    interval: float = field(compare=False)
    func: Callable[[], Awaitable] = field(compare=False)
    name: str = field(compare=False, default="")

class SimpleScheduler:
    def __init__(self):
        self.tasks: list[ScheduledTask] = []     # min-heap keyed on next_run
        self.running = False

    def add_task(self, func: Callable[[], Awaitable], interval: float, name: str = ""):
        task = ScheduledTask(
            next_run=time.monotonic(),
            interval=interval,
            func=func,
            name=name,
        )
        heapq.heappush(self.tasks, task)

    async def run(self):
        self.running = True
        while self.running and self.tasks:
            task = self.tasks[0]
            now = time.monotonic()
            if task.next_run > now:
                await asyncio.sleep(task.next_run - now)
                continue  # re-check the heap top: a sooner task may have been added while sleeping

            task = heapq.heappop(self.tasks)
            try:
                await task.func()
            except Exception as e:
                print(f"Task {task.name} failed: {e}")

            # Reschedule for the next interval
            task.next_run = time.monotonic() + task.interval
            heapq.heappush(self.tasks, task)
Common Interview Questions

Q1: APScheduler vs Celery Beat?

Answer

| Feature      | APScheduler              | Celery Beat                   |
| ------------ | ------------------------ | ----------------------------- |
| Dependency   | standalone library       | requires Celery               |
| Dynamic jobs | add/remove at runtime    | needs django-celery-beat      |
| Persistence  | SQLAlchemy / Redis       | database                      |
| Distribution | needs extra coordination | natively distributed workers  |
| Best fit     | small to medium projects | projects already using Celery |

Q2: How do you prevent duplicate execution of distributed scheduled tasks?

Answer

  1. Distributed lock: acquire a lock with Redis SETNX; only the node that wins the lock executes
  2. Single-node scheduling: deploy exactly one Beat process
  3. Database lock: claim tasks with SELECT ... FOR UPDATE
import redis

r = redis.Redis(host="localhost", port=6379)

def ensure_single_execution(task_id: str, ttl: int = 300) -> bool:
    """Redis distributed lock to prevent duplicate execution."""
    lock_key = f"scheduler:lock:{task_id}"
    # SET ... NX EX is atomic: only one node can create the key before it expires
    acquired = r.set(lock_key, "1", nx=True, ex=ttl)
    return bool(acquired)
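The SETNX pattern can be exercised without a live Redis server using a minimal in-memory stand-in that mimics SET key value NX EX semantics (the FakeRedis class is purely illustrative, not a real library):

```python
import time

class FakeRedis:
    """In-memory stand-in mimicking Redis `SET key value NX EX ttl`."""

    def __init__(self):
        self.store = {}  # key -> (value, expires_at)

    def set(self, key, value, nx=False, ex=None):
        now = time.monotonic()
        current = self.store.get(key)
        if nx and current is not None and current[1] > now:
            return None  # key exists and is unexpired: NX refuses to set
        expires_at = now + ex if ex is not None else float("inf")
        self.store[key] = (value, expires_at)
        return True

redis_client = FakeRedis()

def ensure_single_execution(task_id: str, ttl: int = 300) -> bool:
    lock_key = f"scheduler:lock:{task_id}"
    return bool(redis_client.set(lock_key, "1", nx=True, ex=ttl))

print(ensure_single_execution("daily_report"))  # True  — first node wins the lock
print(ensure_single_execution("daily_report"))  # False — second attempt is rejected
```

The TTL (`ex`) matters: without it, a node that crashes while holding the lock would block the task forever.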

Q3: What happens when a task misses its scheduled time?

Answer

  • coalesce: collapse the missed runs and execute only the latest one
  • misfire_grace_time: still execute if within the grace window
  • Catch-up execution: log every run; on startup, check the log and backfill missed runs
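The catch-up strategy can be sketched with an in-memory run log: compare the last recorded run against the schedule to count how many runs were missed, then coalesce them into one backfill (the run-log structure and function names are assumptions for illustration):

```python
run_log: dict[str, float] = {}  # task name -> timestamp of last completed run

def record_run(name: str, ts: float) -> None:
    run_log[name] = ts

def missed_runs(name: str, interval: float, now: float) -> int:
    """Number of scheduled runs that came due since the last recorded run."""
    last = run_log.get(name)
    if last is None:
        return 0  # never ran before: nothing to backfill
    return int((now - last) // interval)

# Task "sync" last ran at t=0 with a 60 s interval; the process was down until t=250
record_run("sync", 0.0)
print(missed_runs("sync", interval=60.0, now=250.0))  # 4 — due at t=60, 120, 180, 240

# Coalesced backfill: execute the task once, then mark it as run now
if missed_runs("sync", interval=60.0, now=250.0) > 0:
    record_run("sync", 250.0)
print(missed_runs("sync", interval=60.0, now=250.0))  # 0
```

This is the same decision APScheduler makes for you via coalesce and misfire_grace_time; a hand-rolled scheduler has to persist the run log itself.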

Related Links