跳到主要内容

限流与熔断

问题

什么是限流、熔断和降级?有哪些常见限流算法?如何在 Node.js / NestJS 中实现?分布式场景下如何做限流?

答案

在高并发系统中,限流熔断降级是保障服务稳定性的三大核心手段。它们通常配合使用,共同构成系统的容错防护体系

概念定义类比
限流控制单位时间内的请求数量,超出部分拒绝或排队高速公路入口匝道控制
熔断当下游服务异常时,主动断开调用,快速失败电路保险丝
降级当资源紧张时,关闭非核心功能,保障核心链路飞机紧急情况下关闭娱乐系统

限流算法

1. 固定窗口计数器(Fixed Window Counter)

将时间划分为固定大小的窗口(如每秒、每分钟),在每个窗口内维护一个计数器,请求到来时计数器加一,超过阈值则拒绝。

fixed-window.ts
class FixedWindowLimiter {
// 当前窗口的请求计数
private count = 0;
// 当前窗口的起始时间
private windowStart: number;

constructor(
// 窗口大小(毫秒),如 60000 表示 1 分钟
private windowMs: number,
// 窗口内最大请求数
private maxRequests: number,
) {
this.windowStart = Date.now();
}

tryAcquire(): boolean {
const now = Date.now();

// 如果当前时间已经超出窗口范围,重置计数器
if (now - this.windowStart >= this.windowMs) {
this.count = 0;
this.windowStart = now;
}

// 判断是否超过阈值
if (this.count < this.maxRequests) {
this.count++;
return true; // 放行
}

return false; // 限流
}
}

// 使用示例:每秒最多 100 个请求
const limiter = new FixedWindowLimiter(1000, 100);
边界突刺问题

固定窗口的最大缺陷是边界突刺:假设限制每秒 100 个请求,用户在第 1 秒的最后 100ms 发送 100 个请求,又在第 2 秒的最初 100ms 发送 100 个请求,那么在 200ms 内实际通过了 200 个请求,是阈值的 2 倍。

2. 滑动窗口(Sliding Window)

滑动窗口通过记录每个请求的精确时间戳,实时计算最近一段时间内的请求数,避免了固定窗口的边界突刺问题。

方案 A:内存实现(单机)

sliding-window-memory.ts
class SlidingWindowLimiter {
// 存储每个请求的时间戳
private timestamps: number[] = [];

constructor(
private windowMs: number, // 窗口大小(毫秒)
private maxRequests: number, // 窗口内最大请求数
) {}

tryAcquire(): boolean {
const now = Date.now();
const windowStart = now - this.windowMs;

// 移除窗口之外的旧记录(从头部移除)
while (this.timestamps.length > 0 && this.timestamps[0] <= windowStart) {
this.timestamps.shift();
}

// 判断当前窗口内的请求数量
if (this.timestamps.length < this.maxRequests) {
this.timestamps.push(now);
return true;
}

return false;
}
}

方案 B:Redis Sorted Set 实现(分布式)

利用 Redis 有序集合,以时间戳作为 score,天然支持范围查询和过期清理。

sliding-window-redis.ts
import Redis from 'ioredis';

const redis = new Redis();

/**
* 基于 Redis Sorted Set 的滑动窗口限流
* @param key 限流标识(如 rate:user:123 或 rate:ip:1.2.3.4)
* @param limit 窗口内最大请求数
* @param windowMs 窗口大小(毫秒)
* @returns 是否被限流(true = 被限流)
*/
async function isRateLimited(
key: string,
limit: number,
windowMs: number,
): Promise<boolean> {
const now = Date.now();
const windowStart = now - windowMs;

// 使用 pipeline 批量执行,减少网络往返
const pipeline = redis.pipeline();

// 1. 删除窗口之外的过期记录
pipeline.zremrangebyscore(key, 0, windowStart);

// 2. 将当前请求加入集合(member 需唯一,加入随机数防冲突)
pipeline.zadd(key, now, `${now}-${Math.random().toString(36).slice(2)}`);

// 3. 统计当前窗口内的请求数
pipeline.zcard(key);

// 4. 设置 key 的过期时间,避免冷 key 永驻内存
pipeline.expire(key, Math.ceil(windowMs / 1000));

const results = await pipeline.exec();
const count = results?.[2]?.[1] as number;

return count > limit;
}

// 使用示例
async function handleRequest(userId: string): Promise<void> {
const limited = await isRateLimited(`rate:user:${userId}`, 100, 60000);
if (limited) {
throw new Error('Rate limit exceeded');
}
// 正常处理请求...
}
Redis Lua 脚本原子性

上面的 pipeline 方案虽然性能好,但在极端并发下可能有微小的竞态问题。生产环境推荐使用 Lua 脚本保证原子性,详见 Q5: 分布式限流怎么实现

3. 漏桶算法(Leaky Bucket)

漏桶以固定速率处理请求。请求先进入桶中排队,桶以恒定速率"漏水"(处理请求)。如果桶满了,新请求被丢弃。

leaky-bucket.ts
class LeakyBucket {
// 当前桶中的水量(待处理的请求数)
private water = 0;
// 上次漏水的时间
private lastLeakTime: number;

constructor(
// 桶容量:最多积压多少请求
private capacity: number,
// 漏水速率:每秒处理多少个请求
private leakRate: number,
) {
this.lastLeakTime = Date.now();
}

/**
* 尝试将请求放入桶中
* @returns true 表示放入成功(请求被接受),false 表示桶满(请求被拒绝)
*/
tryAcquire(): boolean {
this.leak();

if (this.water < this.capacity) {
this.water += 1;
return true;
}

// 桶已满,拒绝请求
return false;
}

/**
* 模拟漏水过程:根据时间流逝减少桶中的水量
*/
private leak(): void {
const now = Date.now();
const elapsed = (now - this.lastLeakTime) / 1000; // 经过的秒数
// 计算这段时间内漏掉的水量
const leaked = elapsed * this.leakRate;

if (leaked > 0) {
this.water = Math.max(0, this.water - leaked);
this.lastLeakTime = now;
}
}
}

