限流调度器
问题
实现一个并发调度器 Scheduler,限制同时执行的任务数量不超过指定的最大并发数。
答案
限流调度器是控制异步任务并发执行的核心工具,常用于批量请求、文件上传等场景。
基础实现
class Scheduler {
private maxConcurrent: number;
private running: number = 0;
private queue: Array<() => void> = [];
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
add<T>(task: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
const runTask = () => {
this.running++;
task()
.then(resolve)
.catch(reject)
.finally(() => {
this.running--;
this.schedule();
});
};
if (this.running < this.maxConcurrent) {
runTask();
} else {
this.queue.push(runTask);
}
});
}
private schedule(): void {
if (this.queue.length > 0 && this.running < this.maxConcurrent) {
const task = this.queue.shift()!;
task();
}
}
}
// 使用示例
const scheduler = new Scheduler(2);
const delay = (time: number, name: string) => {
return scheduler.add(() =>
new Promise<void>((resolve) => {
console.log(`${name} start`);
setTimeout(() => {
console.log(`${name} end`);
resolve();
}, time);
})
);
};
delay(1000, 'A');
delay(500, 'B');
delay(300, 'C');
delay(400, 'D');
// 输出顺序:
// A start
// B start
// B end
// C start
// A end
// D start
// C end
// D end
执行流程图
增强版本
支持优先级
interface Task<T> {
fn: () => Promise<T>;
priority: number;
resolve: (value: T) => void;
reject: (reason: unknown) => void;
}
class PriorityScheduler {
private maxConcurrent: number;
private running: number = 0;
private queue: Array<Task<unknown>> = [];
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
add<T>(task: () => Promise<T>, priority: number = 0): Promise<T> {
return new Promise((resolve, reject) => {
const taskItem: Task<T> = {
fn: task,
priority,
resolve,
reject,
};
if (this.running < this.maxConcurrent) {
this.run(taskItem);
} else {
// 按优先级插入队列
const insertIndex = this.queue.findIndex((t) => t.priority < priority);
if (insertIndex === -1) {
this.queue.push(taskItem as Task<unknown>);
} else {
this.queue.splice(insertIndex, 0, taskItem as Task<unknown>);
}
}
});
}
private run<T>(task: Task<T>): void {
this.running++;
task.fn()
.then(task.resolve)
.catch(task.reject)
.finally(() => {
this.running--;
this.schedule();
});
}
private schedule(): void {
if (this.queue.length > 0 && this.running < this.maxConcurrent) {
const task = this.queue.shift()!;
this.run(task);
}
}
}
// 使用
const priorityScheduler = new PriorityScheduler(2);
priorityScheduler.add(() => delay(1000), 1); // 低优先级
priorityScheduler.add(() => delay(1000), 10); // 高优先级,优先执行
priorityScheduler.add(() => delay(1000), 5); // 中优先级
function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
支持取消
interface CancelableTask<T> {
promise: Promise<T>;
cancel: () => void;
}
class CancelableScheduler {
private maxConcurrent: number;
private running: number = 0;
private queue: Array<{
run: () => void;
id: symbol;
}> = [];
private canceledTasks: Set<symbol> = new Set();
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
add<T>(task: () => Promise<T>): CancelableTask<T> {
const taskId = Symbol();
let resolveCancel: () => void;
const promise = new Promise<T>((resolve, reject) => {
resolveCancel = () => {
reject(new Error('Task canceled'));
};
const runTask = () => {
if (this.canceledTasks.has(taskId)) {
this.canceledTasks.delete(taskId);
this.running--;
this.schedule();
reject(new Error('Task canceled'));
return;
}
task()
.then(resolve)
.catch(reject)
.finally(() => {
this.running--;
this.schedule();
});
};
if (this.running < this.maxConcurrent) {
this.running++;
runTask();
} else {
this.queue.push({ run: runTask, id: taskId });
}
});
const cancel = () => {
// 从队列中移除
const index = this.queue.findIndex((t) => t.id === taskId);
if (index !== -1) {
this.queue.splice(index, 1);
resolveCancel();
} else {
// 标记为已取消
this.canceledTasks.add(taskId);
}
};
return { promise, cancel };
}
private schedule(): void {
while (this.queue.length > 0 && this.running < this.maxConcurrent) {
const task = this.queue.shift()!;
this.running++;
task.run();
}
}
}
支持超时
class TimeoutScheduler {
private maxConcurrent: number;
private running: number = 0;
private queue: Array<() => void> = [];
private timeout: number;
constructor(maxConcurrent: number, timeout: number = 30000) {
this.maxConcurrent = maxConcurrent;
this.timeout = timeout;
}
add<T>(task: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
const runTask = () => {
this.running++;
const timeoutId = setTimeout(() => {
reject(new Error('Task timeout'));
this.running--;
this.schedule();
}, this.timeout);
task()
.then((result) => {
clearTimeout(timeoutId);
resolve(result);
})
.catch((error) => {
clearTimeout(timeoutId);
reject(error);
})
.finally(() => {
this.running--;
this.schedule();
});
};
if (this.running < this.maxConcurrent) {
runTask();
} else {
this.queue.push(runTask);
}
});
}
private schedule(): void {
if (this.queue.length > 0 && this.running < this.maxConcurrent) {
const task = this.queue.shift()!;
task();
}
}
}
简洁版实现
// 函数式实现
function createScheduler(maxConcurrent: number) {
const queue: Array<() => void> = [];
let running = 0;
const schedule = () => {
while (queue.length > 0 && running < maxConcurrent) {
const task = queue.shift()!;
running++;
task();
}
};
return function add<T>(task: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
queue.push(() => {
task()
.then(resolve)
.catch(reject)
.finally(() => {
running--;
schedule();
});
});
schedule();
});
};
}
// 使用
const add = createScheduler(2);
add(() => fetch('/api/1'));
add(() => fetch('/api/2'));
add(() => fetch('/api/3'));
批量执行工具函数
// 并发执行数组任务
async function asyncPool<T, R>(
concurrency: number,
items: T[],
iteratorFn: (item: T, index: number) => Promise<R>
): Promise<R[]> {
const results: R[] = [];
const executing = new Set<Promise<void>>();
let index = 0;
for (const item of items) {
const currentIndex = index++;
const p = Promise.resolve()
.then(() => iteratorFn(item, currentIndex))
.then((result) => {
results[currentIndex] = result;
});
const e: Promise<void> = p.finally(() => executing.delete(e));
executing.add(e);
if (executing.size >= concurrency) {
await Promise.race(executing);
}
}
await Promise.all(executing);
return results;
}
// 使用示例
const urls = ['/api/1', '/api/2', '/api/3', '/api/4', '/api/5'];
const results = await asyncPool(2, urls, async (url) => {
const response = await fetch(url);
return response.json();
});
p-limit 风格实现
function pLimit(concurrency: number) {
const queue: Array<{
fn: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
}> = [];
let activeCount = 0;
const next = () => {
activeCount--;
if (queue.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-use-before-define
run(queue.shift()!);
}
};
const run = async ({ fn, resolve, reject }: typeof queue[0]) => {
activeCount++;
try {
const result = await fn();
resolve(result);
} catch (error) {
reject(error);
}
next();
};
const enqueue = <T>(fn: () => Promise<T>): Promise<T> => {
return new Promise((resolve, reject) => {
const task = { fn, resolve, reject } as typeof queue[0];
if (activeCount < concurrency) {
run(task);
} else {
queue.push(task);
}
});
};
return <T>(fn: () => Promise<T>): Promise<T> => enqueue(fn);
}
// 使用
const limit = pLimit(2);
const tasks = [
limit(() => fetch('/api/1')),
limit(() => fetch('/api/2')),
limit(() => fetch('/api/3')),
];
const results = await Promise.all(tasks);
常见面试问题
Q1: 为什么需要限流调度器?
答案:
| 场景 | 问题 | 解决方案 |
|---|---|---|
| 批量请求 | 请求过多导致服务器拒绝 | 限制并发数 |
| 文件上传 | 浏览器连接数限制 | 分批上传 |
| 爬虫 | IP 被封禁 | 控制请求频率 |
| 数据库操作 | 连接池耗尽 | 限制并发连接 |
Q2: 队列为空时如何优化?
答案:
class OptimizedScheduler {
private maxConcurrent: number;
private running: number = 0;
private queue: Array<() => void> = [];
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
add<T>(task: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
const runTask = () => {
this.running++;
task()
.then(resolve)
.catch(reject)
.finally(() => {
this.running--;
// 直接调度,不需要 schedule 方法
if (this.queue.length > 0) {
const nextTask = this.queue.shift()!;
nextTask();
}
});
};
// 未达并发限制时直接执行
if (this.running < this.maxConcurrent) {
runTask();
} else {
this.queue.push(runTask);
}
});
}
}
Q3: 如何实现动态调整并发数?
答案:
class DynamicScheduler {
private _maxConcurrent: number;
private running: number = 0;
private queue: Array<() => void> = [];
constructor(maxConcurrent: number) {
this._maxConcurrent = maxConcurrent;
}
get maxConcurrent(): number {
return this._maxConcurrent;
}
set maxConcurrent(value: number) {
this._maxConcurrent = value;
// 增加并发数时,立即调度更多任务
this.schedule();
}
add<T>(task: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
this.queue.push(() => {
this.running++;
task()
.then(resolve)
.catch(reject)
.finally(() => {
this.running--;
this.schedule();
});
});
this.schedule();
});
}
private schedule(): void {
while (this.queue.length > 0 && this.running < this._maxConcurrent) {
const task = this.queue.shift()!;
task();
}
}
}
// 根据系统负载动态调整
const scheduler = new DynamicScheduler(5);
// 负载高时减少并发
scheduler.maxConcurrent = 2;
// 负载低时增加并发
scheduler.maxConcurrent = 10;
Q4: 如何实现一个支持优先级的任务调度器?
答案:
支持优先级的调度器需要在任务入队时按优先级排序,高优先级任务在队列前面,会被优先调度执行。核心是用优先级队列(简单场景用排序数组,高性能场景用最小堆)。
interface PriorityTask<T = unknown> {
fn: () => Promise<T>;
priority: number; // 数字越大优先级越高
resolve: (value: T) => void;
reject: (reason: unknown) => void;
}
class PriorityScheduler {
private maxConcurrent: number;
private running = 0;
private queue: PriorityTask[] = [];
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
add<T>(fn: () => Promise<T>, priority: number = 0): Promise<T> {
return new Promise<T>((resolve, reject) => {
const task: PriorityTask<T> = { fn, priority, resolve, reject };
if (this.running < this.maxConcurrent) {
this.run(task);
} else {
this.enqueue(task as PriorityTask);
}
});
}
// 按优先级插入队列(保持降序)
private enqueue(task: PriorityTask): void {
const index = this.queue.findIndex((t) => t.priority < task.priority);
if (index === -1) {
this.queue.push(task);
} else {
this.queue.splice(index, 0, task);
}
}
private run<T>(task: PriorityTask<T>): void {
this.running++;
task
.fn()
.then(task.resolve)
.catch(task.reject)
.finally(() => {
this.running--;
this.schedule();
});
}
private schedule(): void {
while (this.queue.length > 0 && this.running < this.maxConcurrent) {
const task = this.queue.shift()!;
this.run(task);
}
}
}
高性能版:最小堆实现优先级队列
当任务量很大时,数组插入排序是 ,使用最小堆可以将入队优化到 :
class MinHeap<T> {
private heap: T[] = [];
constructor(private compare: (a: T, b: T) => number) {}
push(val: T): void {
this.heap.push(val);
this.bubbleUp(this.heap.length - 1);
}
pop(): T | undefined {
if (this.heap.length === 0) return undefined;
const top = this.heap[0];
const last = this.heap.pop()!;
if (this.heap.length > 0) {
this.heap[0] = last;
this.sinkDown(0);
}
return top;
}
get size(): number {
return this.heap.length;
}
private bubbleUp(i: number): void {
while (i > 0) {
const parent = Math.floor((i - 1) / 2);
if (this.compare(this.heap[i], this.heap[parent]) >= 0) break;
[this.heap[i], this.heap[parent]] = [this.heap[parent], this.heap[i]];
i = parent;
}
}
private sinkDown(i: number): void {
const n = this.heap.length;
while (true) {
let smallest = i;
const left = 2 * i + 1;
const right = 2 * i + 2;
if (left < n && this.compare(this.heap[left], this.heap[smallest]) < 0)
smallest = left;
if (right < n && this.compare(this.heap[right], this.heap[smallest]) < 0)
smallest = right;
if (smallest === i) break;
[this.heap[i], this.heap[smallest]] = [this.heap[smallest], this.heap[i]];
i = smallest;
}
}
}
class HeapPriorityScheduler {
private maxConcurrent: number;
private running = 0;
// 优先级大的排前面(最大堆效果),compare 返回负数表示 a 优先
private queue = new MinHeap<PriorityTask>(
(a, b) => b.priority - a.priority
);
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
add<T>(fn: () => Promise<T>, priority: number = 0): Promise<T> {
return new Promise<T>((resolve, reject) => {
const task: PriorityTask<T> = { fn, priority, resolve, reject };
this.queue.push(task as PriorityTask);
this.schedule();
});
}
private schedule(): void {
while (this.queue.size > 0 && this.running < this.maxConcurrent) {
const task = this.queue.pop()!;
this.running++;
task
.fn()
.then(task.resolve)
.catch(task.reject)
.finally(() => {
this.running--;
this.schedule();
});
}
}
}
使用示例:
const scheduler = new PriorityScheduler(2);
scheduler.add(() => delay(1000, 'low'), 1); // 低优先级
scheduler.add(() => delay(1000, 'high'), 10); // 高优先级,优先执行
scheduler.add(() => delay(1000, 'medium'), 5); // 中优先级
scheduler.add(() => delay(1000, 'urgent'), 100); // 紧急任务,插队
function delay<T>(ms: number, value: T): Promise<T> {
return new Promise((r) => setTimeout(() => r(value), ms));
}
| 实现方式 | 入队复杂度 | 出队复杂度 | 适用场景 |
|---|---|---|---|
| 排序数组 | 任务量小 | ||
| 最小堆 | 任务量大 |
Q5: 如何实现一个可取消的并发调度器?
答案:
可取消调度器需要区分两种取消场景:取消排队中的任务(直接从队列移除)和取消正在执行的任务(通过 AbortController 通知任务中断)。
interface CancelToken {
promise: Promise<unknown>;
cancel: () => void;
}
class CancelableScheduler {
private maxConcurrent: number;
private running = 0;
private queue: Array<{
run: () => void;
id: symbol;
reject: (reason: unknown) => void;
}> = [];
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent;
}
add<T>(
taskFn: (signal: AbortSignal) => Promise<T>
): CancelToken {
const taskId = Symbol();
const abortController = new AbortController();
const promise = new Promise<T>((resolve, reject) => {
const runTask = () => {
// 如果在排队期间已被取消
if (abortController.signal.aborted) {
this.running--;
this.schedule();
reject(new DOMException('Task canceled', 'AbortError'));
return;
}
// 执行任务,传入 signal 让任务内部可以响应取消
taskFn(abortController.signal)
.then(resolve)
.catch(reject)
.finally(() => {
this.running--;
this.schedule();
});
};
if (this.running < this.maxConcurrent) {
this.running++;
runTask();
} else {
this.queue.push({ run: runTask, id: taskId, reject });
}
});
const cancel = () => {
// 1. 尝试从队列中移除(取消排队中的任务)
const index = this.queue.findIndex((t) => t.id === taskId);
if (index !== -1) {
const [removed] = this.queue.splice(index, 1);
removed.reject(new DOMException('Task canceled', 'AbortError'));
return;
}
// 2. 任务已在执行,通过 AbortController 通知取消
abortController.abort();
};
return { promise, cancel };
}
private schedule(): void {
while (this.queue.length > 0 && this.running < this.maxConcurrent) {
const task = this.queue.shift()!;
this.running++;
task.run();
}
}
// 取消所有任务
cancelAll(): void {
// 清空队列
for (const task of this.queue) {
task.reject(new DOMException('All tasks canceled', 'AbortError'));
}
this.queue = [];
}
}
使用示例:
const scheduler = new CancelableScheduler(2);
// 任务内部需要检查 signal 以支持取消
const task1 = scheduler.add(async (signal: AbortSignal) => {
const response = await fetch('/api/data', { signal });
return response.json();
});
const task2 = scheduler.add(async (signal: AbortSignal) => {
// 长时间计算中定期检查 signal
for (let i = 0; i < 1000000; i++) {
if (signal.aborted) {
throw new DOMException('Aborted', 'AbortError');
}
// ... 计算逻辑
}
return 'done';
});
// 取消单个任务
task1.cancel();
// 取消所有排队中的任务
scheduler.cancelAll();
// 处理取消
task1.promise.catch((err) => {
if (err.name === 'AbortError') {
console.log('任务被取消,资源已清理');
}
});
注意
- 排队中的任务可以直接从队列移除,代价很小
- 执行中的任务需要任务函数内部配合
AbortSignal才能真正中断(如fetch原生支持 signal) - 取消后一定要做好资源清理,避免内存泄漏
- 建议统一使用
DOMException('...', 'AbortError')作为取消错误,便于通过err.name === 'AbortError'区分取消和真正的异常