跳到主要内容

设计限流器

问题

如何用 Python 实现一个限流器?令牌桶和滑动窗口算法的原理和实现?

答案

令牌桶算法

rate_limiter/token_bucket.py
import time
import threading

class TokenBucket:
def __init__(self, rate: float, capacity: int):
"""
rate: 每秒放入的令牌数
capacity: 桶的最大容量
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_refill = time.monotonic()
self.lock = threading.Lock()

def allow(self) -> bool:
with self.lock:
now = time.monotonic()
elapsed = now - self.last_refill
# 补充令牌
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_refill = now

if self.tokens >= 1:
self.tokens -= 1
return True
return False

滑动窗口算法

rate_limiter/sliding_window.py
import time
from collections import deque
import threading

class SlidingWindowLog:
"""滑动窗口日志算法"""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window = window_seconds
self.timestamps: deque[float] = deque()
self.lock = threading.Lock()

def allow(self) -> bool:
with self.lock:
now = time.monotonic()
# 移除过期时间戳
while self.timestamps and self.timestamps[0] <= now - self.window:
self.timestamps.popleft()
if len(self.timestamps) < self.max_requests:
self.timestamps.append(now)
return True
return False


class SlidingWindowCounter:
"""滑动窗口计数器(近似算法,更省内存)"""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window = window_seconds
self.prev_count = 0
self.curr_count = 0
self.window_start = time.monotonic()
self.lock = threading.Lock()

def allow(self) -> bool:
with self.lock:
now = time.monotonic()
elapsed = now - self.window_start
if elapsed >= self.window:
self.prev_count = self.curr_count
self.curr_count = 0
self.window_start = now
elapsed = 0

# 加权计算
weight = 1 - elapsed / self.window
estimated = self.prev_count * weight + self.curr_count

if estimated < self.max_requests:
self.curr_count += 1
return True
return False

分布式限流(Redis)

rate_limiter/redis_limiter.py
import redis
import time

class RedisRateLimiter:
def __init__(self, redis_client: redis.Redis, key_prefix: str = "rl"):
self.redis = redis_client
self.prefix = key_prefix

def allow(self, key: str, max_requests: int, window: int) -> bool:
"""滑动窗口限流"""
redis_key = f"{self.prefix}:{key}"
now = time.time()
pipe = self.redis.pipeline()

# Sorted Set:score=时间戳,member=唯一标识
pipe.zremrangebyscore(redis_key, 0, now - window) # 移除过期
pipe.zcard(redis_key) # 当前计数
pipe.zadd(redis_key, {f"{now}": now}) # 添加当前请求
pipe.expire(redis_key, window)

results = pipe.execute()
current_count = results[1]
if current_count >= max_requests:
# 超限,移除刚添加的
self.redis.zrem(redis_key, f"{now}")
return False
return True

FastAPI 限流中间件

middleware/rate_limit.py
from fastapi import Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, limiter: RedisRateLimiter):
super().__init__(app)
self.limiter = limiter

async def dispatch(self, request: Request, call_next):
client_ip = request.client.host
if not self.limiter.allow(client_ip, max_requests=100, window=60):
raise HTTPException(
status_code=429,
detail="Too many requests",
headers={"Retry-After": "60"},
)
return await call_next(request)

常见面试问题

Q1: 四种限流算法对比?

答案

算法优点缺点
固定窗口简单边界突发
滑动窗口日志精确内存高
滑动窗口计数近似、省内存不精确
令牌桶允许突发、平滑实现稍复杂
漏桶固定速率不允许突发

Q2: 分布式限流难点?

答案

  • 时钟偏差:各节点时间不一致,用 Redis 服务器时间
  • 原子性:Lua 脚本保证 Redis 操作原子
  • 性能:高频限流可用本地预分配 + Redis 全局兜底

Q3: 限流触发后如何通知客户端?

答案

  • HTTP 429 状态码
  • Retry-After Header 告知等待时间
  • X-RateLimit-Remaining 剩余额度
  • X-RateLimit-Reset 重置时间

相关链接