// 使用:桶容量 50,每秒处理 10 个请求
const bucket = new LeakyBucket(50, 10);

4. 令牌桶算法(Token Bucket)

令牌桶以固定速率向桶中放入令牌,请求到来时尝试取出一个令牌,取到则通过,取不到则拒绝。桶中可以积累令牌(最多到容量上限),因此允许一定程度的突发流量

单机版实现:

token-bucket.ts
class TokenBucket {
// 当前可用的令牌数
private tokens: number;
// 上次补充令牌的时间
private lastRefill: number;

constructor(
// 桶容量:最多积累多少个令牌
private capacity: number,
// 每秒产生的令牌数
private refillRate: number,
) {
// 初始时桶是满的
this.tokens = capacity;
this.lastRefill = Date.now();
}

/**
* 尝试消费一个令牌
* @param count 消费的令牌数,默认 1
* @returns true 表示获取成功,false 表示令牌不足
*/
tryConsume(count: number = 1): boolean {
// 先补充经过时间内应该产生的令牌
this.refill();

if (this.tokens >= count) {
this.tokens -= count;
return true;
}

return false;
}

/**
* 根据时间流逝补充令牌
* 使用懒计算:不需要定时器,每次请求时计算应补充的量
*/
private refill(): void {
const now = Date.now();
const elapsed = (now - this.lastRefill) / 1000;
// 计算这段时间产生的令牌数,但不超过桶容量
this.tokens = Math.min(
this.capacity,
this.tokens + elapsed * this.refillRate,
);
this.lastRefill = now;
}

/** 获取当前可用令牌数(用于监控) */
getAvailableTokens(): number {
this.refill();
return Math.floor(this.tokens);
}
}

Redis 分布式令牌桶(Lua 脚本保证原子性):

token-bucket-redis.ts
import Redis from 'ioredis';

const redis = new Redis();

// Lua 脚本:原子性地执行令牌桶逻辑
const TOKEN_BUCKET_SCRIPT = `
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- 获取当前状态
local data = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(data[1]) or capacity
local last_refill = tonumber(data[2]) or now

-- 计算需要补充的令牌
local elapsed = (now - last_refill) / 1000
tokens = math.min(capacity, tokens + elapsed * refill_rate)

-- 尝试消费
local allowed = 0
if tokens >= requested then
tokens = tokens - requested
allowed = 1
end

-- 更新状态
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)

return allowed
`;

async function tokenBucketAllow(
key: string,
capacity: number,
refillRate: number,
requested: number = 1,
): Promise<boolean> {
const result = await redis.eval(
TOKEN_BUCKET_SCRIPT,
1, // KEYS 数量
key, // KEYS[1]
capacity, // ARGV[1]
refillRate, // ARGV[2]
Date.now(), // ARGV[3]
requested, // ARGV[4]
);
return result === 1;
}

// 使用:每秒 50 个令牌,桶容量 100
const allowed = await tokenBucketAllow('bucket:api:/users', 100, 50);

四种算法对比

特性固定窗口滑动窗口漏桶令牌桶
实现复杂度⭐ 最简单⭐⭐ 中等⭐⭐ 中等⭐⭐⭐ 较高
内存开销低(一个计数器)高(存储每个时间戳)低(一个计数器)低(两个变量)
突发流量边界处会突刺平滑,无突刺完全平滑允许短暂突发
请求均匀度高(强制恒速)
分布式实现简单(Redis INCR)中等(Redis ZSET)复杂中等(Redis Lua)
适用场景简单计数场景API 限流、用户限流消息队列消费最常用,API 网关限流
面试重点

令牌桶是实际生产中最常用的算法,因为它在限流的同时允许一定的突发流量,更符合真实业务场景。Google Guava 的 RateLimiter、Nginx 的 limit_req 模块底层都使用令牌桶(或其变体)。


限流粒度

限流需要根据业务场景选择合适的粒度,不同粒度解决不同问题:

粒度Key 示例场景
IP 限流rate:ip:192.168.1.1防爬虫、防 DDoS
用户限流rate:user:12345防止单用户刷接口
接口限流rate:api:POST:/orders保护高消耗接口
用户+接口rate:user:123:api:/orders精细化控制
全局限流rate:global保护系统整体容量

NestJS 自定义限流装饰器:

rate-limit.decorator.ts
import { SetMetadata } from '@nestjs/common';

// 限流配置接口
export interface RateLimitOptions {
// 限流粒度:ip、user、api、global
granularity: 'ip' | 'user' | 'api' | 'global';
// 窗口大小(秒)
windowSeconds: number;
// 窗口内最大请求数
maxRequests: number;
}

// 限流装饰器
export const RATE_LIMIT_KEY = 'RATE_LIMIT';
export const RateLimit = (options: RateLimitOptions) =>
SetMetadata(RATE_LIMIT_KEY, options);
rate-limit.guard.ts
import {
Injectable,
CanActivate,
ExecutionContext,
HttpException,
HttpStatus,
} from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import Redis from 'ioredis';
import { RATE_LIMIT_KEY, RateLimitOptions } from './rate-limit.decorator';

