跳到主要内容

并发工具类

问题

CountDownLatch、CyclicBarrier、Semaphore 的区别和应用场景是什么?它们的底层实现原理是什么?

答案

CountDownLatch(倒计时门闩)

一次性的倒计时器,让一个或多个线程等待其他线程完成操作:

CountDownLatchExample.java
// 场景:主线程等待 3 个子任务全部完成
CountDownLatch latch = new CountDownLatch(3); // 计数器初始值 3

for (int i = 0; i < 3; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("任务 " + taskId + " 执行中...");
Thread.sleep(1000 + taskId * 500);
System.out.println("任务 " + taskId + " 完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数器 -1
}
}).start();
}

System.out.println("主线程等待所有任务完成...");
latch.await(); // 阻塞直到计数器为 0
// latch.await(5, TimeUnit.SECONDS); // 超时等待
System.out.println("所有任务已完成!");

原理:基于 AQS 共享模式。state 初始化为 count,每次 countDown() 将 state 减 1,当 state 减到 0 时唤醒所有等待线程。

CountDownLatch 不可重用

CountDownLatch 是一次性的,计数器减到 0 后无法重置。如果需要重复使用,请用 CyclicBarrier。

典型场景

  • 主线程等待多个子任务完成(并行初始化)
  • 多个线程同时开始(将 count 设为 1,所有线程 await,主线程 countDown 后同时放行)

CyclicBarrier(循环栅栏)

让一组线程互相等待,全部到达栅栏点后一起继续执行,可循环使用

CyclicBarrierExample.java
// 场景:3 个线程各自准备数据,全部就绪后一起处理
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// 所有线程到达后执行的回调(可选,由最后到达的线程执行)
System.out.println("所有线程已就绪,开始合并处理");
});

for (int i = 0; i < 3; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("线程 " + taskId + " 准备数据中...");
Thread.sleep(1000 + taskId * 500);
System.out.println("线程 " + taskId + " 准备完毕,等待其他线程");
barrier.await(); // 等待其他线程到达栅栏
// 所有线程到达后继续执行
System.out.println("线程 " + taskId + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}

原理:基于 ReentrantLock + Condition。内部维护一个计数器 count,每个线程调用 await() 时 count 减 1 并阻塞。最后一个线程(count = 0)唤醒所有线程,并重置计数器(可循环使用)。

典型场景

  • 多线程分段计算后汇总(MapReduce)
  • 多轮比赛,每轮所有选手就绪后同时开始

CountDownLatch vs CyclicBarrier

对比CountDownLatchCyclicBarrier
等待方一个或多个线程等待其他线程线程之间互相等待
计数变化countDown() 减计数await() 减计数
可重用不可(一次性)可循环使用reset()
回调支持(所有线程到达后执行)
底层AQS 共享模式ReentrantLock + Condition
异常无特殊处理BrokenBarrierException

Semaphore(信号量)

控制同时访问资源的线程数量,常用于限流:

SemaphoreExample.java
// 场景:限制同时最多 3 个线程访问资源(如数据库连接池)
Semaphore semaphore = new Semaphore(3); // 3 个许可

for (int i = 0; i < 10; i++) {
final int taskId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取一个许可(阻塞)
// semaphore.tryAcquire(1, TimeUnit.SECONDS); // 超时获取
System.out.println("线程 " + taskId + " 获取到许可,正在执行...");
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
System.out.println("线程 " + taskId + " 释放许可");
}
}).start();
}

原理:基于 AQS 共享模式。state 表示可用许可数。acquire() 时 state 减 1,如果 state < 0 则阻塞。release() 时 state 加 1 并唤醒等待线程。

Semaphore(1) ≈ 互斥锁

当许可数为 1 时,Semaphore 等价于一个互斥锁。与 ReentrantLock 不同的是,Semaphore 可以由不同线程释放(不要求获取和释放是同一线程)。

典型场景

  • 限流(控制并发数)
  • 资源池(连接池、线程池的许可管理)

Exchanger(交换器)

两个线程之间交换数据

ExchangerExample.java
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
try {
String data = "来自线程A的数据";
String received = exchanger.exchange(data); // 交换数据
System.out.println("线程A收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();

new Thread(() -> {
try {
String data = "来自线程B的数据";
String received = exchanger.exchange(data); // 交换数据
System.out.println("线程B收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 输出:
// 线程A收到: 来自线程B的数据
// 线程B收到: 来自线程A的数据

典型场景:遗传算法中两个线程交换染色体数据、双缓冲技术。

Phaser(阶段器,JDK 7)

Phaser 是 CyclicBarrier 和 CountDownLatch 的增强版,支持动态注册/注销参与者多阶段同步:

PhaserExample.java
Phaser phaser = new Phaser(3); // 3 个参与者

for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
// 阶段 1
System.out.println("线程 " + id + " 完成阶段1");
phaser.arriveAndAwaitAdvance(); // 等待所有参与者到达

// 阶段 2
System.out.println("线程 " + id + " 完成阶段2");
phaser.arriveAndAwaitAdvance(); // 等待所有参与者到达

// 注销
phaser.arriveAndDeregister();
}).start();
}

常见面试问题

Q1: CountDownLatch 和 CyclicBarrier 的区别?

答案

核心区别在于等待关系可重用性

  • CountDownLatch:一个线程等待其他线程完成,一次性不可重用
  • CyclicBarrier:线程之间互相等待,可循环使用

CountDownLatch 的 countDown()await() 可以是不同线程调用,而 CyclicBarrier 的 await() 由参与者线程调用。

Q2: Semaphore 的使用场景?

答案

  1. 限流:控制接口的并发访问数量
  2. 资源池:管理有限资源(数据库连接、文件句柄)
  3. 生产者-消费者:空位信号量 + 产品信号量(但通常用 BlockingQueue 更方便)

Q3: Semaphore 是公平的还是非公平的?

答案

Semaphore 支持公平和非公平两种模式:

new Semaphore(3);            // 非公平(默认)
new Semaphore(3, true); // 公平

底层与 ReentrantLock 类似:公平模式先检查 AQS 队列是否有等待线程,非公平模式直接 CAS 竞争。

Q4: CountDownLatch 的 countDown() 和 await() 可以在不同线程调用吗?

答案

可以。CountDownLatch 的设计就是让 countDown()await() 分离:

  • 工作线程调用 countDown(),完成一个任务
  • 等待线程调用 await(),等待所有任务完成

一个线程也可以多次调用 countDown()

Q5: CyclicBarrier 的 BrokenBarrierException 何时抛出?

答案

以下情况会导致栅栏"损坏",其他等待线程收到 BrokenBarrierException

  1. 某个等待线程被中断
  2. 某个等待线程超时
  3. 栅栏的回调动作抛出异常
  4. 调用了 barrier.reset()

栅栏损坏后可以调用 reset() 重置。

Q6: 这些工具类的底层实现分别是什么?

答案

工具类底层实现
CountDownLatchAQS 共享模式,state = count
CyclicBarrierReentrantLock + Condition
SemaphoreAQS 共享模式,state = permits
ExchangerCAS + LockSupport.park/unpark
Phaser内部状态机 + CAS

相关链接