阻塞队列
问题
BlockingQueue 有哪些实现?各自的特点和适用场景是什么?阻塞队列如何实现生产者-消费者模式?
答案
BlockingQueue 接口
BlockingQueue 扩展了 Queue 接口,增加了阻塞的入队/出队操作:
| 操作 | 抛异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) → false | put(e) | offer(e, timeout, unit) |
| 移除 | remove() | poll() → null | take() | poll(timeout, unit) |
| 检查 | element() | peek() → null | - | - |
核心方法
- put():队列满时阻塞等待
- take():队列空时阻塞等待
这是阻塞队列的核心价值——自动实现生产者-消费者的等待/通知机制。
7 种 BlockingQueue 实现
1. ArrayBlockingQueue
基于数组的有界阻塞队列,FIFO 顺序:
ArrayBlockingQueue
// 必须指定容量
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
// 支持公平锁(默认非公平)
BlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(100, true);
- 底层:数组 + 一把 ReentrantLock + 两个 Condition(notEmpty / notFull)
- 特点:容量固定,不能扩容,公平/非公平可选
- 适用:已知容量上限,内存敏感场景
2. LinkedBlockingQueue
基于单向链表的可选有界阻塞队列:
LinkedBlockingQueue
// 默认容量 Integer.MAX_VALUE(近似无界,慎用!)
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
// 推荐指定容量
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);
- 底层:单向链表 + 两把锁(putLock + takeLock),生产和消费可并行
- 特点:吞吐量通常高于 ArrayBlockingQueue(双锁),不指定容量则近似无界
- 适用:吞吐量要求高的场景
Executors 中的隐患
Executors.newFixedThreadPool() 使用的就是不指定容量的 LinkedBlockingQueue(容量 = Integer.MAX_VALUE),可能任务堆积导致 OOM。详见 线程池。
3. SynchronousQueue
不存储元素的队列,每个入队操作必须等待一个出队操作:
SynchronousQueue
BlockingQueue<String> queue = new SynchronousQueue<>(); // 非公平
BlockingQueue<String> fairQueue = new SynchronousQueue<>(true); // 公平
// put 会阻塞,直到有线程 take
// 相当于生产者和消费者直接"交接"
- 底层:非公平模式用栈(TransferStack),公平模式用队列(TransferQueue)
- 特点:容量为 0,不存储元素,直接传递
- 适用:
Executors.newCachedThreadPool()使用此队列
4. PriorityBlockingQueue
基于堆的无界优先级阻塞队列:
PriorityBlockingQueue
// 元素必须实现 Comparable 或提供 Comparator
BlockingQueue<Task> queue = new PriorityBlockingQueue<>(11,
Comparator.comparingInt(Task::getPriority));
// 注意:只保证出队时取到优先级最高的,遍历不保证顺序
- 底层:数组实现的最小堆 + 一把 ReentrantLock
- 特点:无界(自动扩容),
put()永不阻塞,take()在空时阻塞 - 适用:任务有优先级的场景
5. DelayQueue
元素需要延迟一定时间后才能出队:
DelayQueueExample.java
public class DelayedTask implements Delayed {
private final String name;
private final long expireTime; // 到期时间
public DelayedTask(String name, long delay, TimeUnit unit) {
this.name = name;
this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
}
}
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask("任务A", 5, TimeUnit.SECONDS));
queue.put(new DelayedTask("任务B", 2, TimeUnit.SECONDS));
// take() 会按延迟时间顺序取出(先到期的先出)
DelayedTask task = queue.take(); // 2 秒后返回"任务B"
- 底层:PriorityQueue + 一把 ReentrantLock + Condition
- 适用:定时任务、缓存过期、订单超时
6. LinkedTransferQueue(JDK 7)
无界队列,融合了 SynchronousQueue 的直接传递和 LinkedBlockingQueue 的存储能力:
LinkedTransferQueue
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
// transfer:如果有消费者在等待则直接交给消费者,否则阻塞等待消费者
queue.transfer("data");
// tryTransfer:尝试直接交给消费者,没有消费者则返回 false(不阻塞)
queue.tryTransfer("data");
7. LinkedBlockingDeque
基于双向链表的可选有界双端阻塞队列:
LinkedBlockingDeque
LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(100);
deque.putFirst("头部插入");
deque.putLast("尾部插入");
deque.takeFirst(); // 从头部取
deque.takeLast(); // 从尾部取
- 适用:工作窃取算法(ForkJoinPool)
实现对比
| 队列 | 底层 | 边界 | 锁 | 特点 |
|---|---|---|---|---|
| ArrayBlockingQueue | 数组 | 有界 | 1 把锁 | 简单可靠 |
| LinkedBlockingQueue | 链表 | 可选 | 2 把锁 | 吞吐量高 |
| SynchronousQueue | 无存储 | 0 | CAS | 直接传递 |
| PriorityBlockingQueue | 堆 | 无界 | 1 把锁 | 优先级排序 |
| DelayQueue | 堆 | 无界 | 1 把锁 | 延时出队 |
| LinkedTransferQueue | 链表 | 无界 | CAS | 直接传递+存储 |
| LinkedBlockingDeque | 链表 | 可选 | 1 把锁 | 双端操作 |
生产者-消费者模式
ProducerConsumer.java
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 100; i++) {
queue.put("产品-" + i); // 队列满则阻塞
System.out.println("生产: 产品-" + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
try {
while (true) {
String product = queue.take(); // 队列空则阻塞
System.out.println("消费: " + product);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
阻塞队列自动处理了生产者-消费者之间的等待/通知,不需要手动使用 wait/notify 或 Condition。
常见面试问题
Q1: ArrayBlockingQueue 和 LinkedBlockingQueue 的区别?
答案:
| 对比 | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| 底层 | 数组 | 单向链表 |
| 容量 | 必须指定(有界) | 可选,默认 Integer.MAX_VALUE |
| 锁 | 1 把锁(生产消费互斥) | 2 把锁(生产消费可并行) |
| 内存 | 预分配数组 | 动态创建节点 |
| GC | 无额外 GC 压力 | 节点创建/回收有 GC 开销 |
| 公平性 | 可选 | 不支持公平 |
| 吞吐量 | 较低 | 较高(双锁并行) |
Q2: SynchronousQueue 的特点?
答案:
SynchronousQueue 不存储元素(容量为 0),每次 put() 必须等待一个 take() 匹配。适合直接传递任务的场景(如 CachedThreadPool),吞吐量高。
公平模式用 TransferQueue(FIFO),非公平模式用 TransferStack(LIFO)。
Q3: put() 和 offer() 的区别?
答案:
- put():队列满时阻塞等待,直到有空间
- offer():队列满时立即返回 false,不阻塞
- offer(e, timeout, unit):队列满时等待指定时间,超时返回 false
Q4: 阻塞队列在线程池中的作用?
答案:
线程池中阻塞队列用于存放等待执行的任务。当核心线程数已满时,新提交的任务进入队列排队。队列的类型和大小直接影响线程池的行为:
- 无界队列:maximumPoolSize 无效(永远不会创建非核心线程),可能 OOM
- 有界队列:队列满后才会创建非核心线程,更可控
- SynchronousQueue:不存储任务,直接创建线程处理
Q5: DelayQueue 的应用场景?
答案:
- 定时任务调度:
ScheduledThreadPoolExecutor内部使用类似原理 - 缓存过期:缓存项到期后从 DelayQueue 取出并清理
- 订单超时关闭:下单后放入 DelayQueue,30 分钟后取出检查是否支付
- 限流:请求放入 DelayQueue,间隔一定时间才能取出
相关链接
- BlockingQueue - Java 17 API
- 线程池 - 阻塞队列在线程池中的应用
- Lock 接口与 AQS - Condition 是阻塞队列的底层机制
- 并发工具类 - 其他并发协作工具