@Injectable()
export class RateLimitGuard implements CanActivate {
private redis = new Redis();

constructor(private reflector: Reflector) {}

async canActivate(context: ExecutionContext): Promise<boolean> {
// 从装饰器元数据中获取限流配置
const options = this.reflector.get<RateLimitOptions>(
RATE_LIMIT_KEY,
context.getHandler(),
);

if (!options) return true; // 没有配置限流则放行

const request = context.switchToHttp().getRequest();
const key = this.buildKey(request, options, context);

const limited = await this.isRateLimited(
key,
options.maxRequests,
options.windowSeconds * 1000,
);

if (limited) {
throw new HttpException(
{
statusCode: HttpStatus.TOO_MANY_REQUESTS,
message: 'Too many requests, please try again later',
retryAfter: options.windowSeconds,
},
HttpStatus.TOO_MANY_REQUESTS,
);
}

return true;
}

/**
* 根据限流粒度构建 Redis key
*/
private buildKey(
request: any,
options: RateLimitOptions,
context: ExecutionContext,
): string {
const { granularity } = options;

switch (granularity) {
case 'ip':
return `rate:ip:${request.ip}`;
case 'user':
return `rate:user:${request.user?.id ?? 'anonymous'}`;
case 'api': {
const handler = context.getHandler().name;
return `rate:api:${request.method}:${handler}`;
}
case 'global':
return 'rate:global';
default:
return `rate:unknown:${request.ip}`;
}
}

/**
* 滑动窗口限流判断(Redis Sorted Set)
*/
private async isRateLimited(
key: string,
limit: number,
windowMs: number,
): Promise<boolean> {
const now = Date.now();
const pipeline = this.redis.pipeline();
pipeline.zremrangebyscore(key, 0, now - windowMs);
pipeline.zadd(key, now, `${now}-${Math.random().toString(36).slice(2)}`);
pipeline.zcard(key);
pipeline.expire(key, Math.ceil(windowMs / 1000));
const results = await pipeline.exec();
return (results?.[2]?.[1] as number) > limit;
}
}
orders.controller.ts
import { Controller, Post, UseGuards } from '@nestjs/common';
import { RateLimit } from './rate-limit.decorator';
import { RateLimitGuard } from './rate-limit.guard';

@Controller('orders')
@UseGuards(RateLimitGuard)
export class OrdersController {
@RateLimit({ granularity: 'user', windowSeconds: 60, maxRequests: 10 })
@Post()
async createOrder() {
// 每个用户每分钟最多下 10 单
return { message: 'Order created' };
}

@RateLimit({ granularity: 'ip', windowSeconds: 1, maxRequests: 100 })
@Post('search')
async searchOrders() {
// 每个 IP 每秒最多 100 次搜索
return { message: 'Search results' };
}
}

NestJS 限流实战

使用 @nestjs/throttler

@nestjs/throttler 是 NestJS 官方的限流模块,内置固定窗口限流并支持 Redis 存储。

app.module.ts
import { Module } from '@nestjs/common';
import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler';
import { APP_GUARD } from '@nestjs/core';

@Module({
imports: [
ThrottlerModule.forRoot([
{
name: 'short', // 短期限制:每秒 3 次
ttl: 1000,
limit: 3,
},
{
name: 'medium', // 中期限制:每 10 秒 20 次
ttl: 10000,
limit: 20,
},
{
name: 'long', // 长期限制:每分钟 100 次
ttl: 60000,
limit: 100,
},
]),
],
providers: [
{
// 全局启用限流 Guard
provide: APP_GUARD,
useClass: ThrottlerGuard,
},
],
})
export class AppModule {}
users.controller.ts
import { Controller, Get } from '@nestjs/common';
import { SkipThrottle, Throttle } from '@nestjs/throttler';

@Controller('users')
export class UsersController {
// 使用全局默认限流配置
@Get()
findAll() {
return [];
}

// 对特定接口覆盖限流配置
@Throttle({ short: { ttl: 1000, limit: 1 } })
@Get('sensitive')
getSensitiveData() {
return { data: 'sensitive' };
}

// 跳过限流(如健康检查接口)
@SkipThrottle()
@Get('health')
healthCheck() {
return { status: 'ok' };
}
}

自定义 ThrottlerGuard(按用户限流)

默认的 ThrottlerGuard 按 IP 限流,但生产中常需要按用户维度限流:

custom-throttler.guard.ts
import { Injectable, ExecutionContext } from '@nestjs/common';
import { ThrottlerGuard } from '@nestjs/throttler';

@Injectable()
export class CustomThrottlerGuard extends ThrottlerGuard {
/**
* 重写 getTracker 方法,按登录用户 ID 限流
* 未登录用户降级为按 IP 限流
*/
protected async getTracker(req: Record<string, any>): Promise<string> {
return req.user?.id ?? req.ip;
}

/**
* 可以在 handleRequest 中添加白名单逻辑
*/
protected async shouldSkip(context: ExecutionContext): Promise<boolean> {
const request = context.switchToHttp().getRequest();
// 内部服务调用不限流
if (request.headers['x-internal-service'] === 'true') {
return true;
}
return false;
}
}

Nginx 限流

Nginx 是最常用的网关层限流方案,提供两种限流指令:

limit_req_zone — 请求速率限流

基于漏桶算法,限制单位时间内的请求速率。

