跳到主要内容

Fork/Join 框架

问题

Fork/Join 框架的工作原理是什么?什么是工作窃取算法?ForkJoinPool 和普通线程池有什么区别?

答案

Fork/Join 框架概述

Fork/Join 框架是 JDK 7 引入的一个并行执行框架,基于分治(Divide and Conquer) 思想:

  1. Fork:将大任务拆分为多个小任务,递归直到足够小
  2. Join:合并子任务的结果

核心类

说明
ForkJoinPool执行 ForkJoinTask 的线程池
ForkJoinTask<V>在 ForkJoinPool 中执行的任务基类
RecursiveTask<V>有返回值的 ForkJoinTask
RecursiveAction无返回值的 ForkJoinTask

使用示例:并行求和

ParallelSum.java
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000; // 拆分阈值
private final long[] array;
private final int start;
private final int end;

public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
int length = end - start;

// 基本情况:任务足够小,直接计算
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}

// 分治:将任务拆分为两半
int mid = start + length / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);

// fork:异步执行左半部分
leftTask.fork();
// 当前线程直接执行右半部分(避免浪费当前线程)
Long rightResult = rightTask.compute();
// join:等待左半部分完成并获取结果
Long leftResult = leftTask.join();

// 合并结果
return leftResult + rightResult;
}
}

// 使用
ForkJoinPool pool = new ForkJoinPool(); // 默认线程数 = CPU 核数
long[] array = LongStream.rangeClosed(1, 10_000_000).toArray();
long result = pool.invoke(new SumTask(array, 0, array.length));
System.out.println("Sum = " + result);
fork/compute 顺序

推荐先 fork() 一个子任务,然后在当前线程 compute() 另一个子任务,最后 join() 等待 fork 的结果。这样可以充分利用当前线程,避免空等。

leftTask.fork();                    // 异步执行左任务
Long right = rightTask.compute(); // 当前线程执行右任务
Long left = leftTask.join(); // 等待左任务结果

工作窃取算法(Work-Stealing)

ForkJoinPool 中每个工作线程都有自己的双端队列(Deque)

工作流程

  1. 线程 fork 产生的子任务放入自己的 Deque 头部
  2. 线程从自己的 Deque 头部取任务执行(LIFO,优先处理最近的任务)
  3. 当自己的 Deque 为空时,从其他线程的 Deque 尾部窃取任务(FIFO)

为什么用双端队列?

  • 自己从头部取(LIFO):优先执行最近 fork 的大任务,便于递归分解
  • 窃取从尾部取(FIFO):窃取最老的大任务,可以继续分解,减少竞争

ForkJoinPool vs ThreadPoolExecutor

对比ForkJoinPoolThreadPoolExecutor
任务类型可分解的递归任务独立的异步任务
队列每线程一个 Deque共享一个 BlockingQueue
调度工作窃取从共享队列取任务
适用场景计算密集型、分治通用异步任务
线程数默认CPU 核数需手动指定
JDK 版本JDK 7JDK 5

ForkJoinPool.commonPool()

JDK 8 引入了一个静态共享的 ForkJoinPool

ForkJoinPool commonPool = ForkJoinPool.commonPool();
// 线程数 = Runtime.getRuntime().availableProcessors() - 1

以下功能默认使用 commonPool:

  • CompletableFuture.supplyAsync() / runAsync()
  • Stream.parallel()
commonPool 的风险

所有使用 commonPool 的代码共享同一个线程池,某个慢任务可能拖累所有其他任务。I/O 密集型任务不应使用 commonPool。

在生产环境中,建议为 CompletableFuture 传入自定义线程池。

parallelStream 与 Fork/Join

ParallelStream.java
// parallelStream 底层使用 ForkJoinPool.commonPool()
long sum = LongStream.rangeClosed(1, 10_000_000)
.parallel()
.sum();

// 如果需要使用自定义 ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(8);
long result = customPool.submit(() ->
LongStream.rangeClosed(1, 10_000_000)
.parallel()
.sum()
).get();

使用注意事项

Fork/Join 注意事项
  1. 避免在 ForkJoinTask 中阻塞 I/O:工作窃取算法假设任务是 CPU 密集型的
  2. 合理设置阈值:阈值太小导致任务拆分过多(开销大于收益),太大则并行度不够
  3. 避免在 ForkJoinTask 中使用 synchronized:可能导致工作窃取线程阻塞
  4. 先 fork 再 compute 再 join:充分利用当前线程

常见面试问题

Q1: 什么是工作窃取算法?

答案

工作窃取(Work-Stealing)是 ForkJoinPool 的核心调度策略。每个工作线程维护一个双端队列,从自己队列的头部取任务执行。当自己的队列为空时,从其他线程队列的尾部"窃取"任务执行。

好处:

  • 充分利用 CPU,减少线程空闲
  • 窃取从尾部取,自己从头部取,减少竞争
  • 窃取大粒度任务,可继续分解

Q2: Fork/Join 框架适用于什么场景?

答案

适用于可递归拆分计算密集型任务,典型场景:

  • 大数组排序/求和
  • 递归搜索(如盘点文件系统)
  • 矩阵运算
  • 图遍历

不适用于 I/O 密集型任务(阻塞会浪费工作线程)。

Q3: RecursiveTask 和 RecursiveAction 的区别?

答案

返回值方法场景
RecursiveTask<V>V compute()需要返回结果(如求和)
RecursiveActionvoid compute()不需要返回结果(如排序)

Q4: ForkJoinPool 和普通线程池的区别?

答案

  • 普通线程池(ThreadPoolExecutor):所有线程竞争一个共享队列,适合独立任务
  • ForkJoinPool:每个线程有独立的双端队列 + 工作窃取,适合可分解的递归任务

ForkJoinPool 在分治场景下能更高效地利用 CPU。

Q5: CompletableFuture 和 parallelStream 默认用什么线程池?有什么风险?

答案

都默认使用 ForkJoinPool.commonPool(),这是一个全局共享的线程池。

风险:

  1. 慢任务拖累所有使用 commonPool 的功能
  2. I/O 操作阻塞工作线程
  3. 在 Web 应用中,多个请求共享同一个 commonPool

建议:重要的异步任务使用自定义线程池

相关链接