设计分布式任务队列
问题
如何用 Python 设计一个分布式任务队列?Celery 的核心原理是什么?
答案
架构
Celery 基础
tasks.py
from celery import Celery
app = Celery("myapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, user_id: int, template: str):
try:
user = get_user(user_id)
email_service.send(user.email, template)
except ConnectionError as e:
self.retry(exc=e)
@app.task
def process_data(data: list[dict]) -> dict:
result = heavy_computation(data)
return {"processed": len(result)}
# 调用任务
result = send_email.delay(user_id=1, template="welcome")
result = send_email.apply_async(args=[1, "welcome"], countdown=300) # 5 分钟后执行
# 获取结果
print(result.status) # PENDING -> STARTED -> SUCCESS
print(result.get(timeout=10))
任务编排
from celery import chain, group, chord
# 链式:A → B → C
chain(task_a.s(1), task_b.s(), task_c.s())()
# 并行:A + B + C 同时执行
group(task_a.s(1), task_b.s(2), task_c.s(3))()
# chord:并行 + 回调
chord([download.s(url) for url in urls])(merge_results.s())
定时任务
celery_config.py
from celery.schedules import crontab
app.conf.beat_schedule = {
"daily-report": {
"task": "tasks.generate_report",
"schedule": crontab(hour=8, minute=0), # 每天 8:00
},
"cleanup": {
"task": "tasks.cleanup_expired",
"schedule": 3600, # 每小时
},
}
常见面试问题
Q1: Celery 如何保证任务不丢?
答案:
- Broker 持久化:RabbitMQ 持久化队列;Redis 开启 AOF
- ACK 机制:
acks_late=True,Worker 处理完才确认 - 任务持久化:
task_serializer="json",任务序列化存储 - 结果后端:存储任务状态和返回值
Q2: 如何处理任务幂等性?
答案:
@app.task(bind=True)
def charge_order(self, order_id: int):
# 幂等性检查
if is_already_charged(order_id):
return {"status": "already_charged"}
# 使用唯一 task_id 去重
charge(order_id)
Q3: Worker 挂了怎么办?
答案:
acks_late=True:Worker 挂了,未 ACK 的任务自动回到队列task_reject_on_worker_lost=True:Worker 进程被 kill 时任务回到队列- Supervisor / systemd 自动重启 Worker
Q4: Celery vs RQ vs Dramatiq?
答案:
| 特性 | Celery | RQ | Dramatiq |
|---|---|---|---|
| Broker | Redis/RabbitMQ/SQS | 仅 Redis | Redis/RabbitMQ |
| 功能 | 全面 | 简单 | 中等 |
| 性能 | 中 | 高 | 高 |
| 学习曲线 | 高 | 低 | 低 |