跳到主要内容

限流调度器

问题

实现一个并发调度器 Scheduler,限制同时执行的任务数量不超过指定的最大并发数。

答案

限流调度器是控制异步任务并发执行的核心工具,常用于批量请求、文件上传等场景。


基础实现

/**
 * Concurrency-limited task scheduler.
 *
 * At most `maxConcurrent` tasks run at the same time. Extra tasks wait in a
 * FIFO queue and are started one by one as running tasks settle.
 */
class Scheduler {
  private maxConcurrent: number;
  private running: number = 0;
  private queue: Array<() => void> = [];

  /**
   * @param maxConcurrent Maximum number of tasks allowed to run at once.
   */
  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /**
   * Submits a task. It starts immediately when a slot is free, otherwise it
   * is queued.
   *
   * @param task Factory that starts the asynchronous work.
   * @returns A promise that settles with the outcome of `task`. A task that
   *   throws synchronously rejects this promise as well.
   */
  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const runTask = async (): Promise<void> => {
        this.running++;
        try {
          // Calling task() inside the try block also captures a synchronous
          // throw, so the slot is always given back in `finally`.
          resolve(await task());
        } catch (error) {
          reject(error);
        } finally {
          this.running--;
          this.schedule();
        }
      };

      if (this.running < this.maxConcurrent) {
        void runTask();
      } else {
        this.queue.push(() => void runTask());
      }
    });
  }

  /** Starts the next queued task if a slot is available. */
  private schedule(): void {
    if (this.queue.length > 0 && this.running < this.maxConcurrent) {
      const next = this.queue.shift()!;
      next();
    }
  }
}

// Usage example: at most two tasks run at the same time.
const scheduler = new Scheduler(2);

// Schedules a timer-based task that logs when it starts and when it ends.
const delay = (time: number, name: string) => {
return scheduler.add(() =>
new Promise<void>((resolve) => {
console.log(`${name} start`);
setTimeout(() => {
console.log(`${name} end`);
resolve();
}, time);
})
);
};

delay(1000, 'A');
delay(500, 'B');
delay(300, 'C');
delay(400, 'D');

// Output order (A and B start at 0 ms; C starts at 500 ms when B ends and
// finishes at 800 ms, i.e. before A finishes at 1000 ms; D runs 800-1200 ms):
// A start
// B start
// B end
// C start
// C end
// D start
// A end
// D end

执行流程图


增强版本

支持优先级

/** A submitted task together with the callbacks that settle its promise. */
interface Task<T> {
  fn: () => Promise<T>;
  priority: number;
  resolve: (value: T) => void;
  reject: (reason: unknown) => void;
}

/**
 * Concurrency-limited scheduler whose waiting tasks are ordered by priority.
 *
 * A larger `priority` runs earlier; tasks with equal priority keep their
 * submission order. Priority only affects tasks that have to wait: a task
 * submitted while a slot is free starts immediately.
 */
class PriorityScheduler {
  private maxConcurrent: number;
  private running: number = 0;
  private queue: Array<Task<unknown>> = [];

  /**
   * @param maxConcurrent Maximum number of tasks allowed to run at once.
   */
  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /**
   * Submits a task.
   *
   * @param task Factory that starts the asynchronous work.
   * @param priority Larger values are scheduled first (default 0).
   * @returns A promise that settles with the outcome of `task`.
   */
  add<T>(task: () => Promise<T>, priority: number = 0): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const item: Task<T> = { fn: task, priority, resolve, reject };

      if (this.running < this.maxConcurrent) {
        this.run(item);
        return;
      }

      // Insert before the first entry with a strictly lower priority so the
      // queue stays sorted in descending order and ties remain FIFO.
      const entry = item as Task<unknown>;
      const position = this.queue.findIndex((queued) => queued.priority < priority);
      if (position === -1) {
        this.queue.push(entry);
      } else {
        this.queue.splice(position, 0, entry);
      }
    });
  }

  /** Runs a task and gives its slot back once it settles. */
  private run<T>(task: Task<T>): void {
    this.running++;
    // Invoking fn() inside a promise executor turns a synchronous throw into
    // a rejection, so the slot is released on every path.
    new Promise<T>((settle) => settle(task.fn()))
      .then(task.resolve, task.reject)
      .finally(() => {
        this.running--;
        this.schedule();
      });
  }

  /** Starts the highest-priority waiting task if a slot is available. */
  private schedule(): void {
    if (this.queue.length > 0 && this.running < this.maxConcurrent) {
      const next = this.queue.shift()!;
      this.run(next);
    }
  }
}

