消息顺序性
问题
如何保证消息的顺序消费?全局有序和局部有序有什么区别?不同消息队列如何实现顺序消息?
答案
为什么会乱序
全局有序 vs 局部有序
| 类型 | 说明 | 代价 | 适用场景 |
|---|---|---|---|
| 全局有序 | 所有消息严格按发送顺序消费 | 只能单分区单消费者,吞吐量极低 | 几乎不用 |
| 局部有序 | 同一业务 key 的消息有序 | 同 key 路由到同一分区即可 | 绝大多数场景 |
面试核心结论
实际业务中需要的是 局部有序:同一订单的消息有序(创建→支付→发货),不同订单之间不需要有序。
各 MQ 的顺序消息方案
Kafka
Kafka 保证 Partition 内有序。同一 key 的消息发到同一 Partition 即可保证顺序。
Kafka 顺序消息
// Producer:指定 key,相同 key 路由到同一 Partition
producer.send(new ProducerRecord<>("order-topic", orderId, message));
// Kafka 默认分区策略:hash(key) % partitionCount
// 额外配置:防止重试导致乱序
props.put("enable.idempotence", true); // 开启幂等(自动保证单分区内有序)
// 或者
props.put("max.in.flight.requests.per.connection", 1); // 同时只有一个未确认请求
Consumer 端保证顺序:
- 一个 Partition 只分配给一个 Consumer(Consumer Group 自动保证)
- 如果消费端有多线程处理,需要按 key 分发到同一线程
Consumer 多线程有序消费
// 按业务 key hash 分发到固定线程
ExecutorService[] workers = new ExecutorService[threadCount];
for (ConsumerRecord<String, String> record : records) {
int index = Math.abs(record.key().hashCode()) % threadCount;
workers[index].submit(() -> processMessage(record));
}
RocketMQ
RocketMQ 原生支持顺序消息,提供 MessageQueueSelector 和 MessageListenerOrderly:
RocketMQ 顺序消息
// Producer:相同 orderId 发到同一 MessageQueue
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);
// Consumer:使用 MessageListenerOrderly(单线程顺序消费每个 Queue)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
processMessage(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
MessageListenerOrderly vs MessageListenerConcurrently
MessageListenerOrderly:每个 Queue 加锁,单线程顺序消费,保证顺序MessageListenerConcurrently:多线程并发消费,不保证顺序但吞吐量高
RabbitMQ
RabbitMQ 单个 Queue 内天然有序(FIFO),保证顺序的关键是:
- 消息发到 同一个 Queue
- 该 Queue 只有 一个 Consumer(或
prefetchCount=1串行处理)
RabbitMQ 顺序消费
// 设置 prefetchCount=1,一次只处理一条
channel.basicQos(1);
channel.basicConsume("order-queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
processMessage(body);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
顺序消息方案对比
| 维度 | Kafka | RocketMQ | RabbitMQ |
|---|---|---|---|
| 有序粒度 | Partition 内 | MessageQueue 内 | Queue 内 |
| 实现方式 | key hash + 单 Partition 单 Consumer | MessageQueueSelector + OrderlyListener | 单 Queue 单 Consumer |
| 消费失败处理 | 手动处理 | 自动挂起当前 Queue 重试 | Nack + requeue |
| 吞吐量影响 | 中等 | 中等 | 较大 |
常见面试问题
Q1: 如何保证消息的顺序消费?
答案:
核心思路:同一业务 key 的消息路由到同一分区/队列,由同一消费者按序消费。
具体方案:
- Producer 端:按业务 key(如 orderId)hash 取模,发到同一分区
- Broker 端:分区内消息天然有序(append-only)
- Consumer 端:一个分区只分配给一个 Consumer;如果消费端多线程,按 key 分发到固定线程
Q2: 顺序消费时消费失败怎么办?
答案:
顺序消费场景下,消费失败不能简单跳过(否则后续消息依赖前一条的结果就会出错)。
处理策略:
- 阻塞重试:当前消息失败后持续重试,直到成功(RocketMQ OrderlyListener 的默认行为)
- 有限重试 + 告警:重试 N 次后跳过并告警,人工介入
- 暂存 + 补偿:失败消息存入错误表,后续消息继续消费,最后统一补偿
RocketMQ 的做法:消费失败时挂起当前 Queue,间隔一段时间后重试同一条消息。
Q3: 全局有序消息怎么实现?
答案:
将 Topic 的分区数设为 1,所有消息进入同一分区且只有一个 Consumer 消费。
代价极大:完全丧失并行能力,吞吐量取决于单个 Consumer 的处理速度。实际生产中几乎不用全局有序,而是用局部有序(按业务 key 分区)满足需求。
Q4: Kafka 的重试会导致消息乱序吗?
答案:
会。当 max.in.flight.requests.per.connection > 1 时,第一批消息发送失败需要重试,而第二批消息已经发送成功,就会导致乱序。
解决方案:
- 推荐:开启幂等
enable.idempotence=true(Kafka 0.11+),即使max.in.flight = 5也能保证单分区内有序 - 备选:将
max.in.flight.requests.per.connection设为 1,但会降低吞吐量