设计实时通讯系统(IM)
问题
如何设计一个支持亿级用户的实时通讯系统(IM)?从长连接管理、消息投递模型、存储方案到已读回执与离线推送,请详细说明核心模块的设计思路与关键技术实现。
答案
IM 系统是互联网最复杂的分布式系统之一,需要同时解决 消息可靠投递、顺序一致性、多端同步 和 海量并发 四大核心难题。与弹幕系统"允许丢消息"不同,IM 系统对消息的可靠性和一致性有极高要求——一条消息都不能丢,顺序不能乱。
IM 系统的本质是:在不可靠的网络环境中,构建可靠的消息传输通道。所有架构设计都围绕这个目标展开。
一、需求分析
功能需求
| 模块 | 功能点 |
|---|---|
| 单聊 | 一对一私聊、文本/图片/语音/视频/文件等多种消息类型 |
| 群聊 | 多人群组、群公告、@提醒、群管理(禁言/踢人/权限) |
| 已读回执 | 单聊已读/未读、群聊已读人数统计 |
| 在线状态 | 在线/离线/忙碌等状态展示、最后在线时间 |
| 离线消息 | 用户离线时暂存消息,上线后拉取、推送通知 |
| 消息漫游 | 跨设备消息同步、历史消息云端存储、消息搜索 |
| 辅助功能 | 消息撤回/编辑、@提醒、表情回应、消息转发、引用回复 |
非功能需求
| 指标 | 目标 |
|---|---|
| 消息可靠投递 | 消息不丢失、不重复,至少一次(at-least-once)+ 客户端去重 |
| 顺序保证 | 单会话内消息严格有序,跨会话最终一致 |
| 低延迟 | 端到端投递延迟 < 200ms(同地域)、< 500ms(跨地域) |
| 高并发 | 支持千万级同时在线用户、百万级 QPS 消息收发 |
| 多端同步 | 手机、平板、PC、Web 多端消息实时同步 |
| 可扩展 | 支持端到端加密、消息搜索、大群(万人群)等扩展 |
与弹幕系统不同,IM 系统不允许消息丢失。即使网络抖动、服务宕机,也必须保证消息最终送达。这决定了 IM 系统需要复杂的 ACK 确认、重试、幂等去重等机制。
二、整体架构
架构分层说明
| 层级 | 职责 | 关键技术 |
|---|---|---|
| 接入层 | 长连接管理、协议解析、鉴权 | WebSocket、Nginx、SSL/TLS |
| 逻辑层 | 消息路由、状态管理、群组逻辑 | 微服务、gRPC、一致性哈希 |
| 存储层 | 消息持久化、缓存加速、全文检索 | MySQL/TiDB、Redis、Kafka、ES |
| 推送层 | 离线消息推送、消息聚合 | APNs、FCM、厂商推送通道 |
三、核心模块设计
3.1 长连接管理
IM 系统的基础是 长连接,它决定了消息能否实时送达。
协议选型
| 协议 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| WebSocket | 全双工、浏览器原生支持、生态成熟 | 协议开销比自定义协议大 | Web IM、跨平台 |
| TCP 长连接 | 开销最小、灵活定义协议 | 浏览器不支持、需自定义协议 | 原生 App |
| SSE | 简单、自动重连 | 单向(仅服务器→客户端) | 通知推送降级方案 |
| MQTT | 轻量、QoS 支持 | 需要 MQTT Broker | IoT 场景 |
Web 端使用 WebSocket + Protobuf 序列化,原生端使用 TCP 长连接 + 自定义二进制协议,降级方案使用 长轮询。
心跳与重连机制
多端同步
用户可能同时在手机、PC、Web 上登录。每个设备维护独立的长连接,服务端通过 设备路由表 将消息推送到所有在线设备。
设备路由表存储在 Redis 中:
interface DeviceRoute {
userId: string;
devices: {
deviceId: string;
platform: 'ios' | 'android' | 'web' | 'desktop';
gatewayId: string; // 连接所在的网关节点
connId: string; // 连接 ID
lastActive: number; // 最后活跃时间
}[];
}
// Redis 存储结构:Hash
// Key: route:{userId}
// Field: deviceId
// Value: JSON { gatewayId, connId, platform, lastActive }
3.2 消息投递模型
消息投递是 IM 系统最核心的模块,需要解决 扩散方式选择、顺序保证 和 可靠投递 三个关键问题。
写扩散 vs 读扩散
| 对比项 | 写扩散(推模式) | 读扩散(拉模式) |
|---|---|---|
| 原理 | 发送时写入每个接收者的收件箱 | 发送时只写一份,接收者从会话拉取 |
| 写放大 | N 个群成员 → 写 N 次 | 只写 1 次 |
| 读放大 | 每人只读自己收件箱 | 每人需拉取所有会话的消息 |
| 适用场景 | 单聊、小群(< 500 人) | 大群(> 500 人)、频道 |
| 代表产品 | 微信(私聊)、Twitter Timeline | Discord(频道)、Slack |
万人群如果用写扩散,发一条消息要写入一万个收件箱,写放大严重。因此大群必须使用读扩散或混合模式。
Timeline 模型(推荐)
Timeline 模型是目前业界主流的消息同步方案(钉钉、阿里云 IM 采用),结合了写扩散和读扩散的优势:
核心思想:每个用户维护一条 Timeline(时间线),记录需要同步的消息索引。消息实体只存一份(读扩散),但每个用户有独立的同步游标(写扩散索引)。
// 消息实体(全局存储,每条消息只存一份)
interface Message {
msgId: string; // 全局唯一消息 ID(Snowflake)
conversationId: string; // 会话 ID
senderId: string;
content: string;
type: 'text' | 'image' | 'voice' | 'video' | 'file';
timestamp: number;
seqId: number; // 会话内自增序列号(保证会话内有序)
}
// 用户 Timeline(同步索引)
interface TimelineEntry {
userId: string;
syncId: number; // 用户维度的自增 ID(全局递增)
msgId: string; // 指向消息实体
conversationId: string;
}
同步流程:客户端记住上次同步的 syncId,每次连接或定期拉取 syncId > lastSyncId 的增量消息。
消息 ACK 与可靠投递
IM 消息投递采用 三次握手 保证可靠性:
由于网络重传、服务端重推等原因,客户端可能收到重复消息。必须基于 msgId 做 幂等去重,保证同一消息只展示一次。
3.3 消息存储
消息表设计
// 消息主表(按会话分表)
interface MessageTable {
msg_id: string; // 主键,Snowflake ID
conversation_id: string; // 会话 ID(索引)
seq_id: number; // 会话内自增序列号(索引)
sender_id: string; // 发送者 ID
msg_type: number; // 消息类型:1=文本 2=图片 3=语音 ...
content: string; // 消息内容(加密存储)
status: number; // 0=正常 1=撤回 2=删除
created_at: number; // 发送时间戳
extra: string; // 扩展字段 JSON(@列表、引用等)
}
// 用户同步表(Timeline 索引)
interface SyncTable {
id: number; // 自增主键
user_id: string; // 用户 ID(索引)
sync_id: number; // 用户维度递增序列号
msg_id: string; // 消息 ID
conversation_id: string; // 会话 ID
}
// 会话表
interface ConversationTable {
conversation_id: string; // 会话 ID
type: number; // 1=单聊 2=群聊
last_msg_id: string; // 最后一条消息 ID
last_msg_preview: string; // 最后一条消息摘要
updated_at: number; // 最后更新时间(排序用)
}
索引策略
| 查询场景 | 索引设计 |
|---|---|
| 拉取会话消息列表 | (conversation_id, seq_id) 联合索引 |
| 用户增量同步 | (user_id, sync_id) 联合索引 |
| 消息去重 | msg_id 唯一索引 |
| 会话列表排序 | (user_id, updated_at) 联合索引 |
分库分表策略
- 单聊:
conversation_id = min(userA, userB) + "_" + max(userA, userB),保证两人会话 ID 唯一 - 群聊:
conversation_id = "group_" + groupId - 分表数量:根据数据量通常 256 或 1024 张表
3.4 已读回执与在线状态
单聊已读回执
单聊已读相对简单,只需记录每个会话中对方已读的最后一条消息的 seqId:
// Redis 存储
// Key: read:{conversationId}:{userId}
// Value: lastReadSeqId
interface ReadReceipt {
conversationId: string;
userId: string;
lastReadSeqId: number; // 该用户在此会话中已读的最大 seqId
timestamp: number;
}
群已读回执优化
群聊已读回执的挑战是:N 个群成员 x M 条消息 = N*M 条已读记录,存储和查询开销极大。
优化方案:Bitmap 存储
// 方案一:精确到消息级别(适合小群)
// Redis Bitmap: read:{conversationId}:{msgSeqId} → Bitmap
// 每个 bit 代表一个群成员是否已读
// 方案二:只记录每人已读的最大 seqId(适合大群)
// Redis Hash: group_read:{conversationId}
// Field: userId
// Value: lastReadSeqId
interface GroupReadStrategy {
// 小群(< 100 人):精确统计每条消息的已读人列表
smallGroup: {
method: 'bitmap';
// 已读人数 = popcount(bitmap)
// 已读人列表 = 遍历 bitmap 中为 1 的位
};
// 大群(> 100 人):只统计已读人数,不列出已读列表
largeGroup: {
method: 'counter';
// 每条消息维护一个已读计数器
// 不支持查看具体已读人列表
};
}
微信群聊不显示已读人数,只有企业微信支持群已读。这是因为大群的已读回执存储成本太高。钉钉通过 Bitmap + 分层策略解决了这个问题。
在线状态管理
// 在线状态存储(Redis)
// Key: presence:{userId}
// Value: Hash { status, lastSeen, devices }
type UserStatus = 'online' | 'offline' | 'busy' | 'away';
interface PresenceInfo {
userId: string;
status: UserStatus;
lastSeen: number; // 最后在线时间戳
devices: string[]; // 在线设备列表
}
// 状态变更发布到 Redis Pub/Sub
// Channel: presence_change
// 只推送给订阅了该用户状态的人(好友列表)
用户突然断网时,服务端需要通过心跳超时来判断离线。这意味着在线状态有 30-90 秒的延迟。不要依赖在线状态做业务逻辑判断。
3.5 离线推送
当用户所有设备都不在线时,消息需要通过 厂商推送通道 送达。
消息聚合策略
短时间内收到多条消息时,避免频繁推送通知:
interface PushConfig {
// 聚合窗口:5 秒内的消息合并为一条推送
aggregateWindowMs: 5000;
// 推送频率限制:每分钟最多 10 条
rateLimitPerMinute: 10;
// 推送内容策略
contentStrategy: {
singleMessage: '张三: 消息内容'; // 单条消息显示内容
multiMessage: '张三发送了 3 条消息'; // 多条消息合并
multiConversation: '你有 5 个会话的新消息'; // 多会话合并
};
// 免打扰:特定会话/时间段不推送
muteConfig: {
mutedConversations: string[];
quietHours: { start: '23:00'; end: '07:00' };
};
}
四、关键技术实现
4.1 IMClient 核心类
interface IMClientConfig {
url: string;
token: string;
heartbeatInterval?: number;
reconnectMaxRetries?: number;
}
interface IMMessage {
msgId: string;
clientMsgId: string;
conversationId: string;
senderId: string;
content: string;
type: 'text' | 'image' | 'voice' | 'video' | 'file';
timestamp: number;
seqId: number;
}
type IMEvent =
| 'connected'
| 'disconnected'
| 'message'
| 'message:ack'
| 'message:read'
| 'presence:change'
| 'conversation:update';
class IMClient {
private conn: ConnectionManager;
private msgManager: MessageManager;
private syncManager: SyncManager;
private listeners = new Map<string, Set<Function>>();
constructor(private config: IMClientConfig) {
this.conn = new ConnectionManager(config);
this.msgManager = new MessageManager(this.conn);
this.syncManager = new SyncManager(this.conn);
this.setupEventForwarding();
}
/** 连接服务器 */
async connect(): Promise<void> {
await this.conn.connect();
// 连接成功后,同步离线消息
await this.syncManager.syncOfflineMessages();
}
/** 发送消息 */
async sendMessage(
conversationId: string,
content: string,
type: IMMessage['type'] = 'text'
): Promise<IMMessage> {
return this.msgManager.send(conversationId, content, type);
}
/** 标记已读 */
async markRead(conversationId: string, seqId: number): Promise<void> {
return this.msgManager.markRead(conversationId, seqId);
}
/** 监听事件 */
on(event: IMEvent, callback: Function): () => void {
if (!this.listeners.has(event)) {
this.listeners.set(event, new Set());
}
this.listeners.get(event)!.add(callback);
return () => this.listeners.get(event)?.delete(callback);
}
/** 断开连接 */
disconnect(): void {
this.conn.disconnect();
}
private setupEventForwarding(): void {
this.conn.on('message', (data: IMMessage) => {
this.emit('message', data);
});
this.conn.on('connected', () => this.emit('connected'));
this.conn.on('disconnected', () => this.emit('disconnected'));
}
private emit(event: string, ...args: unknown[]): void {
this.listeners.get(event)?.forEach((cb) => cb(...args));
}
}
4.2 ConnectionManager(连接管理)
class ConnectionManager {
private ws: WebSocket | null = null;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts: number;
private readonly heartbeatInterval: number;
private listeners = new Map<string, Set<Function>>();
private pendingAcks = new Map<string, {
resolve: Function;
reject: Function;
timer: ReturnType<typeof setTimeout>;
}>();
constructor(private config: IMClientConfig) {
this.maxReconnectAttempts = config.reconnectMaxRetries ?? 10;
this.heartbeatInterval = config.heartbeatInterval ?? 30000;
}
async connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(`${this.config.url}?token=${this.config.token}`);
this.ws.onopen = () => {
this.reconnectAttempts = 0;
this.startHeartbeat();
this.emit('connected');
resolve();
};
this.ws.onmessage = (event: MessageEvent) => {
const packet = JSON.parse(event.data) as Packet;
this.handlePacket(packet);
};
this.ws.onclose = () => {
this.stopHeartbeat();
this.emit('disconnected');
this.attemptReconnect();
};
this.ws.onerror = () => reject(new Error('WebSocket connection failed'));
});
}
/** 发送数据包并等待 ACK */
async sendWithAck(packet: Packet, timeoutMs = 5000): Promise<Packet> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingAcks.delete(packet.id);
reject(new Error(`ACK timeout for packet ${packet.id}`));
}, timeoutMs);
this.pendingAcks.set(packet.id, { resolve, reject, timer });
this.ws?.send(JSON.stringify(packet));
});
}
disconnect(): void {
this.stopHeartbeat();
this.ws?.close();
this.ws = null;
}
on(event: string, callback: Function): void {
if (!this.listeners.has(event)) {
this.listeners.set(event, new Set());
}
this.listeners.get(event)!.add(callback);
}
/** 指数退避重连 */
private attemptReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.emit('reconnect:failed');
return;
}
const delay = Math.min(
1000 * Math.pow(2, this.reconnectAttempts) + Math.random() * 1000,
30000 // 最大 30 秒
);
setTimeout(() => {
this.reconnectAttempts++;
this.connect().catch(() => this.attemptReconnect());
}, delay);
}
private startHeartbeat(): void {
this.heartbeatTimer = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
}
}, this.heartbeatInterval);
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
private handlePacket(packet: Packet): void {
switch (packet.type) {
case 'pong':
break; // 心跳响应
case 'ack':
this.handleAck(packet);
break;
case 'message':
this.emit('message', packet.data);
// 发送接收 ACK
this.ws?.send(JSON.stringify({ type: 'ack', id: packet.id }));
break;
case 'read_receipt':
this.emit('read_receipt', packet.data);
break;
default:
this.emit(packet.type, packet.data);
}
}
private handleAck(packet: Packet): void {
const pending = this.pendingAcks.get(packet.ackId!);
if (pending) {
clearTimeout(pending.timer);
this.pendingAcks.delete(packet.ackId!);
pending.resolve(packet);
}
}
private emit(event: string, ...args: unknown[]): void {
this.listeners.get(event)?.forEach((cb) => cb(...args));
}
}
interface Packet {
id: string;
type: string;
ackId?: string;
data?: unknown;
timestamp: number;
}
4.3 MessageManager(消息管理)
class MessageManager {
private receivedMsgIds = new Set<string>(); // 消息去重
private localCache: MessageCache;
constructor(private conn: ConnectionManager) {
this.localCache = new MessageCache();
this.setupMessageHandler();
}
/** 发送消息 */
async send(
conversationId: string,
content: string,
type: IMMessage['type']
): Promise<IMMessage> {
const clientMsgId = this.generateClientMsgId();
// 1. 先写入本地(乐观更新)
const localMsg: IMMessage = {
msgId: '', // 服务端分配
clientMsgId,
conversationId,
senderId: 'self',
content,
type,
timestamp: Date.now(),
seqId: 0, // 服务端分配
};
this.localCache.addPending(localMsg);
// 2. 发送并等待 ACK
try {
const ackPacket = await this.conn.sendWithAck({
id: clientMsgId,
type: 'message',
data: { conversationId, content, type },
timestamp: Date.now(),
});
// 3. ACK 成功,更新本地消息
const serverData = ackPacket.data as { msgId: string; seqId: number };
const confirmedMsg: IMMessage = {
...localMsg,
msgId: serverData.msgId,
seqId: serverData.seqId,
};
this.localCache.confirmPending(clientMsgId, confirmedMsg);
return confirmedMsg;
} catch (error) {
// ACK 超时,标记发送失败,支持重试
this.localCache.markFailed(clientMsgId);
throw error;
}
}
/** 标记已读 */
async markRead(conversationId: string, seqId: number): Promise<void> {
await this.conn.sendWithAck({
id: this.generateClientMsgId(),
type: 'read_receipt',
data: { conversationId, seqId },
timestamp: Date.now(),
});
}
/** 接收消息处理(含去重) */
private setupMessageHandler(): void {
this.conn.on('message', (msg: IMMessage) => {
// 幂等去重:同一消息只处理一次
if (this.receivedMsgIds.has(msg.msgId)) {
return;
}
this.receivedMsgIds.add(msg.msgId);
// 写入本地缓存
this.localCache.insert(msg);
});
}
private generateClientMsgId(): string {
return `${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
}
}
4.4 SyncManager(消息同步)
class SyncManager {
private lastSyncId: number = 0; // 上次同步的位点
constructor(private conn: ConnectionManager) {
this.lastSyncId = this.loadSyncIdFromLocal();
}
/** 上线后同步离线消息 */
async syncOfflineMessages(): Promise<IMMessage[]> {
const allMessages: IMMessage[] = [];
let hasMore = true;
// 分页拉取增量消息
while (hasMore) {
const response = await this.conn.sendWithAck({
id: `sync_${Date.now()}`,
type: 'sync',
data: {
lastSyncId: this.lastSyncId,
limit: 200, // 每次拉取 200 条
},
timestamp: Date.now(),
});
const { messages, nextSyncId, more } = response.data as {
messages: IMMessage[];
nextSyncId: number;
more: boolean;
};
allMessages.push(...messages);
this.lastSyncId = nextSyncId;
this.saveSyncIdToLocal(nextSyncId);
hasMore = more;
}
return allMessages;
}
private loadSyncIdFromLocal(): number {
const stored = localStorage.getItem('im_last_sync_id');
return stored ? parseInt(stored, 10) : 0;
}
private saveSyncIdToLocal(syncId: number): void {
localStorage.setItem('im_last_sync_id', String(syncId));
}
}
4.5 MessageQueue(客户端消息队列)
/**
* 客户端消息发送队列
* 保证消息按顺序发送,失败自动重试
*/
class MessageSendQueue {
private queue: QueueItem[] = [];
private processing = false;
private readonly maxRetries = 3;
constructor(private msgManager: MessageManager) {}
/** 入队发送 */
enqueue(
conversationId: string,
content: string,
type: IMMessage['type']
): Promise<IMMessage> {
return new Promise((resolve, reject) => {
this.queue.push({
conversationId,
content,
type,
retries: 0,
resolve,
reject,
});
this.process();
});
}
/** 按序处理队列 */
private async process(): Promise<void> {
if (this.processing || this.queue.length === 0) return;
this.processing = true;
while (this.queue.length > 0) {
const item = this.queue[0];
try {
const msg = await this.msgManager.send(
item.conversationId,
item.content,
item.type
);
this.queue.shift();
item.resolve(msg);
} catch (error) {
item.retries++;
if (item.retries >= this.maxRetries) {
this.queue.shift();
item.reject(error);
} else {
// 指数退避等待后重试
await this.delay(1000 * Math.pow(2, item.retries));
}
}
}
this.processing = false;
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
interface QueueItem {
conversationId: string;
content: string;
type: IMMessage['type'];
retries: number;
resolve: (msg: IMMessage) => void;
reject: (error: unknown) => void;
}
五、性能优化
5.1 消息压缩(Protobuf)
JSON 序列化体积大、解析慢。IM 系统推荐使用 Protocol Buffers 作为消息序列化协议:
| 对比项 | JSON | Protobuf |
|---|---|---|
| 体积 | 100% 基准 | 约 30-50%(体积减少 50-70%) |
| 编解码速度 | 基准 | 快 2-10 倍 |
| 类型安全 | 无 | 强类型 Schema |
| 可读性 | 人类可读 | 二进制不可读 |
| 兼容性 | 通用 | 需要 protoc 工具链 |
// .proto 定义
// message IMPacket {
// string id = 1;
// PacketType type = 2;
// bytes payload = 3;
// int64 timestamp = 4;
// }
// 使用 protobufjs 在前端编解码
import { IMPacket } from './generated/proto';
// 编码
const buffer: Uint8Array = IMPacket.encode({
id: 'msg_001',
type: PacketType.MESSAGE,
payload: new TextEncoder().encode(JSON.stringify(content)),
timestamp: Date.now(),
}).finish();
// 通过 WebSocket 发送二进制
ws.send(buffer);
// 解码
ws.onmessage = (event: MessageEvent<ArrayBuffer>) => {
const packet = IMPacket.decode(new Uint8Array(event.data));
};
5.2 本地缓存(IndexedDB)
消息历史存储在 IndexedDB 中,减少网络请求,加速消息列表渲染:
class MessageCache {
private dbPromise: Promise<IDBDatabase>;
constructor() {
this.dbPromise = this.openDB();
}
private openDB(): Promise<IDBDatabase> {
return new Promise((resolve, reject) => {
const request = indexedDB.open('im_cache', 1);
request.onupgradeneeded = (event) => {
const db = (event.target as IDBOpenDBRequest).result;
// 消息存储
const msgStore = db.createObjectStore('messages', { keyPath: 'msgId' });
msgStore.createIndex('conversation_seq', ['conversationId', 'seqId']);
// 会话存储
db.createObjectStore('conversations', { keyPath: 'conversationId' });
};
request.onsuccess = () => resolve(request.result);
request.onerror = () => reject(request.error);
});
}
/** 插入消息 */
async insert(msg: IMMessage): Promise<void> {
const db = await this.dbPromise;
const tx = db.transaction('messages', 'readwrite');
tx.objectStore('messages').put(msg);
}
/** 查询会话消息(分页) */
async queryMessages(
conversationId: string,
beforeSeqId: number,
limit = 20
): Promise<IMMessage[]> {
const db = await this.dbPromise;
const tx = db.transaction('messages', 'readonly');
const index = tx.objectStore('messages').index('conversation_seq');
// 使用游标反向遍历(最新消息在前)
const range = IDBKeyRange.bound(
[conversationId, 0],
[conversationId, beforeSeqId],
false,
true
);
return new Promise((resolve) => {
const results: IMMessage[] = [];
const cursor = index.openCursor(range, 'prev');
cursor.onsuccess = (event) => {
const cur = (event.target as IDBRequest<IDBCursorWithValue>).result;
if (cur && results.length < limit) {
results.push(cur.value);
cur.continue();
} else {
resolve(results);
}
};
});
}
/** 乐观更新:先插入待发送消息 */
addPending(msg: IMMessage): void {
this.insert(msg);
}
/** 发送成功后确认 */
async confirmPending(clientMsgId: string, confirmedMsg: IMMessage): Promise<void> {
const db = await this.dbPromise;
const tx = db.transaction('messages', 'readwrite');
const store = tx.objectStore('messages');
store.delete(clientMsgId);
store.put(confirmedMsg);
}
/** 标记发送失败 */
async markFailed(clientMsgId: string): Promise<void> {
const db = await this.dbPromise;
const tx = db.transaction('messages', 'readwrite');
const store = tx.objectStore('messages');
const msg = await new Promise<IMMessage>((resolve) => {
store.get(clientMsgId).onsuccess = (e) =>
resolve((e.target as IDBRequest).result);
});
if (msg) {
(msg as IMMessage & { status: string }).status = 'failed';
store.put(msg);
}
}
}
5.3 消息列表虚拟滚动
IM 聊天窗口可能有成千上万条消息,使用 虚拟列表 只渲染可见区域:
import { useRef, useState, useCallback, useEffect } from 'react';
interface VirtualListConfig {
itemHeight: number; // 预估消息高度
overscan: number; // 上下额外渲染条数
containerHeight: number; // 容器高度
}
function useVirtualMessageList(
messages: IMMessage[],
config: VirtualListConfig
) {
const [scrollTop, setScrollTop] = useState(0);
const containerRef = useRef<HTMLDivElement>(null);
const startIndex = Math.max(
0,
Math.floor(scrollTop / config.itemHeight) - config.overscan
);
const endIndex = Math.min(
messages.length,
Math.ceil((scrollTop + config.containerHeight) / config.itemHeight) + config.overscan
);
const visibleMessages = messages.slice(startIndex, endIndex);
const totalHeight = messages.length * config.itemHeight;
const offsetY = startIndex * config.itemHeight;
const handleScroll = useCallback((e: React.UIEvent<HTMLDivElement>) => {
setScrollTop(e.currentTarget.scrollTop);
}, []);
// 滚动到底部(新消息到达时)
const scrollToBottom = useCallback(() => {
if (containerRef.current) {
containerRef.current.scrollTop = containerRef.current.scrollHeight;
}
}, []);
return {
containerRef,
visibleMessages,
totalHeight,
offsetY,
startIndex,
handleScroll,
scrollToBottom,
};
}
5.4 图片与文件优化
| 优化策略 | 实现方式 |
|---|---|
| 缩略图 | 上传后服务端生成多尺寸缩略图,列表展示小图 |
| 懒加载 | 图片消息使用 IntersectionObserver 进入视口后加载 |
| 断点续传 | 大文件分片上传,断网后从断点继续 |
| 预览优化 | 图片先显示模糊占位(BlurHash),加载完成后替换 |
| CDN 加速 | 静态资源走 CDN,就近访问 |
六、扩展设计
6.1 端到端加密(E2EE)
基于 Signal Protocol 实现端到端加密,服务端无法解密消息内容:
核心流程:
- 密钥生成:每个用户生成身份密钥(Identity Key)、签名预密钥(Signed Pre Key)和一次性预密钥(One-Time Pre Key)
- X3DH 协商:首次通信时通过扩展三方 Diffie-Hellman 协商会话密钥
- Double Ratchet:每条消息使用不同的加密密钥(前向安全 + 后向安全)
- 服务端无法做消息搜索和内容审核
- 换设备后历史消息无法恢复(除非备份密钥)
- 群聊加密复杂度高(每个成员需要单独加密)
6.2 消息撤回与编辑
// 消息撤回
interface RecallMessage {
type: 'recall';
originalMsgId: string;
timeLimit: 120000; // 2 分钟内可撤回
}
// 消息编辑
interface EditMessage {
type: 'edit';
originalMsgId: string;
newContent: string;
editedAt: number;
editHistory: { content: string; editedAt: number }[]; // 编辑历史
}
// 服务端处理
// 1. 校验发送者身份和时间限制
// 2. 更新消息状态(撤回)或内容(编辑)
// 3. 向所有会话参与者推送撤回/编辑通知
// 4. 客户端收到通知后更新本地消息
6.3 @提醒
interface MentionInfo {
// @all 或 @特定用户
type: 'all' | 'specific';
userIds: string[]; // 被 @的用户 ID 列表
positions: { // @文本在消息中的位置
start: number;
end: number;
userId: string;
displayName: string;
}[];
}
// 存储在消息的 extra 字段中
// 被 @的用户即使开启了免打扰也会收到推送通知
6.4 消息搜索
// 前端本地搜索(IndexedDB 全文搜索)
async function localSearch(keyword: string): Promise<IMMessage[]> {
// IndexedDB 不支持全文索引,需要遍历
// 适合小规模数据搜索
const db = await openDB();
const tx = db.transaction('messages', 'readonly');
const store = tx.objectStore('messages');
const results: IMMessage[] = [];
return new Promise((resolve) => {
store.openCursor().onsuccess = (event) => {
const cursor = (event.target as IDBRequest<IDBCursorWithValue>).result;
if (cursor) {
if (cursor.value.content?.includes(keyword)) {
results.push(cursor.value);
}
cursor.continue();
} else {
resolve(results);
}
};
});
}
// 服务端全文搜索(Elasticsearch)
// POST /api/messages/search
// { keyword: "xxx", conversationId?: "xxx", timeRange?: [start, end] }
// 返回匹配的消息列表 + 高亮片段
常见面试问题
Q1: IM 消息如何保证不丢失?
答案:
通过 三级可靠性保障 确保消息不丢失:
第一级:发送方 ACK 确认
// 发送方发送消息后等待服务端 ACK
// 超时未收到 ACK → 自动重发(最多 3 次)
async function reliableSend(msg: IMMessage): Promise<void> {
for (let retry = 0; retry < 3; retry++) {
try {
await sendWithAck(msg, 5000); // 5 秒超时
return; // ACK 成功
} catch {
// 超时重试,使用相同的 clientMsgId(服务端幂等去重)
}
}
throw new Error('发送失败');
}
第二级:服务端持久化 + 接收方 ACK
服务端收到消息后先持久化到数据库,再推送给接收方。如果接收方未回 ACK,服务端会在接收方下次上线时重推。
第三级:客户端同步兜底
客户端定期或重连时,通过 Timeline 的 syncId 增量拉取所有未同步的消息,作为最后兜底。
| 层级 | 机制 | 保障范围 |
|---|---|---|
| 发送方 | ACK + 重试 | 消息到达服务端 |
| 服务端 | 持久化 + 重推 | 消息到达接收端 |
| 接收方 | 增量同步 | 补全任何遗漏 |
Q2: 写扩散和读扩散如何选择?
答案:
| 维度 | 写扩散 | 读扩散 |
|---|---|---|
| 写入 | 每个接收者一份(写放大) | 只写一份 |
| 读取 | 只读自己收件箱 | 拉取所有会话(读放大) |
| 适用场景 | 单聊、小群(< 500) | 大群、频道 |
| 存储成本 | 高(N 份副本) | 低(仅一份) |
| 实时性 | 高(直接推送到收件箱) | 中(需要拉取) |
实践建议:混合模式。
function chooseDispatchStrategy(conversation: Conversation): 'write' | 'read' {
if (conversation.type === 'single') {
return 'write'; // 单聊用写扩散
}
if (conversation.memberCount <= 500) {
return 'write'; // 小群用写扩散
}
return 'read'; // 大群用读扩散
}
// Timeline 模型更进一步:
// 消息实体用读扩散(只存一份),同步索引用写扩散(每人一条 timeline 记录)
// 这是当前业界最佳实践
Q3: 如何保证消息的顺序性?
答案:
消息乱序的根因是 网络延迟不一致 和 多服务器时钟不同步。解决方案:
- 服务端分配自增序列号(seqId):每个会话维护一个自增计数器,消息到达服务端时分配 seqId
- 客户端按 seqId 排序:不依赖客户端时间戳
// 服务端:使用 Redis INCRBY 生成会话级 seqId
// INCRBY conv_seq:{conversationId} 1
// 客户端:消息列表始终按 seqId 排序
function sortMessages(messages: IMMessage[]): IMMessage[] {
return messages.sort((a, b) => a.seqId - b.seqId);
}
// 检测消息缺口
function detectGap(messages: IMMessage[]): number[] {
const missing: number[] = [];
for (let i = 1; i < messages.length; i++) {
const expected = messages[i - 1].seqId + 1;
if (messages[i].seqId !== expected) {
for (let seq = expected; seq < messages[i].seqId; seq++) {
missing.push(seq);
}
}
}
return missing; // 返回缺失的 seqId,需要向服务端补拉
}
客户端本地时间可能不准确(用户手动调整过),不同客户端的时间戳无法直接比较。必须使用 服务端统一分配的序列号 作为排序依据。
Q4: 万人大群的消息怎么设计?
答案:
万人大群面临三大挑战:写扩散爆炸、已读回执存储爆炸、在线推送风暴。
interface LargeGroupConfig {
// 1. 消息投递:读扩散
dispatchMode: 'read'; // 消息只存一份,群成员主动拉取
// 2. 已读回执:只统计数量,不记录列表
readReceipt: {
mode: 'counter'; // 原子计数器,不记录具体谁已读
// INCR group_read_count:{conversationId}:{seqId}
};
// 3. 推送策略:分批推送 + 限流
pushStrategy: {
batchSize: 500; // 每批推送 500 人
intervalMs: 100; // 批次间隔 100ms
prioritize: 'active_first'; // 活跃用户优先推送
};
// 4. 消息频率限制
rateLimit: {
userLimit: 5; // 每人每 10 秒最多发 5 条
groupLimit: 100; // 全群每秒最多 100 条
// 超出限制排队或丢弃
};
}
| 优化策略 | 说明 |
|---|---|
| 读扩散 | 消息只存一份,避免写入放大 |
| 分批推送 | 避免瞬间推送万条 WebSocket 消息 |
| 频率限制 | 防止消息刷屏,保证用户体验 |
| 已读计数器 | 替代精确已读列表,降低存储成本 |
| 消息分页拉取 | 用户打开群聊时按需加载 |
Q5: 前端如何实现离线消息同步?
答案:
离线消息同步的核心是 增量拉取,基于 Timeline 模型的 syncId 实现:
class OfflineSync {
// 同步流程
async sync(): Promise<void> {
// 1. 从 IndexedDB 读取上次同步位点
const lastSyncId = await this.getLocalSyncId();
// 2. 向服务端请求增量消息
let currentSyncId = lastSyncId;
let hasMore = true;
while (hasMore) {
const response = await fetch('/api/sync', {
method: 'POST',
body: JSON.stringify({
lastSyncId: currentSyncId,
limit: 200,
}),
});
const data = await response.json();
// 3. 写入本地 IndexedDB
await this.batchInsertMessages(data.messages);
// 4. 更新会话列表
await this.updateConversations(data.messages);
// 5. 更新同步位点
currentSyncId = data.nextSyncId;
await this.saveLocalSyncId(currentSyncId);
hasMore = data.hasMore;
}
// 6. 通知 UI 层刷新
this.notifyUI();
}
private async batchInsertMessages(messages: IMMessage[]): Promise<void> {
const db = await this.openDB();
const tx = db.transaction('messages', 'readwrite');
const store = tx.objectStore('messages');
for (const msg of messages) {
store.put(msg); // put 自动去重(相同 msgId 覆盖)
}
}
private async updateConversations(messages: IMMessage[]): Promise<void> {
// 按 conversationId 分组,更新每个会话的最后一条消息和未读数
const convMap = new Map<string, IMMessage>();
for (const msg of messages) {
const existing = convMap.get(msg.conversationId);
if (!existing || msg.seqId > existing.seqId) {
convMap.set(msg.conversationId, msg);
}
}
// 写入 IndexedDB conversations store
}
private async getLocalSyncId(): Promise<number> {
return parseInt(localStorage.getItem('im_sync_id') ?? '0', 10);
}
private async saveLocalSyncId(syncId: number): Promise<void> {
localStorage.setItem('im_sync_id', String(syncId));
}
private notifyUI(): void {
window.dispatchEvent(new CustomEvent('im:synced'));
}
private openDB(): Promise<IDBDatabase> {
return new Promise((resolve, reject) => {
const req = indexedDB.open('im_cache', 1);
req.onsuccess = () => resolve(req.result);
req.onerror = () => reject(req.error);
});
}
}
关键设计点:
syncId是用户维度的全局递增 ID,不依赖时间戳- 分页拉取(每次 200 条),避免一次性加载过多数据
- IndexedDB 的
put操作天然幂等,重复消息自动覆盖 - 同步完成后通知 UI 刷新,而不是每条消息触发一次渲染
Q6: WebSocket 断连后如何保证消息不丢?
答案:
WebSocket 断连是 IM 系统最常见的异常场景。通过 四层防护 保证消息不丢:
class ReconnectionHandler {
// 第 1 层:心跳检测 + 快速重连
// 30s 发一次心跳,90s 无响应判定断连
// 指数退避重连:1s → 2s → 4s → 8s → 16s → 30s(上限)
// 第 2 层:重连后增量同步
async onReconnected(): Promise<void> {
// 从上次 syncId 开始拉取所有遗漏的消息
await this.syncManager.syncOfflineMessages();
}
// 第 3 层:发送队列持久化
// 断连时未发送成功的消息保存在 IndexedDB
// 重连后按顺序重新发送
async retrySendQueue(): Promise<void> {
const pendingMessages = await this.loadPendingFromDB();
for (const msg of pendingMessages) {
await this.msgManager.send(msg.conversationId, msg.content, msg.type);
}
}
// 第 4 层:离线推送兜底
// 如果长时间未重连,服务端通过 APNs/FCM 推送通知
// 用户点击通知打开 App → 触发增量同步
}
| 防护层级 | 触发条件 | 恢复方式 |
|---|---|---|
| 心跳重连 | 短暂断网(< 30s) | 自动重连 + 增量同步 |
| 增量同步 | 断连较久(30s-数小时) | 基于 syncId 拉取遗漏消息 |
| 发送队列持久化 | 发送时断网 | 重连后重发未确认消息 |
| 离线推送 | 长时间离线 | APNs/FCM 推送 + 用户主动打开 |
Q7: 如何设计消息的存储分层?
答案:
IM 消息的访问具有明显的 时间局部性:最近的消息访问频率远高于历史消息。采用 冷热分层 存储:
interface StorageTier {
// 热数据:最近 3 个月
hot: {
storage: 'MySQL/TiDB'; // 关系数据库
cache: 'Redis'; // 最近 100 条消息缓存
feature: '快速查询、索引丰富';
};
// 温数据:3-12 个月
warm: {
storage: 'HBase/Cassandra'; // 列存数据库
feature: '高吞吐写入、范围查询';
};
// 冷数据:12 个月以上
cold: {
storage: 'S3/OSS'; // 对象存储
format: 'Parquet'; // 列式存储格式
feature: '成本极低、批量读取';
};
}
| 层级 | 存储 | 时间范围 | 访问延迟 | 成本 |
|---|---|---|---|---|
| L0 缓存 | Redis | 最近 N 条 | < 1ms | 高 |
| L1 热数据 | MySQL/TiDB | 0-3 个月 | < 10ms | 中 |
| L2 温数据 | HBase | 3-12 个月 | < 100ms | 低 |
| L3 冷数据 | S3/OSS | > 12 个月 | < 1s | 极低 |
数据迁移:后台定时任务按时间维度将热数据归档到温数据、冷数据层。查询时透明路由:先查热数据,未命中则查温数据,最后查冷数据。
Q8: IM 系统的前端架构如何设计?
答案:
前端 IM SDK 通常采用 分层架构,将网络、存储、业务逻辑解耦:
关键设计原则:
| 原则 | 说明 |
|---|---|
| SDK 与 UI 分离 | IM SDK 不依赖任何 UI 框架,可在 React/Vue/小程序中复用 |
| 事件驱动 | SDK 通过事件(EventEmitter)通知 UI 层更新 |
| 乐观更新 | 发送消息立即显示在界面,ACK 失败再标记 |
| 本地优先 | 消息列表优先从 IndexedDB 读取,异步与服务端同步 |
| 连接状态管理 | UI 层响应连接状态变化,展示"连接中"/"已断开"等提示 |