// Usage
const priorityScheduler = new PriorityScheduler(2);

priorityScheduler.add(() => delay(1000), 1); // low priority; a slot is free, so it starts immediately
priorityScheduler.add(() => delay(1000), 10); // high priority; takes the second free slot immediately
priorityScheduler.add(() => delay(1000), 5); // medium priority; both slots are busy, so it is queued

/**
 * Returns a promise that resolves (with no value) after `ms` milliseconds.
 */
function delay(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}

支持取消

/** Handle returned by `CancelableScheduler.add`. */
interface CancelableTask<T> {
  promise: Promise<T>;
  cancel: () => void;
}

/**
 * Concurrency-limited scheduler whose tasks can be canceled.
 *
 * - Canceling a queued task removes it from the queue; it never starts.
 * - Canceling a running task rejects its promise right away. The underlying
 *   work cannot be interrupted, so it keeps its slot until it settles and its
 *   result is discarded.
 * - Canceling a task that already settled does nothing.
 *
 * In every case a canceled promise rejects with `Error('Task canceled')`.
 */
class CancelableScheduler {
  private maxConcurrent: number;
  private running: number = 0;
  private queue: Array<{
    run: () => void;
    id: symbol;
  }> = [];

  /**
   * @param maxConcurrent Maximum number of tasks allowed to run at once.
   */
  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /**
   * Submits a task.
   *
   * @param task Factory that starts the asynchronous work.
   * @returns The task's promise and a `cancel` function.
   */
  add<T>(task: () => Promise<T>): CancelableTask<T> {
    const taskId = Symbol();
    let rejectTask!: (reason: unknown) => void;

    const promise = new Promise<T>((resolve, reject) => {
      rejectTask = reject;

      const runTask = () => {
        // The promise executor converts a synchronous throw from task() into
        // a rejection, so the slot is always released in `finally`.
        new Promise<T>((settle) => settle(task()))
          .then(resolve, reject)
          .finally(() => {
            this.running--;
            this.schedule();
          });
      };

      if (this.running < this.maxConcurrent) {
        this.running++;
        runTask();
      } else {
        this.queue.push({ run: runTask, id: taskId });
      }
    });

    const cancel = () => {
      // Still waiting: drop it from the queue so it never starts.
      const index = this.queue.findIndex((t) => t.id === taskId);
      if (index !== -1) {
        this.queue.splice(index, 1);
      }
      // Rejecting an already-settled promise is a no-op, so this is safe for
      // running and finished tasks, and for repeated cancel() calls.
      rejectTask(new Error('Task canceled'));
    };

    return { promise, cancel };
  }

  /** Starts queued tasks while slots are available. */
  private schedule(): void {
    while (this.queue.length > 0 && this.running < this.maxConcurrent) {
      const next = this.queue.shift()!;
      this.running++;
      next.run();
    }
  }
}

支持超时

/**
 * Concurrency-limited scheduler that applies a per-task timeout.
 *
 * When a task exceeds the timeout its promise rejects with
 * `Error('Task timeout')` and its slot is handed to the next queued task. The
 * timed-out work itself is not interrupted; its eventual outcome is ignored.
 */
class TimeoutScheduler {
  private maxConcurrent: number;
  private running: number = 0;
  private queue: Array<() => void> = [];
  private timeout: number;

  /**
   * @param maxConcurrent Maximum number of tasks allowed to run at once.
   * @param timeout Per-task time limit in milliseconds (default 30000),
   *   measured from the moment the task starts running.
   */
  constructor(maxConcurrent: number, timeout: number = 30000) {
    this.maxConcurrent = maxConcurrent;
    this.timeout = timeout;
  }

