限流与熔断
问题
什么是限流、熔断和降级?有哪些常见限流算法?如何在 Node.js / NestJS 中实现?分布式场景下如何做限流?
答案
在高并发系统中,限流、熔断和降级是保障服务稳定性的三大核心手段。它们通常配合使用,共同构成系统的容错防护体系。
| 概念 | 定义 | 类比 |
|---|---|---|
| 限流 | 控制单位时间内的请求数量,超出部分拒绝或排队 | 高速公路入口匝道控制 |
| 熔断 | 当下游服务异常时,主动断开调用,快速失败 | 电路保险丝 |
| 降级 | 当资源紧张时,关闭非核心功能,保障核心链路 | 飞机紧急情况下关闭娱乐系统 |
限流算法
1. 固定窗口计数器(Fixed Window Counter)
将时间划分为固定大小的窗口(如每秒、每分钟),在每个窗口内维护一个计数器,请求到来时计数器加一,超过阈值则拒绝。
class FixedWindowLimiter {
// 当前窗口的请求计数
private count = 0;
// 当前窗口的起始时间
private windowStart: number;
constructor(
// 窗口大小(毫秒),如 60000 表示 1 分钟
private windowMs: number,
// 窗口内最大请求数
private maxRequests: number,
) {
this.windowStart = Date.now();
}
tryAcquire(): boolean {
const now = Date.now();
// 如果当前时间已经超出窗口范围,重置计数器
if (now - this.windowStart >= this.windowMs) {
this.count = 0;
this.windowStart = now;
}
// 判断是否超过阈值
if (this.count < this.maxRequests) {
this.count++;
return true; // 放行
}
return false; // 限流
}
}
// 使用示例:每秒最多 100 个请求
const limiter = new FixedWindowLimiter(1000, 100);
固定窗口的最大缺陷是边界突刺:假设限制每秒 100 个请求,用户在第 1 秒的最后 100ms 发送 100 个请求,又在第 2 秒的最初 100ms 发送 100 个请求,那么在 200ms 内实际通过了 200 个请求,是阈值的 2 倍。
2. 滑动窗口(Sliding Window)
滑动窗口通过记录每个请求的精确时间戳,实时计算最近一段时间内的请求数,避免了固定窗口的边界突刺问题。
方案 A:内存实现(单机)
class SlidingWindowLimiter {
// 存储每个请求的时间戳
private timestamps: number[] = [];
constructor(
private windowMs: number, // 窗口大小(毫秒)
private maxRequests: number, // 窗口内最大请求数
) {}
tryAcquire(): boolean {
const now = Date.now();
const windowStart = now - this.windowMs;
// 移除窗口之外的旧记录(从头部移除)
while (this.timestamps.length > 0 && this.timestamps[0] <= windowStart) {
this.timestamps.shift();
}
// 判断当前窗口内的请求数量
if (this.timestamps.length < this.maxRequests) {
this.timestamps.push(now);
return true;
}
return false;
}
}
方案 B:Redis Sorted Set 实现(分布式)
利用 Redis 有序集合,以时间戳作为 score,天然支持范围查询和过期清理。
import Redis from 'ioredis';
const redis = new Redis();
/**
* 基于 Redis Sorted Set 的滑动窗口限流
* @param key 限流标识(如 rate:user:123 或 rate:ip:1.2.3.4)
* @param limit 窗口内最大请求数
* @param windowMs 窗口大小(毫秒)
* @returns 是否被限流(true = 被限流)
*/
async function isRateLimited(
key: string,
limit: number,
windowMs: number,
): Promise<boolean> {
const now = Date.now();
const windowStart = now - windowMs;
// 使用 pipeline 批量执行,减少网络往返
const pipeline = redis.pipeline();
// 1. 删除窗口之外的过期记录
pipeline.zremrangebyscore(key, 0, windowStart);
// 2. 将当前请求加入集合(member 需唯一,加入随机数防冲突)
pipeline.zadd(key, now, `${now}-${Math.random().toString(36).slice(2)}`);
// 3. 统计当前窗口内的请求数
pipeline.zcard(key);
// 4. 设置 key 的过期时间,避免冷 key 永驻内存
pipeline.expire(key, Math.ceil(windowMs / 1000));
const results = await pipeline.exec();
const count = results?.[2]?.[1] as number;
return count > limit;
}
// 使用示例
async function handleRequest(userId: string): Promise<void> {
const limited = await isRateLimited(`rate:user:${userId}`, 100, 60000);
if (limited) {
throw new Error('Rate limit exceeded');
}
// 正常处理请求...
}
上面的 pipeline 方案虽然性能好,但在极端并发下可能有微小的竞态问题。生产环境推荐使用 Lua 脚本保证原子性,详见 Q5: 分布式限流怎么实现。
3. 漏桶算法(Leaky Bucket)
漏桶以固定速率处理请求。请求先进入桶中排队,桶以恒定速率"漏水"(处理请求)。如果桶满了,新请求被丢弃。
class LeakyBucket {
// 当前桶中的水量(待处理的请求数)
private water = 0;
// 上次漏水的时间
private lastLeakTime: number;
constructor(
// 桶容量:最多积压多少请求
private capacity: number,
// 漏水速率:每秒处理多少个请求
private leakRate: number,
) {
this.lastLeakTime = Date.now();
}
/**
* 尝试将请求放入桶中
* @returns true 表示放入成功(请求被接受),false 表示桶满(请求被拒绝)
*/
tryAcquire(): boolean {
this.leak();
if (this.water < this.capacity) {
this.water += 1;
return true;
}
// 桶已满,拒绝请求
return false;
}
/**
* 模拟漏水过程:根据时间流逝减少桶中的水量
*/
private leak(): void {
const now = Date.now();
const elapsed = (now - this.lastLeakTime) / 1000; // 经过的秒数
// 计算这段时间内漏掉的水量
const leaked = elapsed * this.leakRate;
if (leaked > 0) {
this.water = Math.max(0, this.water - leaked);
this.lastLeakTime = now;
}
}
}
// 使用:桶容量 50,每秒处理 10 个请求
const bucket = new LeakyBucket(50, 10);
4. 令牌桶算法(Token Bucket)
令牌桶以固定速率向桶中放入令牌,请求到来时尝试取出一个令牌,取到则通过,取不到则拒绝。桶中可以积累令牌(最多到容量上限),因此允许一定程度的突发流量。
单机版实现:
class TokenBucket {
// 当前可用的令牌数
private tokens: number;
// 上次补充令牌的时间
private lastRefill: number;
constructor(
// 桶容量:最多积累多少个令牌
private capacity: number,
// 每秒产生的令牌数
private refillRate: number,
) {
// 初始时桶是满的
this.tokens = capacity;
this.lastRefill = Date.now();
}
/**
* 尝试消费一个令牌
* @param count 消费的令牌数,默认 1
* @returns true 表示获取成功,false 表示令牌不足
*/
tryConsume(count: number = 1): boolean {
// 先补充经过时间内应该产生的令牌
this.refill();
if (this.tokens >= count) {
this.tokens -= count;
return true;
}
return false;
}
/**
* 根据时间流逝补充令牌
* 使用懒计算:不需要定时器,每次请求时计算应补充的量
*/
private refill(): void {
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000;
// 计算这段时间产生的令牌数,但不超过桶容量
this.tokens = Math.min(
this.capacity,
this.tokens + elapsed * this.refillRate,
);
this.lastRefill = now;
}
/** 获取当前可用令牌数(用于监控) */
getAvailableTokens(): number {
this.refill();
return Math.floor(this.tokens);
}
}
Redis 分布式令牌桶(Lua 脚本保证原子性):
import Redis from 'ioredis';
const redis = new Redis();
// Lua 脚本:原子性地执行令牌桶逻辑
const TOKEN_BUCKET_SCRIPT = `
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
-- 获取当前状态
local data = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(data[1]) or capacity
local last_refill = tonumber(data[2]) or now
-- 计算需要补充的令牌
local elapsed = (now - last_refill) / 1000
tokens = math.min(capacity, tokens + elapsed * refill_rate)
-- 尝试消费
local allowed = 0
if tokens >= requested then
tokens = tokens - requested
allowed = 1
end
-- 更新状态
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
return allowed
`;
async function tokenBucketAllow(
key: string,
capacity: number,
refillRate: number,
requested: number = 1,
): Promise<boolean> {
const result = await redis.eval(
TOKEN_BUCKET_SCRIPT,
1, // KEYS 数量
key, // KEYS[1]
capacity, // ARGV[1]
refillRate, // ARGV[2]
Date.now(), // ARGV[3]
requested, // ARGV[4]
);
return result === 1;
}
// 使用:每秒 50 个令牌,桶容量 100
const allowed = await tokenBucketAllow('bucket:api:/users', 100, 50);
四种算法对比
| 特性 | 固定窗口 | 滑动窗口 | 漏桶 | 令牌桶 |
|---|---|---|---|---|
| 实现复杂度 | ⭐ 最简单 | ⭐⭐ 中等 | ⭐⭐ 中等 | ⭐⭐⭐ 较高 |
| 内存开销 | 低(一个计数器) | 高(存储每个时间戳) | 低(一个计数器) | 低(两个变量) |
| 突发流量 | 边界处会突刺 | 平滑,无突刺 | 完全平滑 | 允许短暂突发 |
| 请求均匀度 | 低 | 中 | 高(强制恒速) | 中 |
| 分布式实现 | 简单(Redis INCR) | 中等(Redis ZSET) | 复杂 | 中等(Redis Lua) |
| 适用场景 | 简单计数场景 | API 限流、用户限流 | 消息队列消费 | 最常用,API 网关限流 |
令牌桶是实际生产中最常用的算法,因为它在限流的同时允许一定的突发流量,更符合真实业务场景。Google Guava 的 RateLimiter、Nginx 的 limit_req 模块底层都使用令牌桶(或其变体)。
限流粒度
限流需要根据业务场景选择合适的粒度,不同粒度解决不同问题:
| 粒度 | Key 示例 | 场景 |
|---|---|---|
| IP 限流 | rate:ip:192.168.1.1 | 防爬虫、防 DDoS |
| 用户限流 | rate:user:12345 | 防止单用户刷接口 |
| 接口限流 | rate:api:POST:/orders | 保护高消耗接口 |
| 用户+接口 | rate:user:123:api:/orders | 精细化控制 |
| 全局限流 | rate:global | 保护系统整体容量 |
NestJS 自定义限流装饰器:
import { SetMetadata } from '@nestjs/common';
// 限流配置接口
export interface RateLimitOptions {
// 限流粒度:ip、user、api、global
granularity: 'ip' | 'user' | 'api' | 'global';
// 窗口大小(秒)
windowSeconds: number;
// 窗口内最大请求数
maxRequests: number;
}
// 限流装饰器
export const RATE_LIMIT_KEY = 'RATE_LIMIT';
export const RateLimit = (options: RateLimitOptions) =>
SetMetadata(RATE_LIMIT_KEY, options);
import {
Injectable,
CanActivate,
ExecutionContext,
HttpException,
HttpStatus,
} from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import Redis from 'ioredis';
import { RATE_LIMIT_KEY, RateLimitOptions } from './rate-limit.decorator';
@Injectable()
export class RateLimitGuard implements CanActivate {
private redis = new Redis();
constructor(private reflector: Reflector) {}
async canActivate(context: ExecutionContext): Promise<boolean> {
// 从装饰器元数据中获取限流配置
const options = this.reflector.get<RateLimitOptions>(
RATE_LIMIT_KEY,
context.getHandler(),
);
if (!options) return true; // 没有配置限流则放行
const request = context.switchToHttp().getRequest();
const key = this.buildKey(request, options, context);
const limited = await this.isRateLimited(
key,
options.maxRequests,
options.windowSeconds * 1000,
);
if (limited) {
throw new HttpException(
{
statusCode: HttpStatus.TOO_MANY_REQUESTS,
message: 'Too many requests, please try again later',
retryAfter: options.windowSeconds,
},
HttpStatus.TOO_MANY_REQUESTS,
);
}
return true;
}
/**
* 根据限流粒度构建 Redis key
*/
private buildKey(
request: any,
options: RateLimitOptions,
context: ExecutionContext,
): string {
const { granularity } = options;
switch (granularity) {
case 'ip':
return `rate:ip:${request.ip}`;
case 'user':
return `rate:user:${request.user?.id ?? 'anonymous'}`;
case 'api': {
const handler = context.getHandler().name;
return `rate:api:${request.method}:${handler}`;
}
case 'global':
return 'rate:global';
default:
return `rate:unknown:${request.ip}`;
}
}
/**
* 滑动窗口限流判断(Redis Sorted Set)
*/
private async isRateLimited(
key: string,
limit: number,
windowMs: number,
): Promise<boolean> {
const now = Date.now();
const pipeline = this.redis.pipeline();
pipeline.zremrangebyscore(key, 0, now - windowMs);
pipeline.zadd(key, now, `${now}-${Math.random().toString(36).slice(2)}`);
pipeline.zcard(key);
pipeline.expire(key, Math.ceil(windowMs / 1000));
const results = await pipeline.exec();
return (results?.[2]?.[1] as number) > limit;
}
}
import { Controller, Post, UseGuards } from '@nestjs/common';
import { RateLimit } from './rate-limit.decorator';
import { RateLimitGuard } from './rate-limit.guard';
@Controller('orders')
@UseGuards(RateLimitGuard)
export class OrdersController {
@RateLimit({ granularity: 'user', windowSeconds: 60, maxRequests: 10 })
@Post()
async createOrder() {
// 每个用户每分钟最多下 10 单
return { message: 'Order created' };
}
@RateLimit({ granularity: 'ip', windowSeconds: 1, maxRequests: 100 })
@Post('search')
async searchOrders() {
// 每个 IP 每秒最多 100 次搜索
return { message: 'Search results' };
}
}
NestJS 限流实战
使用 @nestjs/throttler
@nestjs/throttler 是 NestJS 官方的限流模块,内置固定窗口限流并支持 Redis 存储。
import { Module } from '@nestjs/common';
import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler';
import { APP_GUARD } from '@nestjs/core';
@Module({
imports: [
ThrottlerModule.forRoot([
{
name: 'short', // 短期限制:每秒 3 次
ttl: 1000,
limit: 3,
},
{
name: 'medium', // 中期限制:每 10 秒 20 次
ttl: 10000,
limit: 20,
},
{
name: 'long', // 长期限制:每分钟 100 次
ttl: 60000,
limit: 100,
},
]),
],
providers: [
{
// 全局启用限流 Guard
provide: APP_GUARD,
useClass: ThrottlerGuard,
},
],
})
export class AppModule {}
import { Controller, Get } from '@nestjs/common';
import { SkipThrottle, Throttle } from '@nestjs/throttler';
@Controller('users')
export class UsersController {
// 使用全局默认限流配置
@Get()
findAll() {
return [];
}
// 对特定接口覆盖限流配置
@Throttle({ short: { ttl: 1000, limit: 1 } })
@Get('sensitive')
getSensitiveData() {
return { data: 'sensitive' };
}
// 跳过限流(如健康检查接口)
@SkipThrottle()
@Get('health')
healthCheck() {
return { status: 'ok' };
}
}
自定义 ThrottlerGuard(按用户限流)
默认的 ThrottlerGuard 按 IP 限流,但生产中常需要按用户维度限流:
import { Injectable, ExecutionContext } from '@nestjs/common';
import { ThrottlerGuard } from '@nestjs/throttler';
@Injectable()
export class CustomThrottlerGuard extends ThrottlerGuard {
/**
* 重写 getTracker 方法,按登录用户 ID 限流
* 未登录用户降级为按 IP 限流
*/
protected async getTracker(req: Record<string, any>): Promise<string> {
return req.user?.id ?? req.ip;
}
/**
* 可以在 handleRequest 中添加白名单逻辑
*/
protected async shouldSkip(context: ExecutionContext): Promise<boolean> {
const request = context.switchToHttp().getRequest();
// 内部服务调用不限流
if (request.headers['x-internal-service'] === 'true') {
return true;
}
return false;
}
}
Nginx 限流
Nginx 是最常用的网关层限流方案,提供两种限流指令:
limit_req_zone — 请求速率限流
基于漏桶算法,限制单位时间内的请求速率。
http {
# 定义限流区域
# $binary_remote_addr: 按客户端 IP 限流
# zone=api_limit:10m: 分配 10MB 共享内存(约 16 万个 IP)
# rate=10r/s: 每秒 10 个请求(即每 100ms 一个请求)
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
# 按接口路径限流
limit_req_zone $binary_remote_addr zone=login_limit:5m rate=1r/s;
server {
listen 80;
location /api/ {
# 应用限流规则
# burst=20: 允许突发队列 20 个请求
# nodelay: 突发请求不排队,立即处理(超出仍拒绝)
limit_req zone=api_limit burst=20 nodelay;
# 自定义限流返回状态码(默认 503)
limit_req_status 429;
proxy_pass http://backend;
}
location /api/auth/login {
# 登录接口更严格:每秒 1 次,突发 5 次
limit_req zone=login_limit burst=5 nodelay;
limit_req_status 429;
proxy_pass http://backend;
}
}
}
limit_conn_zone — 并发连接数限流
限制同一来源的并发连接数量。
http {
# 按 IP 限制并发连接数
limit_conn_zone $binary_remote_addr zone=conn_limit:10m;
server {
location /api/ {
# 每个 IP 最多 50 个并发连接
limit_conn conn_limit 50;
limit_conn_status 429;
proxy_pass http://backend;
}
location /download/ {
# 下载接口限制更严格
limit_conn conn_limit 5;
# 限制每个连接的下载速率
limit_rate 500k; # 500KB/s
proxy_pass http://file_server;
}
}
}
burst 是突发队列大小。假设 rate=10r/s burst=20:
- 稳态下每秒放行 10 个请求
- 突发时可以多接受 20 个请求进入队列等待处理
- 加
nodelay:突发请求立即处理,但后续仍按 rate 恢复令牌 - 不加
nodelay:突发请求在队列中排队,逐个处理
熔断器模式
熔断器(Circuit Breaker)是微服务架构中的关键组件,当下游服务出现异常时,主动切断调用链,防止雪崩效应。
状态机
| 状态 | 行为 | 说明 |
|---|---|---|
| Closed(关闭) | 正常放行所有请求 | 持续统计失败率,达到阈值则转为 Open |
| Open(打开) | 所有请求直接失败(快速失败) | 不再调用下游服务,等待超时后转为 HalfOpen |
| HalfOpen(半开) | 放行少量探测请求 | 如果探测成功则恢复为 Closed,失败则回到 Open |
完整实现
enum CircuitState {
CLOSED = 'CLOSED',
OPEN = 'OPEN',
HALF_OPEN = 'HALF_OPEN',
}
interface CircuitBreakerOptions {
/** 失败率阈值(0-1),如 0.5 表示 50% */
failureRateThreshold: number;
/** 统计窗口内的最小请求数,低于此值不触发熔断 */
minimumRequestCount: number;
/** 熔断打开后多久进入半开状态(毫秒) */
resetTimeout: number;
/** 半开状态下允许的探测请求数 */
halfOpenMaxRequests: number;
/** 统计窗口大小(毫秒) */
statisticsWindowMs: number;
}
interface RequestResult {
timestamp: number;
success: boolean;
}
class CircuitBreaker {
private state = CircuitState.CLOSED;
// 请求结果记录(用于计算失败率)
private results: RequestResult[] = [];
// 熔断打开的时间
private openedAt = 0;
// 半开状态下已放行的探测请求数
private halfOpenRequestCount = 0;
private readonly options: CircuitBreakerOptions;
constructor(options?: Partial<CircuitBreakerOptions>) {
this.options = {
failureRateThreshold: 0.5, // 默认 50% 失败率触发熔断
minimumRequestCount: 10, // 至少 10 次请求才开始统计
resetTimeout: 30_000, // 30 秒后尝试恢复
halfOpenMaxRequests: 3, // 半开状态放行 3 个探测请求
statisticsWindowMs: 60_000, // 统计最近 60 秒的数据
...options,
};
}
/**
* 执行受熔断器保护的操作
*/
async execute<T>(fn: () => Promise<T>): Promise<T> {
// 检查是否允许执行
if (!this.allowRequest()) {
throw new CircuitBreakerError(
`Circuit breaker is ${this.state}`,
this.state,
);
}
try {
const result = await fn();
this.recordSuccess();
return result;
} catch (error) {
this.recordFailure();
throw error;
}
}
/**
* 判断当前是否允许请求通过
*/
private allowRequest(): boolean {
switch (this.state) {
case CircuitState.CLOSED:
return true;
case CircuitState.OPEN:
// 检查是否已过恢复时间
if (Date.now() - this.openedAt >= this.options.resetTimeout) {
this.transitionTo(CircuitState.HALF_OPEN);
this.halfOpenRequestCount = 0;
return true;
}
return false;
case CircuitState.HALF_OPEN:
// 半开状态只放行有限数量的探测请求
return this.halfOpenRequestCount < this.options.halfOpenMaxRequests;
}
}
/**
* 记录成功请求
*/
private recordSuccess(): void {
this.results.push({ timestamp: Date.now(), success: true });
this.cleanOldResults();
if (this.state === CircuitState.HALF_OPEN) {
this.halfOpenRequestCount++;
// 半开状态下,所有探测请求都成功才恢复
if (this.halfOpenRequestCount >= this.options.halfOpenMaxRequests) {
this.transitionTo(CircuitState.CLOSED);
}
}
}
/**
* 记录失败请求
*/
private recordFailure(): void {
this.results.push({ timestamp: Date.now(), success: false });
this.cleanOldResults();
if (this.state === CircuitState.HALF_OPEN) {
// 半开状态下任何一次失败都重新打开熔断
this.transitionTo(CircuitState.OPEN);
return;
}
// Closed 状态下检查是否需要打开熔断
if (this.state === CircuitState.CLOSED) {
this.checkFailureRate();
}
}
/**
* 检查失败率是否达到阈值
*/
private checkFailureRate(): void {
const windowResults = this.getWindowResults();
// 样本数不够,不触发熔断
if (windowResults.length < this.options.minimumRequestCount) {
return;
}
const failureCount = windowResults.filter((r) => !r.success).length;
const failureRate = failureCount / windowResults.length;
if (failureRate >= this.options.failureRateThreshold) {
this.transitionTo(CircuitState.OPEN);
}
}
/**
* 获取统计窗口内的结果
*/
private getWindowResults(): RequestResult[] {
const windowStart = Date.now() - this.options.statisticsWindowMs;
return this.results.filter((r) => r.timestamp >= windowStart);
}
/**
* 清理过期记录,避免内存泄漏
*/
private cleanOldResults(): void {
const windowStart = Date.now() - this.options.statisticsWindowMs;
this.results = this.results.filter((r) => r.timestamp >= windowStart);
}
/**
* 状态转换
*/
private transitionTo(newState: CircuitState): void {
const oldState = this.state;
this.state = newState;
if (newState === CircuitState.OPEN) {
this.openedAt = Date.now();
}
// 可以在这里添加日志、事件通知
console.log(`CircuitBreaker: ${oldState} → ${newState}`);
}
/** 获取当前状态(用于监控) */
getState(): CircuitState {
return this.state;
}
/** 获取当前失败率(用于监控) */
getFailureRate(): number {
const results = this.getWindowResults();
if (results.length === 0) return 0;
return results.filter((r) => !r.success).length / results.length;
}
}
/** 自定义熔断错误 */
class CircuitBreakerError extends Error {
constructor(
message: string,
public readonly circuitState: CircuitState,
) {
super(message);
this.name = 'CircuitBreakerError';
}
}
NestJS 熔断拦截器
import {
Injectable,
NestInterceptor,
ExecutionContext,
CallHandler,
HttpException,
HttpStatus,
} from '@nestjs/common';
import { Observable, throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
// 为每个服务维护独立的熔断器实例
const breakers = new Map<string, CircuitBreaker>();
function getBreaker(name: string): CircuitBreaker {
if (!breakers.has(name)) {
breakers.set(name, new CircuitBreaker({
failureRateThreshold: 0.5,
minimumRequestCount: 10,
resetTimeout: 30_000,
halfOpenMaxRequests: 3,
}));
}
return breakers.get(name)!;
}
@Injectable()
export class CircuitBreakerInterceptor implements NestInterceptor {
constructor(private readonly serviceName: string) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const breaker = getBreaker(this.serviceName);
if (breaker.getState() === 'OPEN') {
// 熔断状态直接返回降级响应
throw new HttpException(
{
message: `Service ${this.serviceName} is temporarily unavailable`,
fallback: true,
},
HttpStatus.SERVICE_UNAVAILABLE,
);
}
return next.handle().pipe(
catchError((error) => {
// 记录失败并重新抛出
breaker.execute(() => Promise.reject(error)).catch(() => {});
return throwError(() => error);
}),
);
}
}
降级策略详解
降级是在系统资源紧张或下游服务不可用时,有计划地关闭部分功能,确保核心链路可用。
多级降级
interface Product {
id: string;
name: string;
price: number;
}
// 静态兜底数据
const DEFAULT_PRODUCTS: Product[] = [
{ id: 'default-1', name: '暂无数据', price: 0 },
];
class ProductService {
constructor(
private circuitBreaker: CircuitBreaker,
private cache: CacheService,
) {}
/**
* 多级降级获取商品列表
* Level 0: 实时接口数据
* Level 1: Redis 缓存数据
* Level 2: 本地内存缓存
* Level 3: 静态默认数据
*/
async getProductList(): Promise<{
data: Product[];
source: 'live' | 'redis-cache' | 'memory-cache' | 'static';
}> {
// Level 0: 尝试调用实时接口
try {
const data = await this.circuitBreaker.execute(
() => this.fetchFromAPI(),
);
// 成功后更新缓存
await this.cache.set('products:list', data, 300); // 缓存 5 分钟
this.memoryCache = data;
return { data, source: 'live' };
} catch {
// 接口失败,进入降级链路
}
// Level 1: 尝试 Redis 缓存
try {
const cached = await this.cache.get<Product[]>('products:list');
if (cached) {
return { data: cached, source: 'redis-cache' };
}
} catch {
// Redis 也不可用
}
// Level 2: 本地内存缓存
if (this.memoryCache.length > 0) {
return { data: this.memoryCache, source: 'memory-cache' };
}
// Level 3: 静态默认数据
return { data: DEFAULT_PRODUCTS, source: 'static' };
}
private memoryCache: Product[] = [];
private async fetchFromAPI(): Promise<Product[]> {
// 调用下游服务 API
const response = await fetch('http://product-service/api/products');
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
}
}
基于 Feature Flag 的降级
interface FeatureFlags {
enableRecommendation: boolean; // 推荐系统
enableRealTimePrice: boolean; // 实时价格
enableComments: boolean; // 评论功能
enableNotification: boolean; // 通知推送
}
class FeatureFlagService {
private flags: FeatureFlags = {
enableRecommendation: true,
enableRealTimePrice: true,
enableComments: true,
enableNotification: true,
};
/**
* 根据系统负载自动降级非核心功能
* @param cpuUsage 当前 CPU 使用率(0-1)
*/
autoDegrade(cpuUsage: number): void {
if (cpuUsage > 0.9) {
// CPU > 90%: 关闭所有非核心功能
this.flags.enableNotification = false;
this.flags.enableComments = false;
this.flags.enableRecommendation = false;
console.warn('[降级] Level 3: 仅保留核心功能');
} else if (cpuUsage > 0.8) {
// CPU > 80%: 关闭推荐和通知
this.flags.enableNotification = false;
this.flags.enableRecommendation = false;
console.warn('[降级] Level 2: 关闭推荐和通知');
} else if (cpuUsage > 0.7) {
// CPU > 70%: 关闭通知
this.flags.enableNotification = false;
console.warn('[降级] Level 1: 关闭通知推送');
} else {
// 恢复所有功能
this.flags = {
enableRecommendation: true,
enableRealTimePrice: true,
enableComments: true,
enableNotification: true,
};
}
}
isEnabled(feature: keyof FeatureFlags): boolean {
return this.flags[feature];
}
}
背压(Backpressure)
背压是指当生产者的速度超过消费者的处理能力时,消费者向上游反馈"慢一点"的机制。它与限流不同:限流是主动拒绝请求,背压是让上游主动降速。
| 对比 | 限流 | 背压 |
|---|---|---|
| 方向 | 从外部限制进入的流量 | 从下游反馈给上游 |
| 行为 | 超限请求被拒绝 | 上游主动降低发送速率 |
| 场景 | API 网关、接口保护 | 流处理、消息队列、数据管道 |
Node.js Stream 背压
Node.js 的 Stream 内置了背压机制。当 writable stream 处理不过来时,write() 返回 false,通知上游暂停发送。
import { createReadStream, createWriteStream } from 'fs';
import { Transform, TransformCallback } from 'stream';
/**
* 自定义 Transform 流:模拟耗时的数据处理
* Stream 的背压机制会自动生效:
* - 当下游 write() 返回 false 时,Transform 会暂停从上游读取
* - 当下游 drain 事件触发时,恢复读取
*/
class SlowProcessor extends Transform {
_transform(
chunk: Buffer,
_encoding: string,
callback: TransformCallback,
): void {
// 模拟耗时处理
setTimeout(() => {
const processed = chunk.toString().toUpperCase();
callback(null, processed);
}, 100);
}
}
// 使用 pipe 自动处理背压
// pipe() 内部自动监听 drain 事件,无需手动管理
createReadStream('input.txt')
.pipe(new SlowProcessor())
.pipe(createWriteStream('output.txt'));
/**
* 手动处理背压(不使用 pipe 的场景)
*/
function manualBackpressure(): void {
const readable = createReadStream('large-file.txt');
const writable = createWriteStream('output.txt');
readable.on('data', (chunk: Buffer) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// 下游处理不过来,暂停上游读取
readable.pause();
// 等下游消化完毕后恢复
writable.once('drain', () => {
readable.resume();
});
}
});
readable.on('end', () => writable.end());
}
基于队列的背压
在 API 服务中,可以用内存队列实现简易背压:
interface QueuedTask<T> {
execute: () => Promise<T>;
resolve: (value: T) => void;
reject: (error: Error) => void;
}
class BackpressureQueue {
private queue: QueuedTask<any>[] = [];
private running = 0;
constructor(
// 最大并发处理数
private maxConcurrency: number,
// 队列最大长度(超出则拒绝新任务)
private maxQueueSize: number,
) {}
/**
* 提交任务到队列
* 如果队列已满,抛出错误(产生背压)
*/
async submit<T>(fn: () => Promise<T>): Promise<T> {
if (this.queue.length >= this.maxQueueSize) {
// 队列满了,向上游施加背压
throw new Error('Queue is full, please slow down');
}
return new Promise<T>((resolve, reject) => {
this.queue.push({ execute: fn, resolve, reject });
this.processNext();
});
}
private async processNext(): Promise<void> {
if (this.running >= this.maxConcurrency || this.queue.length === 0) {
return;
}
const task = this.queue.shift()!;
this.running++;
try {
const result = await task.execute();
task.resolve(result);
} catch (error) {
task.reject(error as Error);
} finally {
this.running--;
this.processNext();
}
}
/** 当前队列长度(用于监控告警) */
getQueueSize(): number {
return this.queue.length;
}
}
// 使用:最大并发 10,队列最长 100
const queue = new BackpressureQueue(10, 100);
实际架构中的限流熔断
生产环境中,限流和熔断通常部署在多个层级,形成纵深防御:
| 层级 | 方案 | 限流维度 | 工具 |
|---|---|---|---|
| CDN/WAF | DDoS 防护、IP 黑名单 | IP、地域 | Cloudflare、阿里云 WAF |
| 网关层 | 速率限制、并发限制 | IP、全局 QPS | Nginx、Kong、APISIX |
| 应用层 | 细粒度限流 | 用户、接口、角色 | @nestjs/throttler、自定义 Guard |
| 服务间 | 熔断器、超时控制 | 服务维度 | CircuitBreaker、opossum |
| 数据层 | 连接池、慢查询熔断 | 连接数 | TypeORM pool、Redis maxclients |
单层限流是不够的。网关层防住外部攻击,应用层做细粒度控制,服务间做熔断保护。缺少任何一层,都可能导致雪崩效应:一个服务挂了,拖垮整条调用链。
常见面试问题
Q1: 限流应该放在哪一层?
答案:
生产环境中通常多层限流,每一层解决不同问题:
| 层级 | 职责 | 示例 |
|---|---|---|
| CDN/WAF | 防 DDoS、IP 黑名单 | Cloudflare Rate Limiting |
| Nginx 网关 | 粗粒度:IP 限流、全局 QPS | limit_req_zone、limit_conn_zone |
| API 网关 | 中粒度:按路由、按租户 | Kong、APISIX 限流插件 |
| 应用层 | 细粒度:按用户、按接口 | @nestjs/throttler、自定义 Guard |
| 数据库层 | 连接池本身就是限流 | max_connections、连接池 poolSize |
原则:越靠前的层越粗粒度,越靠后的层越细粒度。粗粒度限流挡住大部分恶意流量,细粒度限流做精准控制。
Q2: 令牌桶和漏桶的区别?
答案:
| 对比项 | 漏桶(Leaky Bucket) | 令牌桶(Token Bucket) |
|---|---|---|
| 核心思想 | 请求进桶排队,以恒定速率流出 | 令牌以恒定速率生成,请求需获取令牌 |
| 突发流量 | 不允许,强制恒速输出 | 允许,桶中积累的令牌可应对短暂峰值 |
| 处理速度 | 恒定 | 可变(有令牌时可瞬时处理) |
| 桶满行为 | 溢出的请求被丢弃 | 多余的令牌被丢弃(不影响请求) |
| 类比 | 水龙头以固定速率滴水 | ATM 取款,有余额才能取 |
| 适用场景 | 需要严格匀速的场景(如消息队列消费) | 需要允许一定突发的场景(如 API 网关) |
// 漏桶:无论请求怎么来,始终匀速处理
// 10:00:00 来 100 个请求 → 以 10/s 的速度处理,需要 10 秒
// 令牌桶:桶中有积累的令牌时,可以瞬间处理
// 桶中积累了 50 个令牌 → 50 个请求可以立即处理
// 之后恢复到 10/s 的速率
面试结论:令牌桶更常用,因为真实场景中流量总是有波动的,令牌桶能在限流的同时更好地应对合理的突发请求。
Q3: 返回 429 之后前端应该怎么做?
答案:
- 读取
Retry-After响应头,等待指定时间后重试:
// Retry-After 可以是秒数或日期
const retryAfter = response.headers.get('Retry-After');
const delayMs = Number(retryAfter) * 1000;
await new Promise((resolve) => setTimeout(resolve, delayMs));
- 指数退避(Exponential Backoff):如果没有
Retry-After,使用1s → 2s → 4s → 8s递增延迟 - 用户提示:显示"操作过于频繁,请稍后重试"等友好提示
- 请求队列:将被限流的请求放入队列,延迟重发
- 降级缓存:如果有本地缓存数据,先展示缓存,后台静默重试
参考:请求重试与超时策略
Q4: 熔断和降级的区别?
答案:
| 对比 | 熔断(Circuit Breaking) | 降级(Degradation) |
|---|---|---|
| 本质 | 一种自动化保护机制 | 一种有损的服务策略 |
| 触发条件 | 下游服务异常率达到阈值,自动触发 | 系统负载过高或故障时,可手动或自动触发 |
| 目标 | 防止故障扩散(雪崩效应) | 保障核心功能可用 |
| 关系 | 熔断是手段 | 降级是目的 |
两者通常配合使用:
- 熔断器检测到下游服务异常 → 熔断(切断调用)
- 熔断后执行降级逻辑(返回缓存/默认数据)
- 熔断器定时探测 → 恢复后取消降级
async function getRecommendations(userId: string): Promise<Product[]> {
try {
// 通过熔断器调用推荐服务
return await breaker.execute(() => recommendService.get(userId));
} catch (error) {
// 熔断后降级:返回热门商品
return await getHotProducts();
}
}
Q5: 分布式限流怎么实现?
答案:
分布式限流的核心挑战是多个服务实例共享同一个计数器。最常用的方案是 Redis + Lua 脚本,Lua 脚本在 Redis 中原子执行,避免竞态条件。
滑动窗口 Lua 脚本(生产推荐):
import Redis from 'ioredis';
const redis = new Redis();
/**
* 原子性滑动窗口限流 Lua 脚本
* 所有操作在 Redis 单线程中原子执行,无竞态问题
*/
const SLIDING_WINDOW_SCRIPT = `
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- 移除窗口外的记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)
if count < limit then
-- 未超限,记录本次请求
redis.call('ZADD', key, now, now .. '-' .. math.random(1000000))
redis.call('PEXPIRE', key, window)
return 0 -- 允许
else
return 1 -- 拒绝
end
`;
async function distributedRateLimit(
key: string,
limit: number,
windowMs: number,
): Promise<boolean> {
const result = await redis.eval(
SLIDING_WINDOW_SCRIPT,
1,
key,
windowMs,
limit,
Date.now(),
);
return result === 0; // true = 允许, false = 限流
}
// 使用示例:全局 API 限流,每秒 1000 次
const allowed = await distributedRateLimit('rate:global:api', 1000, 1000);
Pipeline 将多个命令打包发送,但它们不是原子执行的。在极端并发下,两个请求可能同时读到 count=99(限制 100),然后都通过,实际通过 101 个。Lua 脚本在 Redis 单线程中原子执行,不会有这个问题。
Q6: 如何确定限流阈值?
答案:
限流阈值不能拍脑袋决定,需要基于数据来设定:
1. 压测确定系统容量
# 使用 wrk 进行压力测试
wrk -t12 -c400 -d30s http://localhost:3000/api/products
找到系统的最大 QPS(在响应时间和错误率可接受的范围内),然后将限流阈值设为最大 QPS 的 70%-80%,留出余量。
2. 分级设定
| 接口类型 | 限流建议 | 示例 |
|---|---|---|
| 读接口 | 较宽松 | 100 次/秒/用户 |
| 写接口 | 较严格 | 10 次/秒/用户 |
| 登录/注册 | 严格 | 5 次/分钟/IP |
| 短信验证码 | 非常严格 | 1 次/分钟/手机号 |
| 支付 | 非常严格 | 1 次/秒/用户 |
3. 动态调整
- 监控实际流量模式,根据历史数据调整
- 大促期间临时调高阈值
- 根据用户等级差异化限流(VIP 用户更宽松)
Q7: 限流后怎么做优雅降级?
答案:
被限流不等于什么都不返回,应该提供有意义的降级响应:
import { ExceptionFilter, Catch, ArgumentsHost, HttpStatus } from '@nestjs/common';
import { ThrottlerException } from '@nestjs/throttler';
@Catch(ThrottlerException)
export class ThrottlerExceptionFilter implements ExceptionFilter {
catch(exception: ThrottlerException, host: ArgumentsHost): void {
const ctx = host.switchToHttp();
const response = ctx.getResponse();
const request = ctx.getRequest();
response.status(HttpStatus.TOO_MANY_REQUESTS).json({
statusCode: 429,
message: '请求过于频繁,请稍后重试',
// 告诉客户端多久后可以重试
retryAfter: 60,
// 如果有缓存数据,附带降级数据
cachedData: this.getCachedResponse(request.url),
timestamp: new Date().toISOString(),
});
// 设置标准的 Retry-After 响应头
response.header('Retry-After', '60');
}
private getCachedResponse(url: string): any {
// 从缓存中获取该接口的最近一次正常响应
return null; // 实际从 Redis/内存缓存获取
}
}
降级优先级:
- 返回缓存数据 + 标记数据时间,让用户知道数据可能不是最新的
- 返回简化数据,如列表接口只返回前 10 条
- 排队提示,告诉用户当前排队位置和预计等待时间
- 引导分流,引导用户去访问其他低负载页面
Q8: 熔断器的三种状态转换条件是什么?
答案:
Closed → Open:
条件:统计窗口内失败率 >= 阈值(如 50%),且请求数 >= 最小样本数(如 10)
行为:所有请求直接快速失败,不再调用下游
Open → HalfOpen:
条件:熔断打开后经过 resetTimeout(如 30 秒)
行为:允许少量探测请求通过
HalfOpen → Closed:
条件:探测请求全部成功(或成功率达标)
行为:恢复正常,所有请求放行
HalfOpen → Open:
条件:探测请求中有任何一个失败
行为:重新熔断,重新等待 resetTimeout
关键参数:
| 参数 | 含义 | 推荐值 |
|---|---|---|
failureRateThreshold | 触发熔断的失败率 | 50% |
minimumRequestCount | 最小样本数 | 10-20 |
resetTimeout | Open → HalfOpen 的等待时间 | 10-60 秒 |
halfOpenMaxRequests | 半开状态允许的探测请求数 | 3-5 |
如果没有最小样本数限制,第一个请求失败就是 100% 失败率,会立即触发熔断。设置 minimumRequestCount 可以避免因少量请求波动导致的误熔断。
Q9: Node.js 中有哪些限流方案?
答案:
| 方案 | 类型 | 特点 |
|---|---|---|
@nestjs/throttler | NestJS 模块 | 官方出品,支持多种存储后端 |
rate-limiter-flexible | 通用库 | 支持 Redis/Mongo/内存,多种算法 |
express-rate-limit | Express 中间件 | 简单易用,适合小项目 |
opossum | 熔断器 | Netflix Hystrix 的 Node.js 版 |
bottleneck | 调度器 | 任务调度+限流,支持 Redis 集群 |
Nginx limit_req | 网关层 | 高性能,适合粗粒度限流 |
| Redis + Lua | 自定义 | 最灵活,适合分布式场景 |
选型建议:
- NestJS 项目 →
@nestjs/throttler+ 自定义 Guard - Express 项目 →
express-rate-limit+rate-limiter-flexible - 需要熔断 →
opossum - 分布式场景 → Redis + Lua 自定义
- 网关层 → Nginx
limit_req_zone
Q10: 如何对 WebSocket 连接做限流?
答案:
WebSocket 限流需要考虑两个维度:连接数限流和消息频率限流。
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
@WebSocketGateway()
export class ChatGateway {
@WebSocketServer()
server!: Server;
// 每个用户的消息时间戳记录
private messageTimestamps = new Map<string, number[]>();
/**
* 连接数限流:在 connection 事件中检查
*/
handleConnection(client: Socket): void {
const userId = client.handshake.auth.userId as string;
// 1. 单用户最多 3 个并发连接
const userSockets = this.getUserSockets(userId);
if (userSockets.length >= 3) {
client.emit('error', { message: 'Too many connections' });
client.disconnect();
return;
}
// 2. 全局最大连接数
if (this.server.sockets.sockets.size >= 10000) {
client.emit('error', { message: 'Server is full' });
client.disconnect();
return;
}
}
/**
* 消息频率限流:滑动窗口
*/
handleMessage(client: Socket, data: any): void {
const userId = client.handshake.auth.userId as string;
if (!this.checkMessageRate(userId, 10, 1000)) {
// 每秒最多 10 条消息
client.emit('rate_limited', {
message: '发送过于频繁,请稍后再试',
retryAfter: 1000,
});
return;
}
// 正常处理消息...
this.server.emit('message', data);
}
private checkMessageRate(
userId: string,
limit: number,
windowMs: number,
): boolean {
const now = Date.now();
const timestamps = this.messageTimestamps.get(userId) ?? [];
// 清理过期记录
const valid = timestamps.filter((t) => t > now - windowMs);
valid.push(now);
this.messageTimestamps.set(userId, valid);
return valid.length <= limit;
}
private getUserSockets(userId: string): Socket[] {
const sockets: Socket[] = [];
this.server.sockets.sockets.forEach((socket) => {
if (socket.handshake.auth.userId === userId) {
sockets.push(socket);
}
});
return sockets;
}
}
WebSocket 限流的注意点:
- 连接阶段限制并发连接数(防止连接耗尽)
- 消息阶段限制发送频率(防止消息洪水)
- 被限流时不要断开连接,发送警告让客户端降速
- 对于恶意用户(持续超限),可以临时封禁
相关链接
- 前端容错与降级策略 - 前端降级实践
- 请求重试与超时策略 - 前端重试与退避策略
- Redis 数据结构与应用 - Redis 在限流中的应用
- NestJS 框架深入 - NestJS Guard、Interceptor 机制
- 健康检查与优雅停机 - 服务健康检测与优雅关闭
- Nginx 配置与部署 - Nginx 限流与负载均衡配置
- 微服务基础 - 微服务架构中的熔断模式
- MDN - HTTP 429 状态码
- Nginx limit_req 文档
- Netflix Hystrix Wiki - 熔断器设计思想