nginx.conf
http {
# 定义限流区域
# $binary_remote_addr: 按客户端 IP 限流
# zone=api_limit:10m: 分配 10MB 共享内存(约 16 万个 IP)
# rate=10r/s: 每秒 10 个请求(即每 100ms 一个请求)
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;

# 按接口路径限流
limit_req_zone $binary_remote_addr zone=login_limit:5m rate=1r/s;

server {
listen 80;

location /api/ {
# 应用限流规则
# burst=20: 允许突发队列 20 个请求
# nodelay: 突发请求不排队,立即处理(超出仍拒绝)
limit_req zone=api_limit burst=20 nodelay;

# 自定义限流返回状态码(默认 503)
limit_req_status 429;

proxy_pass http://backend;
}

location /api/auth/login {
# 登录接口更严格:每秒 1 次,突发 5 次
limit_req zone=login_limit burst=5 nodelay;
limit_req_status 429;

proxy_pass http://backend;
}
}
}

limit_conn_zone — 并发连接数限流

限制同一来源的并发连接数量。

nginx-conn.conf
http {
# 按 IP 限制并发连接数
limit_conn_zone $binary_remote_addr zone=conn_limit:10m;

server {
location /api/ {
# 每个 IP 最多 50 个并发连接
limit_conn conn_limit 50;
limit_conn_status 429;

proxy_pass http://backend;
}

location /download/ {
# 下载接口限制更严格
limit_conn conn_limit 5;

# 限制每个连接的下载速率
limit_rate 500k; # 500KB/s

proxy_pass http://file_server;
}
}
}
burst 参数说明

burst 是突发队列大小。假设 rate=10r/s burst=20

  • 稳态下每秒放行 10 个请求
  • 突发时可以多接受 20 个请求进入队列等待处理
  • nodelay:突发请求立即处理,但后续仍按 rate 恢复令牌
  • 不加 nodelay:突发请求在队列中排队,逐个处理

熔断器模式

熔断器(Circuit Breaker)是微服务架构中的关键组件,当下游服务出现异常时,主动切断调用链,防止雪崩效应

状态机

状态行为说明
Closed(关闭)正常放行所有请求持续统计失败率,达到阈值则转为 Open
Open(打开)所有请求直接失败(快速失败)不再调用下游服务,等待超时后转为 HalfOpen
HalfOpen(半开)放行少量探测请求如果探测成功则恢复为 Closed,失败则回到 Open

完整实现

circuit-breaker.ts
enum CircuitState {
CLOSED = 'CLOSED',
OPEN = 'OPEN',
HALF_OPEN = 'HALF_OPEN',
}

interface CircuitBreakerOptions {
/** 失败率阈值(0-1),如 0.5 表示 50% */
failureRateThreshold: number;
/** 统计窗口内的最小请求数,低于此值不触发熔断 */
minimumRequestCount: number;
/** 熔断打开后多久进入半开状态(毫秒) */
resetTimeout: number;
/** 半开状态下允许的探测请求数 */
halfOpenMaxRequests: number;
/** 统计窗口大小(毫秒) */
statisticsWindowMs: number;
}

interface RequestResult {
timestamp: number;
success: boolean;
}

class CircuitBreaker {
private state = CircuitState.CLOSED;
// 请求结果记录(用于计算失败率)
private results: RequestResult[] = [];
// 熔断打开的时间
private openedAt = 0;
// 半开状态下已放行的探测请求数
private halfOpenRequestCount = 0;

private readonly options: CircuitBreakerOptions;

constructor(options?: Partial<CircuitBreakerOptions>) {
this.options = {
failureRateThreshold: 0.5, // 默认 50% 失败率触发熔断
minimumRequestCount: 10, // 至少 10 次请求才开始统计
resetTimeout: 30_000, // 30 秒后尝试恢复
halfOpenMaxRequests: 3, // 半开状态放行 3 个探测请求
statisticsWindowMs: 60_000, // 统计最近 60 秒的数据
...options,
};
}

/**
* 执行受熔断器保护的操作
*/
async execute<T>(fn: () => Promise<T>): Promise<T> {
// 检查是否允许执行
if (!this.allowRequest()) {
throw new CircuitBreakerError(
`Circuit breaker is ${this.state}`,
this.state,
);
}

try {
const result = await fn();
this.recordSuccess();
return result;
} catch (error) {
this.recordFailure();
throw error;
}
}

/**
* 判断当前是否允许请求通过
*/
private allowRequest(): boolean {
switch (this.state) {
case CircuitState.CLOSED:
return true;

case CircuitState.OPEN:
// 检查是否已过恢复时间
if (Date.now() - this.openedAt >= this.options.resetTimeout) {
this.transitionTo(CircuitState.HALF_OPEN);
this.halfOpenRequestCount = 0;
return true;
}
return false;

case CircuitState.HALF_OPEN:
// 半开状态只放行有限数量的探测请求
return this.halfOpenRequestCount < this.options.halfOpenMaxRequests;
}
}

/**
* 记录成功请求
*/
private recordSuccess(): void {
this.results.push({ timestamp: Date.now(), success: true });
this.cleanOldResults();

if (this.state === CircuitState.HALF_OPEN) {
this.halfOpenRequestCount++;
// 半开状态下,所有探测请求都成功才恢复
if (this.halfOpenRequestCount >= this.options.halfOpenMaxRequests) {
this.transitionTo(CircuitState.CLOSED);
}
}
}

/**
* 记录失败请求
*/
private recordFailure(): void {
this.results.push({ timestamp: Date.now(), success: false });
this.cleanOldResults();

if (this.state === CircuitState.HALF_OPEN) {
// 半开状态下任何一次失败都重新打开熔断
this.transitionTo(CircuitState.OPEN);
return;
}

// Closed 状态下检查是否需要打开熔断
if (this.state === CircuitState.CLOSED) {
this.checkFailureRate();
}
}

/**
* 检查失败率是否达到阈值
*/
private checkFailureRate(): void {
const windowResults = this.getWindowResults();

// 样本数不够,不触发熔断
if (windowResults.length < this.options.minimumRequestCount) {
return;
}

const failureCount = windowResults.filter((r) => !r.success).length;
const failureRate = failureCount / windowResults.length;

if (failureRate >= this.options.failureRateThreshold) {
this.transitionTo(CircuitState.OPEN);
}
}

/**
* 获取统计窗口内的结果
*/
private getWindowResults(): RequestResult[] {
const windowStart = Date.now() - this.options.statisticsWindowMs;
return this.results.filter((r) => r.timestamp >= windowStart);
}

/**
* 清理过期记录,避免内存泄漏
*/
private cleanOldResults(): void {
const windowStart = Date.now() - this.options.statisticsWindowMs;
this.results = this.results.filter((r) => r.timestamp >= windowStart);
}

/**
* 状态转换
*/
private transitionTo(newState: CircuitState): void {
const oldState = this.state;
this.state = newState;

if (newState === CircuitState.OPEN) {
this.openedAt = Date.now();
}

// 可以在这里添加日志、事件通知
console.log(`CircuitBreaker: ${oldState}${newState}`);
}

/** 获取当前状态(用于监控) */
getState(): CircuitState {
return this.state;
}

/** 获取当前失败率(用于监控) */
getFailureRate(): number {
const results = this.getWindowResults();
if (results.length === 0) return 0;
return results.filter((r) => !r.success).length / results.length;
}
}