  /**
   * Submits a task.
   *
   * @param task Factory that starts the asynchronous work.
   * @returns A promise that settles with the outcome of `task`, or rejects
   *   with `Error('Task timeout')` if the time limit is hit first.
   */
  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const runTask = () => {
        this.running++;

        // The slot must be given back exactly once: either when the timer
        // fires or when the task settles, whichever happens first. Releasing
        // twice would drive `running` negative and break the limit.
        let released = false;
        const release = () => {
          if (released) return;
          released = true;
          clearTimeout(timeoutId);
          this.running--;
          this.schedule();
        };

        const timeoutId = setTimeout(() => {
          reject(new Error('Task timeout'));
          release();
        }, this.timeout);

        // The executor turns a synchronous throw from task() into a rejection.
        new Promise<T>((settle) => settle(task()))
          .then(resolve, reject)
          .finally(release);
      };

      if (this.running < this.maxConcurrent) {
        runTask();
      } else {
        this.queue.push(runTask);
      }
    });
  }

  /** Starts the next queued task if a slot is available. */
  private schedule(): void {
    if (this.queue.length > 0 && this.running < this.maxConcurrent) {
      const next = this.queue.shift()!;
      next();
    }
  }
}

简洁版实现

/**
 * Functional variant: creates a concurrency limiter and returns its `add`
 * function.
 *
 * @param maxConcurrent Maximum number of tasks allowed to run at once.
 * @returns `add(task)`, which queues `task` and returns a promise that settles
 *   with its outcome (a synchronous throw becomes a rejection).
 */
function createScheduler(maxConcurrent: number) {
  const pending: Array<() => void> = [];
  let running = 0;

  // Start waiting tasks while there are free slots.
  const schedule = (): void => {
    while (pending.length > 0 && running < maxConcurrent) {
      const start = pending.shift()!;
      running++;
      start();
    }
  };

  return function add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      pending.push(() => {
        // Invoking task() inside an executor converts a synchronous throw
        // into a rejection, so the slot is always released below.
        new Promise<T>((settle) => settle(task()))
          .then(resolve, reject)
          .finally(() => {
            running--;
            schedule();
          });
      });
      schedule();
    });
  };
}

// Usage: at most two of these requests are in flight at any time
const add = createScheduler(2);
add(() => fetch('/api/1'));
add(() => fetch('/api/2'));
add(() => fetch('/api/3'));

批量执行工具函数

/**
 * Maps `items` through `iteratorFn` with a cap on how many calls are in
 * flight at once.
 *
 * @param concurrency Once this many calls are in flight, the loop waits for
 *   one of them to settle before starting another.
 * @param items Input values.
 * @param iteratorFn Async mapper; receives the item and its index.
 * @returns Results in the same order as `items`. Rejects with the first
 *   failure observed.
 */
async function asyncPool<T, R>(
  concurrency: number,
  items: T[],
  iteratorFn: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  const output: R[] = [];
  const inFlight = new Set<Promise<void>>();

  for (const [position, item] of items.entries()) {
    // Start the call on the microtask queue and store its result by index.
    const job = Promise.resolve()
      .then(() => iteratorFn(item, position))
      .then((value) => {
        output[position] = value;
      });

    // Each tracked promise removes itself from the set once it settles.
    const tracked: Promise<void> = job.finally(() => inFlight.delete(tracked));
    inFlight.add(tracked);

    if (inFlight.size >= concurrency) {
      await Promise.race(inFlight);
    }
  }

  await Promise.all(inFlight);
  return output;
}

// Usage example: fetch five URLs with at most two requests in flight
const urls = ['/api/1', '/api/2', '/api/3', '/api/4', '/api/5'];
const results = await asyncPool(2, urls, async (url) => {
const response = await fetch(url);
return response.json();
});

p-limit 风格实现

/**
 * p-limit style limiter: returns a function that wraps async calls so that at
 * most `concurrency` of them run at once; the rest wait in FIFO order.
 *
 * @param concurrency Maximum number of wrapped calls running concurrently.
 * @returns `limit(fn)`, which resolves or rejects with the outcome of `fn`.
 */
