跳到主要内容

设计通知推送系统

问题

如何用 Python 设计一个多渠道通知推送系统?如何解耦消息生产和发送?

答案

架构

通知服务核心

notification/service.py
from enum import Enum
from dataclasses import dataclass
from abc import ABC, abstractmethod

class Channel(Enum):
EMAIL = "email"
SMS = "sms"
PUSH = "push"
IN_APP = "in_app"

@dataclass
class Notification:
user_id: int
channel: Channel
template: str
params: dict

class NotificationSender(ABC):
@abstractmethod
async def send(self, to: str, subject: str, body: str) -> bool: ...

class EmailSender(NotificationSender):
async def send(self, to: str, subject: str, body: str) -> bool:
import aiosmtplib
from email.message import EmailMessage
msg = EmailMessage()
msg["From"] = "noreply@example.com"
msg["To"] = to
msg["Subject"] = subject
msg.set_content(body, subtype="html")
await aiosmtplib.send(msg, hostname="smtp.example.com", port=587)
return True

class SMSSender(NotificationSender):
async def send(self, to: str, subject: str, body: str) -> bool:
from twilio.rest import Client
client = Client(ACCOUNT_SID, AUTH_TOKEN)
client.messages.create(body=body, from_="+1234567890", to=to)
return True

# 渠道注册表
SENDERS: dict[Channel, NotificationSender] = {
Channel.EMAIL: EmailSender(),
Channel.SMS: SMSSender(),
}

模板引擎

notification/template.py
from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("templates"))

class TemplateEngine:
@staticmethod
def render(template_name: str, params: dict) -> tuple[str, str]:
"""返回 (subject, body)"""
tmpl = env.get_template(f"{template_name}.html")
body = tmpl.render(**params)

subject_tmpl = env.get_template(f"{template_name}_subject.txt")
subject = subject_tmpl.render(**params)
return subject, body
templates/order_shipped.html
<h2>{{ user_name }},您的订单已发货!</h2>
<p>订单号:{{ order_id }}</p>
<p>快递:{{ carrier }} {{ tracking_no }}</p>

异步发送(Celery)

notification/tasks.py
from celery import Celery

app = Celery("notification", broker="redis://localhost:6379/0")

@app.task(bind=True, max_retries=3, default_retry_delay=30)
def send_notification(self, user_id: int, channel: str, template: str, params: dict):
user = get_user(user_id)
subject, body = TemplateEngine.render(template, params)
sender = SENDERS[Channel(channel)]

try:
import asyncio
asyncio.run(sender.send(to=user.contact(channel), subject=subject, body=body))
save_log(user_id, channel, template, "success")
except Exception as e:
save_log(user_id, channel, template, "failed", str(e))
self.retry(exc=e)

用户偏好与频率控制

notification/preference.py
class NotificationService:
def notify(self, notification: Notification):
user = get_user(notification.user_id)
# 1. 用户偏好检查
if not user.prefers(notification.channel):
return

# 2. 频率控制(1 小时最多 5 条同类通知)
key = f"notify:{notification.user_id}:{notification.template}"
count = redis.incr(key)
if count == 1:
redis.expire(key, 3600)
if count > 5:
return # 静默丢弃

# 3. 异步发送
send_notification.delay(
notification.user_id,
notification.channel.value,
notification.template,
notification.params,
)

常见面试问题

Q1: 如何保证消息不丢?

答案

  1. 业务系统写入消息表(持久化)
  2. Celery acks_late=True,消费成功才确认
  3. 失败自动重试(指数退避)
  4. 死信队列兜底,人工处理

Q2: 多渠道通知的优先级?

答案

场景首选渠道备选
紧急告警短信 + 电话Push
订单通知App Push站内信
营销活动邮件站内信
验证码短信邮件

Q3: 如何支持国际化通知?

答案

  • 模板按 locale 分文件:order_shipped_zh.htmlorder_shipped_en.html
  • 用户 profile 存储语言偏好
  • 渲染时根据 locale 选择模板

相关链接