/** 自定义熔断错误 */
class CircuitBreakerError extends Error {
constructor(
message: string,
public readonly circuitState: CircuitState,
) {
super(message);
this.name = 'CircuitBreakerError';
}
}

NestJS 熔断拦截器

circuit-breaker.interceptor.ts
import {
Injectable,
NestInterceptor,
ExecutionContext,
CallHandler,
HttpException,
HttpStatus,
} from '@nestjs/common';
import { Observable, throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

// 为每个服务维护独立的熔断器实例
const breakers = new Map<string, CircuitBreaker>();

function getBreaker(name: string): CircuitBreaker {
if (!breakers.has(name)) {
breakers.set(name, new CircuitBreaker({
failureRateThreshold: 0.5,
minimumRequestCount: 10,
resetTimeout: 30_000,
halfOpenMaxRequests: 3,
}));
}
return breakers.get(name)!;
}

@Injectable()
export class CircuitBreakerInterceptor implements NestInterceptor {
constructor(private readonly serviceName: string) {}

intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const breaker = getBreaker(this.serviceName);

if (breaker.getState() === 'OPEN') {
// 熔断状态直接返回降级响应
throw new HttpException(
{
message: `Service ${this.serviceName} is temporarily unavailable`,
fallback: true,
},
HttpStatus.SERVICE_UNAVAILABLE,
);
}

return next.handle().pipe(
catchError((error) => {
// 记录失败并重新抛出
breaker.execute(() => Promise.reject(error)).catch(() => {});
return throwError(() => error);
}),
);
}
}

降级策略详解

降级是在系统资源紧张或下游服务不可用时,有计划地关闭部分功能,确保核心链路可用。

多级降级

multi-level-fallback.ts
interface Product {
id: string;
name: string;
price: number;
}

// 静态兜底数据
const DEFAULT_PRODUCTS: Product[] = [
{ id: 'default-1', name: '暂无数据', price: 0 },
];

class ProductService {
constructor(
private circuitBreaker: CircuitBreaker,
private cache: CacheService,
) {}

/**
* 多级降级获取商品列表
* Level 0: 实时接口数据
* Level 1: Redis 缓存数据
* Level 2: 本地内存缓存
* Level 3: 静态默认数据
*/
async getProductList(): Promise<{
data: Product[];
source: 'live' | 'redis-cache' | 'memory-cache' | 'static';
}> {
// Level 0: 尝试调用实时接口
try {
const data = await this.circuitBreaker.execute(
() => this.fetchFromAPI(),
);
// 成功后更新缓存
await this.cache.set('products:list', data, 300); // 缓存 5 分钟
this.memoryCache = data;
return { data, source: 'live' };
} catch {
// 接口失败,进入降级链路
}

// Level 1: 尝试 Redis 缓存
try {
const cached = await this.cache.get<Product[]>('products:list');
if (cached) {
return { data: cached, source: 'redis-cache' };
}
} catch {
// Redis 也不可用
}

// Level 2: 本地内存缓存
if (this.memoryCache.length > 0) {
return { data: this.memoryCache, source: 'memory-cache' };
}

// Level 3: 静态默认数据
return { data: DEFAULT_PRODUCTS, source: 'static' };
}

private memoryCache: Product[] = [];
private async fetchFromAPI(): Promise<Product[]> {
// 调用下游服务 API
const response = await fetch('http://product-service/api/products');
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
}
}

基于 Feature Flag 的降级

feature-flag-fallback.ts
interface FeatureFlags {
enableRecommendation: boolean; // 推荐系统
enableRealTimePrice: boolean; // 实时价格
enableComments: boolean; // 评论功能
enableNotification: boolean; // 通知推送
}