function pLimit(concurrency: number) {
  type Job = {
    fn: () => Promise<unknown>;
    resolve: (value: unknown) => void;
    reject: (reason: unknown) => void;
  };

  const waiting: Job[] = [];
  let activeCount = 0;

  // Runs one job, then passes the freed slot to the next waiting job, if any.
  async function execute(job: Job): Promise<void> {
    activeCount += 1;
    try {
      job.resolve(await job.fn());
    } catch (error) {
      job.reject(error);
    }
    activeCount -= 1;
    if (waiting.length > 0) {
      void execute(waiting.shift()!);
    }
  }

  return <T>(fn: () => Promise<T>): Promise<T> =>
    new Promise<T>((resolve, reject) => {
      const job = { fn, resolve, reject } as Job;
      if (activeCount < concurrency) {
        void execute(job);
      } else {
        waiting.push(job);
      }
    });
}

// Usage: the three requests share a limit of two concurrent calls
const limit = pLimit(2);

const tasks = [
limit(() => fetch('/api/1')),
limit(() => fetch('/api/2')),
limit(() => fetch('/api/3')),
];

const results = await Promise.all(tasks);

常见面试问题

Q1: 为什么需要限流调度器?

答案

| 场景 | 问题 | 解决方案 |
| --- | --- | --- |
| 批量请求 | 请求过多导致服务器拒绝 | 限制并发数 |
| 文件上传 | 浏览器连接数限制 | 分批上传 |
| 爬虫 | IP 被封禁 | 控制请求频率 |
| 数据库操作 | 连接池耗尽 | 限制并发连接 |

Q2: 队列为空时如何优化?

答案

/**
 * Scheduler without a separate `schedule` step: a task that finishes hands
 * its slot directly to the next queued task.
 */
class OptimizedScheduler {
  private maxConcurrent: number;
  private running: number = 0;
  private queue: Array<() => void> = [];

  /**
   * @param maxConcurrent Maximum number of tasks allowed to run at once.
   */
  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /**
   * Submits a task; it runs immediately while the limit is not reached,
   * otherwise it waits in FIFO order.
   *
   * @param task Factory that starts the asynchronous work.
   * @returns A promise that settles with the outcome of `task`. A task that
   *   throws synchronously rejects this promise as well.
   */
  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const runTask = async (): Promise<void> => {
        this.running++;
        try {
          // task() is called inside try so a synchronous throw is handled
          // like a rejection and the slot is still released.
          resolve(await task());
        } catch (error) {
          reject(error);
        } finally {
          this.running--;
          // Hand the freed slot straight to the next waiting task, if any.
          const nextTask = this.queue.shift();
          if (nextTask !== undefined) {
            nextTask();
          }
        }
      };

      // Run directly while below the concurrency limit.
      if (this.running < this.maxConcurrent) {
        void runTask();
      } else {
        this.queue.push(() => void runTask());
      }
    });
  }
}

Q3: 如何实现动态调整并发数?

答案

/**
 * Scheduler whose concurrency limit can be changed while tasks are running.
 *
 * Raising `maxConcurrent` immediately starts more queued tasks. Lowering it
 * does not interrupt running tasks; it only delays new starts until the
 * running count drops below the new limit.
 */
class DynamicScheduler {
  private _maxConcurrent: number;
  private running: number = 0;
  private queue: Array<() => void> = [];

  /**
   * @param maxConcurrent Initial maximum number of concurrently running tasks.
   */
  constructor(maxConcurrent: number) {
    this._maxConcurrent = maxConcurrent;
  }

  /** Current concurrency limit. */
  get maxConcurrent(): number {
    return this._maxConcurrent;
  }

  set maxConcurrent(value: number) {
    this._maxConcurrent = value;
    // When the limit was raised, start additional queued tasks right away.
    this.schedule();
  }

  /**
   * Submits a task.
   *
   * @param task Factory that starts the asynchronous work.
   * @returns A promise that settles with the outcome of `task`. A task that
   *   throws synchronously rejects this promise as well.
   */
  add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.queue.push(() => {
        this.running++;
        // The executor converts a synchronous throw from task() into a
        // rejection, so `running` is always decremented again.
        new Promise<T>((settle) => settle(task()))
          .then(resolve, reject)
          .finally(() => {
            this.running--;
            this.schedule();
          });
      });
      this.schedule();
    });
  }

  /** Starts queued tasks while slots are available. */
  private schedule(): void {
    while (this.queue.length > 0 && this.running < this._maxConcurrent) {
      const next = this.queue.shift()!;
      next();
    }
  }
}

