
Designing a Distributed Task Queue

Question

How do you design a distributed task queue in Python? What are the core principles behind Celery?

Answer

Architecture

A Celery deployment has four moving parts: producers (your application code) push tasks to a broker (Redis or RabbitMQ), a pool of workers consumes and executes them, and an optional result backend stores task state and return values.

Celery Basics

tasks.py
from celery import Celery

app = Celery("myapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, user_id: int, template: str):
    try:
        user = get_user(user_id)
        email_service.send(user.email, template)
    except ConnectionError as e:
        self.retry(exc=e)

@app.task
def process_data(data: list[dict]) -> dict:
    result = heavy_computation(data)
    return {"processed": len(result)}

# Calling tasks
result = send_email.delay(user_id=1, template="welcome")
result = send_email.apply_async(args=[1, "welcome"], countdown=300)  # run after 5 minutes

# Fetching results
print(result.status)  # PENDING -> STARTED -> SUCCESS
print(result.get(timeout=10))

Task Orchestration

from celery import chain, group, chord

# Chain: A → B → C, each task receiving the previous result
chain(task_a.s(1), task_b.s(), task_c.s())()

# Group: run A, B, and C in parallel
group(task_a.s(1), task_b.s(2), task_c.s(3))()

# Chord: a parallel group plus a callback on the collected results
chord([download.s(url) for url in urls])(merge_results.s())
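To make the chord semantics concrete, here is a pure-Python sketch of what it does, run synchronously (the download/merge names mirror the example above but are hypothetical stand-ins, not Celery API):

```python
# Pure-Python sketch of chord semantics: run every header task,
# collect their results in order, then hand the list to the callback.
def run_chord(header, callback):
    results = [task() for task in header]
    return callback(results)

# Hypothetical stand-ins for download.s(url) and merge_results.s()
urls = ["a.com", "b.com"]
header = [lambda u=u: f"<html from {u}>" for u in urls]
merged = run_chord(header, lambda pages: len(pages))
```

In real Celery the header tasks run concurrently on workers, and the callback fires only after all of them have succeeded.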

Scheduled Tasks (Celery Beat)

celery_config.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    "daily-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=8, minute=0),  # every day at 8:00
    },
    "cleanup": {
        "task": "tasks.cleanup_expired",
        "schedule": 3600,  # every hour (interval in seconds)
    },
}

Common Interview Questions

Q1: How does Celery guarantee that tasks are not lost?

Answer

  1. Broker persistence: durable queues in RabbitMQ; AOF enabled in Redis
  2. ACK mechanism: with acks_late=True, the worker acknowledges a task only after it finishes processing
  3. Task persistence: task_serializer="json", so task payloads are serialized and stored on the broker
  4. Result backend: stores task state and return values
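The in-process side of these guarantees can be sketched as Celery configuration (a hedged sketch; broker-side durability such as RabbitMQ durable queues or Redis AOF is configured on the broker itself, not here):

```python
from celery import Celery

app = Celery("myapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

app.conf.update(
    task_acks_late=True,              # ACK only after the task body finishes
    task_reject_on_worker_lost=True,  # requeue the task if the worker dies mid-run
    task_serializer="json",           # serialize task payloads as JSON
    result_expires=3600,              # keep results in the backend for an hour
)
```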

Q2: How do you make tasks idempotent?

Answer

@app.task(bind=True)
def charge_order(self, order_id: int):
    # Idempotency check
    if is_already_charged(order_id):
        return {"status": "already_charged"}
    # Deduplicate with a unique task_id
    charge(order_id)
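The helpers above (is_already_charged, charge) are assumed. A minimal self-contained sketch of the guard itself, using an in-memory set where production code would use a shared store such as Redis (SET key NX EX ttl) so the check holds across workers:

```python
# In-memory stand-in for a shared deduplication store.
_charged: set[int] = set()

def charge_order(order_id: int) -> dict:
    """Charge an order at most once, even if the task is delivered twice."""
    if order_id in _charged:
        return {"status": "already_charged"}
    _charged.add(order_id)  # record the claim before performing the side effect
    # ... call the payment provider here ...
    return {"status": "charged"}
```

The key design point is that the deduplication key is checked and claimed before the side effect, so a redelivered task sees the claim and returns early.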

Q3: What happens when a worker crashes?

Answer

  • acks_late=True: unacknowledged tasks return to the queue when the worker dies
  • task_reject_on_worker_lost=True: the task is requeued when the worker process is killed
  • Supervisor / systemd restarts the worker process automatically

Q4: Celery vs RQ vs Dramatiq?

Answer

  Feature          Celery               RQ           Dramatiq
  Broker           Redis/RabbitMQ/SQS   Redis only   Redis/RabbitMQ
  Feature set      comprehensive        simple       moderate
  Performance      medium               medium       high
  Learning curve   steep                gentle       moderate