class FeatureFlagService {
private flags: FeatureFlags = {
enableRecommendation: true,
enableRealTimePrice: true,
enableComments: true,
enableNotification: true,
};

/**
* 根据系统负载自动降级非核心功能
* @param cpuUsage 当前 CPU 使用率(0-1)
*/
autoDegrade(cpuUsage: number): void {
if (cpuUsage > 0.9) {
// CPU > 90%: 关闭所有非核心功能
this.flags.enableNotification = false;
this.flags.enableComments = false;
this.flags.enableRecommendation = false;
console.warn('[降级] Level 3: 仅保留核心功能');
} else if (cpuUsage > 0.8) {
// CPU > 80%: 关闭推荐和通知
this.flags.enableNotification = false;
this.flags.enableRecommendation = false;
console.warn('[降级] Level 2: 关闭推荐和通知');
} else if (cpuUsage > 0.7) {
// CPU > 70%: 关闭通知
this.flags.enableNotification = false;
console.warn('[降级] Level 1: 关闭通知推送');
} else {
// 恢复所有功能
this.flags = {
enableRecommendation: true,
enableRealTimePrice: true,
enableComments: true,
enableNotification: true,
};
}
}

isEnabled(feature: keyof FeatureFlags): boolean {
return this.flags[feature];
}
}

背压(Backpressure)

背压是指当生产者的速度超过消费者的处理能力时,消费者向上游反馈"慢一点"的机制。它与限流不同:限流是主动拒绝请求,背压是让上游主动降速。

对比限流背压
方向从外部限制进入的流量从下游反馈给上游
行为超限请求被拒绝上游主动降低发送速率
场景API 网关、接口保护流处理、消息队列、数据管道

Node.js Stream 背压

Node.js 的 Stream 内置了背压机制。当 writable stream 处理不过来时,write() 返回 false,通知上游暂停发送。

stream-backpressure.ts
import { createReadStream, createWriteStream } from 'fs';
import { Transform, TransformCallback } from 'stream';

/**
* 自定义 Transform 流:模拟耗时的数据处理
* Stream 的背压机制会自动生效:
* - 当下游 write() 返回 false 时,Transform 会暂停从上游读取
* - 当下游 drain 事件触发时,恢复读取
*/
class SlowProcessor extends Transform {
_transform(
chunk: Buffer,
_encoding: string,
callback: TransformCallback,
): void {
// 模拟耗时处理
setTimeout(() => {
const processed = chunk.toString().toUpperCase();
callback(null, processed);
}, 100);
}
}

// 使用 pipe 自动处理背压
// pipe() 内部自动监听 drain 事件,无需手动管理
createReadStream('input.txt')
.pipe(new SlowProcessor())
.pipe(createWriteStream('output.txt'));

/**
* 手动处理背压(不使用 pipe 的场景)
*/
function manualBackpressure(): void {
const readable = createReadStream('large-file.txt');
const writable = createWriteStream('output.txt');

readable.on('data', (chunk: Buffer) => {
const canContinue = writable.write(chunk);

if (!canContinue) {
// 下游处理不过来,暂停上游读取
readable.pause();

// 等下游消化完毕后恢复
writable.once('drain', () => {
readable.resume();
});
}
});

readable.on('end', () => writable.end());
}

基于队列的背压

在 API 服务中,可以用内存队列实现简易背压:

queue-backpressure.ts
interface QueuedTask<T> {
execute: () => Promise<T>;
resolve: (value: T) => void;
reject: (error: Error) => void;
}

class BackpressureQueue {
private queue: QueuedTask<any>[] = [];
private running = 0;

constructor(
// 最大并发处理数
private maxConcurrency: number,
// 队列最大长度(超出则拒绝新任务)
private maxQueueSize: number,
) {}

/**
* 提交任务到队列
* 如果队列已满,抛出错误(产生背压)
*/
async submit<T>(fn: () => Promise<T>): Promise<T> {
if (this.queue.length >= this.maxQueueSize) {
// 队列满了,向上游施加背压
throw new Error('Queue is full, please slow down');
}

return new Promise<T>((resolve, reject) => {
this.queue.push({ execute: fn, resolve, reject });
this.processNext();
});
}

private async processNext(): Promise<void> {
if (this.running >= this.maxConcurrency || this.queue.length === 0) {
return;
}

const task = this.queue.shift()!;
this.running++;

try {
const result = await task.execute();
task.resolve(result);
} catch (error) {
task.reject(error as Error);
} finally {
this.running--;
this.processNext();
}
}

/** 当前队列长度(用于监控告警) */
getQueueSize(): number {
return this.queue.length;
}
}

// 使用:最大并发 10,队列最长 100
const queue = new BackpressureQueue(10, 100);

实际架构中的限流熔断

生产环境中,限流和熔断通常部署在多个层级,形成纵深防御

层级方案限流维度工具
CDN/WAFDDoS 防护、IP 黑名单IP、地域Cloudflare、阿里云 WAF
网关层速率限制、并发限制IP、全局 QPSNginx、Kong、APISIX
应用层细粒度限流用户、接口、角色@nestjs/throttler、自定义 Guard
服务间熔断器、超时控制服务维度CircuitBreaker、opossum
数据层连接池、慢查询熔断连接数TypeORM pool、Redis maxclients
不要只在一层做限流

单层限流是不够的。网关层防住外部攻击,应用层做细粒度控制,服务间做熔断保护。缺少任何一层,都可能导致雪崩效应:一个服务挂了,拖垮整条调用链。


常见面试问题

Q1: 限流应该放在哪一层?

答案

生产环境中通常多层限流,每一层解决不同问题:

层级职责示例
CDN/WAF防 DDoS、IP 黑名单Cloudflare Rate Limiting
Nginx 网关粗粒度:IP 限流、全局 QPSlimit_req_zonelimit_conn_zone
API 网关中粒度:按路由、按租户Kong、APISIX 限流插件
应用层细粒度:按用户、按接口@nestjs/throttler、自定义 Guard
数据库层连接池本身就是限流max_connections、连接池 poolSize