// Adjust the limit dynamically according to system load
const scheduler = new DynamicScheduler(5);
// Lower the concurrency when load is high
scheduler.maxConcurrent = 2;
// Raise the concurrency when load is low
scheduler.maxConcurrent = 10;

Q4: 如何实现一个支持优先级的任务调度器?

答案

支持优先级的调度器需要在任务入队时按优先级排序,高优先级任务在队列前面,会被优先调度执行。核心是用优先级队列(简单场景用排序数组,高性能场景用最小堆)。

/** A queued task plus the callbacks that settle the promise returned by `add`. */
interface PriorityTask<T = unknown> {
  fn: () => Promise<T>;
  priority: number; // larger number = higher priority
  resolve: (value: T) => void;
  reject: (reason: unknown) => void;
}

/**
 * Concurrency-limited scheduler backed by a sorted array.
 *
 * Waiting tasks are kept in descending priority order; equal priorities stay
 * in submission order. A task submitted while a slot is free starts at once,
 * whatever its priority.
 */
class PriorityScheduler {
  private maxConcurrent: number;
  private running = 0;
  private queue: PriorityTask[] = [];

  /**
   * @param maxConcurrent Maximum number of tasks allowed to run at once.
   */
  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /**
   * Submits a task.
   *
   * @param fn Factory that starts the asynchronous work.
   * @param priority Larger values are scheduled first (default 0).
   * @returns A promise that settles with the outcome of `fn`. A synchronous
   *   throw from `fn` rejects this promise as well.
   */
  add<T>(fn: () => Promise<T>, priority: number = 0): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const task: PriorityTask<T> = { fn, priority, resolve, reject };

      if (this.running < this.maxConcurrent) {
        this.run(task);
      } else {
        this.enqueue(task as PriorityTask);
      }
    });
  }

  /** Inserts by priority, keeping the queue in descending order. */
  private enqueue(task: PriorityTask): void {
    const index = this.queue.findIndex((t) => t.priority < task.priority);
    if (index === -1) {
      this.queue.push(task);
    } else {
      this.queue.splice(index, 0, task);
    }
  }

  /** Runs a task and releases its slot once it settles. */
  private run<T>(task: PriorityTask<T>): void {
    this.running++;
    // The async wrapper is invoked synchronously, but a throw from fn()
    // becomes a rejection instead of escaping with the slot still held.
    const outcome = (async () => task.fn())();
    outcome.then(task.resolve, task.reject).finally(() => {
      this.running--;
      this.schedule();
    });
  }

  /** Starts queued tasks, highest priority first, while slots are free. */
  private schedule(): void {
    while (this.queue.length > 0 && this.running < this.maxConcurrent) {
      const next = this.queue.shift()!;
      this.run(next);
    }
  }
}

高性能版:最小堆实现优先级队列

当任务量很大时,向有序数组中插入一个任务的复杂度是 $O(n)$,使用最小堆可以将入队优化到 $O(\log n)$。

/**
 * Array-backed binary heap ordered by a caller-supplied comparator.
 *
 * `compare(a, b) < 0` means `a` is popped before `b`.
 */
class MinHeap<T> {
  private heap: T[] = [];
  constructor(private compare: (a: T, b: T) => number) {}

  /** Adds a value and restores the heap order. */
  push(val: T): void {
    const slot = this.heap.push(val) - 1;
    this.bubbleUp(slot);
  }

  /** Removes and returns the front value, or `undefined` when empty. */
  pop(): T | undefined {
    const items = this.heap;
    if (items.length === 0) return undefined;

    const root = items[0];
    const tail = items.pop()!;
    if (items.length === 0) return root;

    // Move the last element to the root and let it sink to its place.
    items[0] = tail;
    this.sinkDown(0);
    return root;
  }

  /** Number of stored values. */
  get size(): number {
    return this.heap.length;
  }

  /** Exchanges the elements at two positions. */
  private swap(a: number, b: number): void {
    const held = this.heap[a];
    this.heap[a] = this.heap[b];
    this.heap[b] = held;
  }

  /** Moves the element at `i` up while it orders strictly before its parent. */
  private bubbleUp(i: number): void {
    let child = i;
    while (child > 0) {
      const parent = (child - 1) >> 1;
      if (this.compare(this.heap[child], this.heap[parent]) >= 0) return;
      this.swap(child, parent);
      child = parent;
    }
  }

  /** Moves the element at `i` down until neither child orders before it. */
  private sinkDown(i: number): void {
    const count = this.heap.length;
    let node = i;
    for (;;) {
      const left = node * 2 + 1;
      const right = left + 1;
      let best = node;
      if (left < count && this.compare(this.heap[left], this.heap[best]) < 0) {
        best = left;
      }
      if (right < count && this.compare(this.heap[right], this.heap[best]) < 0) {
        best = right;
      }
      if (best === node) return;
      this.swap(node, best);
      node = best;
    }
  }
}

/**
 * Concurrency-limited priority scheduler backed by a binary heap.
 *
 * Every task goes through the heap, so whenever a slot is free the waiting
 * task with the highest priority is started. A binary heap on its own does
 * not keep equal keys in insertion order, so each entry also carries a
 * submission sequence number that breaks ties first-in, first-out.
 */
class HeapPriorityScheduler {
  private maxConcurrent: number;
  private running = 0;
  private nextSeq = 0;
  // Higher priority first (max-heap effect); among equal priorities the
  // earlier submission first. A negative result means `a` goes first.
  private queue = new MinHeap<{ task: PriorityTask; seq: number }>(
    (a, b) => b.task.priority - a.task.priority || a.seq - b.seq
  );

  /**
   * @param maxConcurrent Maximum number of tasks allowed to run at once.
   */
  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /**
   * Submits a task.
   *
   * @param fn Factory that starts the asynchronous work.
   * @param priority Larger values are scheduled first (default 0).
   * @returns A promise that settles with the outcome of `fn`. A synchronous
   *   throw from `fn` rejects this promise as well.
   */
  add<T>(fn: () => Promise<T>, priority: number = 0): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const task: PriorityTask<T> = { fn, priority, resolve, reject };
      this.queue.push({ task: task as PriorityTask, seq: this.nextSeq++ });
      this.schedule();
    });
  }

  /** Starts waiting tasks, best first, while slots are available. */
  private schedule(): void {
    while (this.queue.size > 0 && this.running < this.maxConcurrent) {
      const { task } = this.queue.pop()!;
      this.running++;
      // The executor converts a synchronous throw from fn() into a rejection
      // so the slot is always released in `finally`.
      new Promise<unknown>((settle) => settle(task.fn()))
        .then(task.resolve, task.reject)
        .finally(() => {
          this.running--;
          this.schedule();
        });
    }
  }
}

使用示例

const scheduler = new PriorityScheduler(2);

scheduler.add(() => delay(1000, 'low'), 1); // low priority; a slot is free, so it starts immediately
scheduler.add(() => delay(1000, 'high'), 10); // high priority; takes the second free slot immediately
scheduler.add(() => delay(1000, 'medium'), 5); // medium priority; both slots are busy, so it is queued
scheduler.add(() => delay(1000, 'urgent'), 100); // urgent; queued ahead of 'medium'

/**
 * Returns a promise that resolves with `value` after `ms` milliseconds.
 */
function delay<T>(ms: number, value: T): Promise<T> {
  return new Promise<T>((resolve) => {
    const fire = () => resolve(value);
    setTimeout(fire, ms);
  });
}
| 实现方式 | 入队复杂度 | 出队复杂度 | 适用场景 |
| --- | --- | --- | --- |
| 排序数组 | $O(n)$ | $O(1)$ | 任务量小 |
| 最小堆 | $O(\log n)$ | $O(\log n)$ | 任务量大 |

Q5: 如何实现一个可取消的并发调度器?

答案

可取消调度器需要区分两种取消场景:取消排队中的任务(直接从队列移除)和取消正在执行的任务(通过 AbortController 通知任务中断)。

/**
 * Handle returned by `CancelableScheduler.add`.
 *
 * The type parameter defaults to `unknown`, so a plain `CancelToken`
 * annotation keeps working.
 */