原则:越靠前的层越粗粒度,越靠后的层越细粒度。粗粒度限流挡住大部分恶意流量,细粒度限流做精准控制。

Q2: 令牌桶和漏桶的区别?

答案

对比项漏桶(Leaky Bucket)令牌桶(Token Bucket)
核心思想请求进桶排队,以恒定速率流出令牌以恒定速率生成,请求需获取令牌
突发流量不允许,强制恒速输出允许,桶中积累的令牌可应对短暂峰值
处理速度恒定可变(有令牌时可瞬时处理)
桶满行为溢出的请求被丢弃多余的令牌被丢弃(不影响请求)
类比水龙头以固定速率滴水ATM 取款,有余额才能取
适用场景需要严格匀速的场景(如消息队列消费)需要允许一定突发的场景(如 API 网关)
// 漏桶:无论请求怎么来,始终匀速处理
// 10:00:00 来 100 个请求 → 以 10/s 的速度处理,需要 10 秒

// 令牌桶:桶中有积累的令牌时,可以瞬间处理
// 桶中积累了 50 个令牌 → 50 个请求可以立即处理
// 之后恢复到 10/s 的速率

面试结论:令牌桶更常用,因为真实场景中流量总是有波动的,令牌桶能在限流的同时更好地应对合理的突发请求。

Q3: 返回 429 之后前端应该怎么做?

答案

  1. 读取 Retry-After 响应头,等待指定时间后重试:
// Retry-After 可以是秒数或日期
const retryAfter = response.headers.get('Retry-After');
const delayMs = Number(retryAfter) * 1000;
await new Promise((resolve) => setTimeout(resolve, delayMs));
  1. 指数退避(Exponential Backoff):如果没有 Retry-After,使用 1s → 2s → 4s → 8s 递增延迟
  2. 用户提示:显示"操作过于频繁,请稍后重试"等友好提示
  3. 请求队列:将被限流的请求放入队列,延迟重发
  4. 降级缓存:如果有本地缓存数据,先展示缓存,后台静默重试

参考:请求重试与超时策略

Q4: 熔断和降级的区别?

答案

对比熔断(Circuit Breaking)降级(Degradation)
本质一种自动化保护机制一种有损的服务策略
触发条件下游服务异常率达到阈值,自动触发系统负载过高或故障时,可手动或自动触发
目标防止故障扩散(雪崩效应)保障核心功能可用
关系熔断是手段降级是目的

两者通常配合使用:

  1. 熔断器检测到下游服务异常 → 熔断(切断调用)
  2. 熔断后执行降级逻辑(返回缓存/默认数据)
  3. 熔断器定时探测 → 恢复后取消降级
async function getRecommendations(userId: string): Promise<Product[]> {
try {
// 通过熔断器调用推荐服务
return await breaker.execute(() => recommendService.get(userId));
} catch (error) {
// 熔断后降级:返回热门商品
return await getHotProducts();
}
}

Q5: 分布式限流怎么实现?

答案

分布式限流的核心挑战是多个服务实例共享同一个计数器。最常用的方案是 Redis + Lua 脚本,Lua 脚本在 Redis 中原子执行,避免竞态条件。

滑动窗口 Lua 脚本(生产推荐):

distributed-rate-limiter.ts
import Redis from 'ioredis';

const redis = new Redis();

/**
* 原子性滑动窗口限流 Lua 脚本
* 所有操作在 Redis 单线程中原子执行,无竞态问题
*/
const SLIDING_WINDOW_SCRIPT = `
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- 移除窗口外的记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

-- 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)

if count < limit then
-- 未超限,记录本次请求
redis.call('ZADD', key, now, now .. '-' .. math.random(1000000))
redis.call('PEXPIRE', key, window)
return 0 -- 允许
else
return 1 -- 拒绝
end
`;

async function distributedRateLimit(
key: string,
limit: number,
windowMs: number,
): Promise<boolean> {
const result = await redis.eval(
SLIDING_WINDOW_SCRIPT,
1,
key,
windowMs,
limit,
Date.now(),
);
return result === 0; // true = 允许, false = 限流
}

// 使用示例:全局 API 限流,每秒 1000 次
const allowed = await distributedRateLimit('rate:global:api', 1000, 1000);
为什么用 Lua 而不是 Pipeline?

Pipeline 将多个命令打包发送,但它们不是原子执行的。在极端并发下,两个请求可能同时读到 count=99(限制 100),然后都通过,实际通过 101 个。Lua 脚本在 Redis 单线程中原子执行,不会有这个问题。

Q6: 如何确定限流阈值?

答案

限流阈值不能拍脑袋决定,需要基于数据来设定:

1. 压测确定系统容量

# 使用 wrk 进行压力测试
wrk -t12 -c400 -d30s http://localhost:3000/api/products

找到系统的最大 QPS(在响应时间和错误率可接受的范围内),然后将限流阈值设为最大 QPS 的 70%-80%,留出余量。

2. 分级设定

接口类型限流建议示例
读接口较宽松100 次/秒/用户
写接口较严格10 次/秒/用户
登录/注册严格5 次/分钟/IP
短信验证码非常严格1 次/分钟/手机号
支付非常严格1 次/秒/用户

3. 动态调整

  • 监控实际流量模式,根据历史数据调整
  • 大促期间临时调高阈值
  • 根据用户等级差异化限流(VIP 用户更宽松)

Q7: 限流后怎么做优雅降级?

答案