interface CancelToken<T = unknown> {
  promise: Promise<T>;
  cancel: () => void;
}

/**
 * Concurrency-limited scheduler with cooperative cancellation.
 *
 * - A queued task is canceled by removing it from the queue; its promise
 *   rejects with an `AbortError` DOMException and the task never starts.
 * - A running task is canceled by aborting the `AbortSignal` it was given.
 *   Its promise only settles early if the task itself reacts to the signal.
 */
class CancelableScheduler {
  private maxConcurrent: number;
  private running = 0;
  private queue: Array<{
    run: () => void;
    id: symbol;
    reject: (reason: unknown) => void;
  }> = [];

  /**
   * @param maxConcurrent Maximum number of tasks allowed to run at once.
   */
  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  /**
   * Submits a task.
   *
   * @param taskFn Factory that starts the work; it receives the signal that
   *   `cancel()` aborts once the task is running.
   * @returns The task's promise and a `cancel` function. A synchronous throw
   *   from `taskFn` rejects the promise.
   */
  add<T>(
    taskFn: (signal: AbortSignal) => Promise<T>
  ): CancelToken<T> {
    const taskId = Symbol();
    const abortController = new AbortController();

    const promise = new Promise<T>((resolve, reject) => {
      // A queued entry is removed before it can be canceled, so by the time
      // this runs the signal has never been aborted.
      const runTask = () => {
        // The executor converts a synchronous throw into a rejection so the
        // slot is always released in `finally`.
        new Promise<T>((settle) => settle(taskFn(abortController.signal)))
          .then(resolve, reject)
          .finally(() => {
            this.running--;
            this.schedule();
          });
      };

      if (this.running < this.maxConcurrent) {
        this.running++;
        runTask();
      } else {
        this.queue.push({ run: runTask, id: taskId, reject });
      }
    });

    const cancel = () => {
      // 1. Still queued: remove it and reject its promise.
      const index = this.queue.findIndex((t) => t.id === taskId);
      if (index !== -1) {
        const [removed] = this.queue.splice(index, 1);
        removed.reject(new DOMException('Task canceled', 'AbortError'));
        return;
      }

      // 2. Already started: signal the task through the AbortController.
      abortController.abort();
    };

    return { promise, cancel };
  }

  /** Starts queued tasks while slots are available. */
  private schedule(): void {
    while (this.queue.length > 0 && this.running < this.maxConcurrent) {
      const next = this.queue.shift()!;
      this.running++;
      next.run();
    }
  }

  /**
   * Cancels every task that is still waiting in the queue. Tasks that are
   * already running are not affected.
   */
  cancelAll(): void {
    const dropped = this.queue;
    this.queue = [];
    for (const entry of dropped) {
      entry.reject(new DOMException('All tasks canceled', 'AbortError'));
    }
  }
}

使用示例

const scheduler = new CancelableScheduler(2);

// The task has to observe the signal itself for a running task to be cancelable
const task1 = scheduler.add(async (signal: AbortSignal) => {
const response = await fetch('/api/data', { signal });
return response.json();
});

const task2 = scheduler.add(async (signal: AbortSignal) => {
// Check the signal periodically during a long computation.
// NOTE(review): as written the loop never yields to the event loop, so a
// cancel() issued by other code cannot run before the loop has finished;
// presumably the elided computation awaits somewhere — confirm.
for (let i = 0; i < 1000000; i++) {
if (signal.aborted) {
throw new DOMException('Aborted', 'AbortError');
}
// ... computation logic
}
return 'done';
});

// Cancel a single task
task1.cancel();

// Cancel every task that is still waiting in the queue
scheduler.cancelAll();

// Handle the cancellation
task1.promise.catch((err) => {
if (err.name === 'AbortError') {
console.log('任务被取消,资源已清理');
}
});
注意
  • 排队中的任务可以直接从队列移除,代价很小
  • 执行中的任务需要任务函数内部配合 AbortSignal 才能真正中断(如 fetch 原生支持 signal)
  • 取消后一定要做好资源清理,避免内存泄漏
  • 建议统一使用 DOMException('...', 'AbortError') 作为取消错误,便于通过 err.name === 'AbortError' 区分取消和真正的异常

相关链接