被限流不等于什么都不返回,应该提供有意义的降级响应

graceful-degradation.ts
import { ExceptionFilter, Catch, ArgumentsHost, HttpStatus } from '@nestjs/common';
import { ThrottlerException } from '@nestjs/throttler';

@Catch(ThrottlerException)
export class ThrottlerExceptionFilter implements ExceptionFilter {
catch(exception: ThrottlerException, host: ArgumentsHost): void {
const ctx = host.switchToHttp();
const response = ctx.getResponse();
const request = ctx.getRequest();

response.status(HttpStatus.TOO_MANY_REQUESTS).json({
statusCode: 429,
message: '请求过于频繁,请稍后重试',
// 告诉客户端多久后可以重试
retryAfter: 60,
// 如果有缓存数据,附带降级数据
cachedData: this.getCachedResponse(request.url),
timestamp: new Date().toISOString(),
});

// 设置标准的 Retry-After 响应头
response.header('Retry-After', '60');
}

private getCachedResponse(url: string): any {
// 从缓存中获取该接口的最近一次正常响应
return null; // 实际从 Redis/内存缓存获取
}
}

降级优先级:

  1. 返回缓存数据 + 标记数据时间,让用户知道数据可能不是最新的
  2. 返回简化数据,如列表接口只返回前 10 条
  3. 排队提示,告诉用户当前排队位置和预计等待时间
  4. 引导分流,引导用户去访问其他低负载页面

Q8: 熔断器的三种状态转换条件是什么?

答案

Closed → Open:
条件:统计窗口内失败率 >= 阈值(如 50%),且请求数 >= 最小样本数(如 10)
行为:所有请求直接快速失败,不再调用下游

Open → HalfOpen:
条件:熔断打开后经过 resetTimeout(如 30 秒)
行为:允许少量探测请求通过

HalfOpen → Closed:
条件:探测请求全部成功(或成功率达标)
行为:恢复正常,所有请求放行

HalfOpen → Open:
条件:探测请求中有任何一个失败
行为:重新熔断,重新等待 resetTimeout

关键参数:

参数含义推荐值
failureRateThreshold触发熔断的失败率50%
minimumRequestCount最小样本数10-20
resetTimeoutOpen → HalfOpen 的等待时间10-60 秒
halfOpenMaxRequests半开状态允许的探测请求数3-5
最小样本数

如果没有最小样本数限制,第一个请求失败就是 100% 失败率,会立即触发熔断。设置 minimumRequestCount 可以避免因少量请求波动导致的误熔断。

Q9: Node.js 中有哪些限流方案?

答案

方案类型特点
@nestjs/throttlerNestJS 模块官方出品,支持多种存储后端
rate-limiter-flexible通用库支持 Redis/Mongo/内存,多种算法
express-rate-limitExpress 中间件简单易用,适合小项目
opossum熔断器Netflix Hystrix 的 Node.js 版
bottleneck调度器任务调度+限流,支持 Redis 集群
Nginx limit_req网关层高性能,适合粗粒度限流
Redis + Lua自定义最灵活,适合分布式场景

选型建议:

  • NestJS 项目@nestjs/throttler + 自定义 Guard
  • Express 项目express-rate-limit + rate-limiter-flexible
  • 需要熔断opossum
  • 分布式场景 → Redis + Lua 自定义
  • 网关层 → Nginx limit_req_zone

Q10: 如何对 WebSocket 连接做限流?

答案

WebSocket 限流需要考虑两个维度:连接数限流消息频率限流

ws-rate-limiter.ts
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

@WebSocketGateway()
export class ChatGateway {
@WebSocketServer()
server!: Server;

// 每个用户的消息时间戳记录
private messageTimestamps = new Map<string, number[]>();

/**
* 连接数限流:在 connection 事件中检查
*/
handleConnection(client: Socket): void {
const userId = client.handshake.auth.userId as string;

// 1. 单用户最多 3 个并发连接
const userSockets = this.getUserSockets(userId);
if (userSockets.length >= 3) {
client.emit('error', { message: 'Too many connections' });
client.disconnect();
return;
}

// 2. 全局最大连接数
if (this.server.sockets.sockets.size >= 10000) {
client.emit('error', { message: 'Server is full' });
client.disconnect();
return;
}
}

/**
* 消息频率限流:滑动窗口
*/
handleMessage(client: Socket, data: any): void {
const userId = client.handshake.auth.userId as string;

if (!this.checkMessageRate(userId, 10, 1000)) {
// 每秒最多 10 条消息
client.emit('rate_limited', {
message: '发送过于频繁,请稍后再试',
retryAfter: 1000,
});
return;
}

// 正常处理消息...
this.server.emit('message', data);
}

private checkMessageRate(
userId: string,
limit: number,
windowMs: number,
): boolean {
const now = Date.now();
const timestamps = this.messageTimestamps.get(userId) ?? [];

// 清理过期记录
const valid = timestamps.filter((t) => t > now - windowMs);
valid.push(now);
this.messageTimestamps.set(userId, valid);

return valid.length <= limit;
}

private getUserSockets(userId: string): Socket[] {
const sockets: Socket[] = [];
this.server.sockets.sockets.forEach((socket) => {
if (socket.handshake.auth.userId === userId) {
sockets.push(socket);
}
});
return sockets;
}
}

WebSocket 限流的注意点:

  • 连接阶段限制并发连接数(防止连接耗尽)
  • 消息阶段限制发送频率(防止消息洪水)
  • 被限流时不要断开连接,发送警告让客户端降速
  • 对于恶意用户(持续超限),可以临时封禁

相关链接