跳到主要内容

设计实时通讯系统(IM)

问题

如何设计一个支持亿级用户的实时通讯系统(IM)?从长连接管理、消息投递模型、存储方案到已读回执与离线推送,请详细说明核心模块的设计思路与关键技术实现。

答案

IM 系统是互联网最复杂的分布式系统之一,需要同时解决 消息可靠投递顺序一致性多端同步海量并发 四大核心难题。与弹幕系统"允许丢消息"不同,IM 系统对消息的可靠性和一致性有极高要求——一条消息都不能丢,顺序不能乱

核心设计原则

IM 系统的本质是:在不可靠的网络环境中,构建可靠的消息传输通道。所有架构设计都围绕这个目标展开。


一、需求分析

功能需求

模块功能点
单聊一对一私聊、文本/图片/语音/视频/文件等多种消息类型
群聊多人群组、群公告、@提醒、群管理(禁言/踢人/权限)
已读回执单聊已读/未读、群聊已读人数统计
在线状态在线/离线/忙碌等状态展示、最后在线时间
离线消息用户离线时暂存消息,上线后拉取、推送通知
消息漫游跨设备消息同步、历史消息云端存储、消息搜索
辅助功能消息撤回/编辑、@提醒、表情回应、消息转发、引用回复

非功能需求

指标目标
消息可靠投递消息不丢失、不重复,至少一次(at-least-once)+ 客户端去重
顺序保证单会话内消息严格有序,跨会话最终一致
低延迟端到端投递延迟 < 200ms(同地域)、< 500ms(跨地域)
高并发支持千万级同时在线用户、百万级 QPS 消息收发
多端同步手机、平板、PC、Web 多端消息实时同步
可扩展支持端到端加密、消息搜索、大群(万人群)等扩展
关键约束

与弹幕系统不同,IM 系统不允许消息丢失。即使网络抖动、服务宕机,也必须保证消息最终送达。这决定了 IM 系统需要复杂的 ACK 确认、重试、幂等去重等机制。


二、整体架构

架构分层说明

层级职责关键技术
接入层长连接管理、协议解析、鉴权WebSocket、Nginx、SSL/TLS
逻辑层消息路由、状态管理、群组逻辑微服务、gRPC、一致性哈希
存储层消息持久化、缓存加速、全文检索MySQL/TiDB、Redis、Kafka、ES
推送层离线消息推送、消息聚合APNs、FCM、厂商推送通道

三、核心模块设计

3.1 长连接管理

IM 系统的基础是 长连接,它决定了消息能否实时送达。

协议选型

协议优点缺点适用场景
WebSocket全双工、浏览器原生支持、生态成熟协议开销比自定义协议大Web IM、跨平台
TCP 长连接开销最小、灵活定义协议浏览器不支持、需自定义协议原生 App
SSE简单、自动重连单向(仅服务器→客户端)通知推送降级方案
MQTT轻量、QoS 支持需要 MQTT BrokerIoT 场景
推荐方案

Web 端使用 WebSocket + Protobuf 序列化,原生端使用 TCP 长连接 + 自定义二进制协议,降级方案使用 长轮询

心跳与重连机制

多端同步

用户可能同时在手机、PC、Web 上登录。每个设备维护独立的长连接,服务端通过 设备路由表 将消息推送到所有在线设备。

设备路由表存储在 Redis 中:

device-route.ts
interface DeviceRoute {
userId: string;
devices: {
deviceId: string;
platform: 'ios' | 'android' | 'web' | 'desktop';
gatewayId: string; // 连接所在的网关节点
connId: string; // 连接 ID
lastActive: number; // 最后活跃时间
}[];
}

// Redis 存储结构:Hash
// Key: route:{userId}
// Field: deviceId
// Value: JSON { gatewayId, connId, platform, lastActive }

3.2 消息投递模型

消息投递是 IM 系统最核心的模块,需要解决 扩散方式选择顺序保证可靠投递 三个关键问题。

写扩散 vs 读扩散

对比项写扩散(推模式)读扩散(拉模式)
原理发送时写入每个接收者的收件箱发送时只写一份,接收者从会话拉取
写放大N 个群成员 → 写 N 次只写 1 次
读放大每人只读自己收件箱每人需拉取所有会话的消息
适用场景单聊、小群(< 500 人)大群(> 500 人)、频道
代表产品微信(私聊)、Twitter TimelineDiscord(频道)、Slack
大群陷阱

万人群如果用写扩散,发一条消息要写入一万个收件箱,写放大严重。因此大群必须使用读扩散或混合模式。

Timeline 模型(推荐)

Timeline 模型是目前业界主流的消息同步方案(钉钉、阿里云 IM 采用),结合了写扩散和读扩散的优势:

核心思想:每个用户维护一条 Timeline(时间线),记录需要同步的消息索引。消息实体只存一份(读扩散),但每个用户有独立的同步游标(写扩散索引)。

timeline-model.ts
// 消息实体(全局存储,每条消息只存一份)
interface Message {
msgId: string; // 全局唯一消息 ID(Snowflake)
conversationId: string; // 会话 ID
senderId: string;
content: string;
type: 'text' | 'image' | 'voice' | 'video' | 'file';
timestamp: number;
seqId: number; // 会话内自增序列号(保证会话内有序)
}

// 用户 Timeline(同步索引)
interface TimelineEntry {
userId: string;
syncId: number; // 用户维度的自增 ID(全局递增)
msgId: string; // 指向消息实体
conversationId: string;
}

同步流程:客户端记住上次同步的 syncId,每次连接或定期拉取 syncId > lastSyncId 的增量消息。

消息 ACK 与可靠投递

IM 消息投递采用 三次握手 保证可靠性:

去重是关键

由于网络重传、服务端重推等原因,客户端可能收到重复消息。必须基于 msgId幂等去重,保证同一消息只展示一次。


3.3 消息存储

消息表设计

schema.ts
// 消息主表(按会话分表)
interface MessageTable {
msg_id: string; // 主键,Snowflake ID
conversation_id: string; // 会话 ID(索引)
seq_id: number; // 会话内自增序列号(索引)
sender_id: string; // 发送者 ID
msg_type: number; // 消息类型:1=文本 2=图片 3=语音 ...
content: string; // 消息内容(加密存储)
status: number; // 0=正常 1=撤回 2=删除
created_at: number; // 发送时间戳
extra: string; // 扩展字段 JSON(@列表、引用等)
}

// 用户同步表(Timeline 索引)
interface SyncTable {
id: number; // 自增主键
user_id: string; // 用户 ID(索引)
sync_id: number; // 用户维度递增序列号
msg_id: string; // 消息 ID
conversation_id: string; // 会话 ID
}

// 会话表
interface ConversationTable {
conversation_id: string; // 会话 ID
type: number; // 1=单聊 2=群聊
last_msg_id: string; // 最后一条消息 ID
last_msg_preview: string; // 最后一条消息摘要
updated_at: number; // 最后更新时间(排序用)
}

索引策略

查询场景索引设计
拉取会话消息列表(conversation_id, seq_id) 联合索引
用户增量同步(user_id, sync_id) 联合索引
消息去重msg_id 唯一索引
会话列表排序(user_id, updated_at) 联合索引

分库分表策略

分表规则
  • 单聊conversation_id = min(userA, userB) + "_" + max(userA, userB),保证两人会话 ID 唯一
  • 群聊conversation_id = "group_" + groupId
  • 分表数量:根据数据量通常 256 或 1024 张表

3.4 已读回执与在线状态

单聊已读回执

单聊已读相对简单,只需记录每个会话中对方已读的最后一条消息的 seqId

read-receipt.ts
// Redis 存储
// Key: read:{conversationId}:{userId}
// Value: lastReadSeqId

interface ReadReceipt {
conversationId: string;
userId: string;
lastReadSeqId: number; // 该用户在此会话中已读的最大 seqId
timestamp: number;
}

群已读回执优化

群聊已读回执的挑战是:N 个群成员 x M 条消息 = N*M 条已读记录,存储和查询开销极大。

优化方案:Bitmap 存储

group-read-bitmap.ts
// 方案一:精确到消息级别(适合小群)
// Redis Bitmap: read:{conversationId}:{msgSeqId} → Bitmap
// 每个 bit 代表一个群成员是否已读

// 方案二:只记录每人已读的最大 seqId(适合大群)
// Redis Hash: group_read:{conversationId}
// Field: userId
// Value: lastReadSeqId

interface GroupReadStrategy {
// 小群(< 100 人):精确统计每条消息的已读人列表
smallGroup: {
method: 'bitmap';
// 已读人数 = popcount(bitmap)
// 已读人列表 = 遍历 bitmap 中为 1 的位
};

// 大群(> 100 人):只统计已读人数,不列出已读列表
largeGroup: {
method: 'counter';
// 每条消息维护一个已读计数器
// 不支持查看具体已读人列表
};
}
微信群已读的实现

微信群聊不显示已读人数,只有企业微信支持群已读。这是因为大群的已读回执存储成本太高。钉钉通过 Bitmap + 分层策略解决了这个问题。

在线状态管理

presence.ts
// 在线状态存储(Redis)
// Key: presence:{userId}
// Value: Hash { status, lastSeen, devices }

type UserStatus = 'online' | 'offline' | 'busy' | 'away';

interface PresenceInfo {
userId: string;
status: UserStatus;
lastSeen: number; // 最后在线时间戳
devices: string[]; // 在线设备列表
}

// 状态变更发布到 Redis Pub/Sub
// Channel: presence_change
// 只推送给订阅了该用户状态的人(好友列表)
在线状态的一致性

用户突然断网时,服务端需要通过心跳超时来判断离线。这意味着在线状态有 30-90 秒的延迟。不要依赖在线状态做业务逻辑判断。


3.5 离线推送

当用户所有设备都不在线时,消息需要通过 厂商推送通道 送达。

消息聚合策略

短时间内收到多条消息时,避免频繁推送通知:

push-aggregation.ts
interface PushConfig {
// 聚合窗口:5 秒内的消息合并为一条推送
aggregateWindowMs: 5000;

// 推送频率限制:每分钟最多 10 条
rateLimitPerMinute: 10;

// 推送内容策略
contentStrategy: {
singleMessage: '张三: 消息内容'; // 单条消息显示内容
multiMessage: '张三发送了 3 条消息'; // 多条消息合并
multiConversation: '你有 5 个会话的新消息'; // 多会话合并
};

// 免打扰:特定会话/时间段不推送
muteConfig: {
mutedConversations: string[];
quietHours: { start: '23:00'; end: '07:00' };
};
}

四、关键技术实现

4.1 IMClient 核心类

im-client.ts
interface IMClientConfig {
url: string;
token: string;
heartbeatInterval?: number;
reconnectMaxRetries?: number;
}

interface IMMessage {
msgId: string;
clientMsgId: string;
conversationId: string;
senderId: string;
content: string;
type: 'text' | 'image' | 'voice' | 'video' | 'file';
timestamp: number;
seqId: number;
}

type IMEvent =
| 'connected'
| 'disconnected'
| 'message'
| 'message:ack'
| 'message:read'
| 'presence:change'
| 'conversation:update';

class IMClient {
private conn: ConnectionManager;
private msgManager: MessageManager;
private syncManager: SyncManager;
private listeners = new Map<string, Set<Function>>();

constructor(private config: IMClientConfig) {
this.conn = new ConnectionManager(config);
this.msgManager = new MessageManager(this.conn);
this.syncManager = new SyncManager(this.conn);

this.setupEventForwarding();
}

/** 连接服务器 */
async connect(): Promise<void> {
await this.conn.connect();
// 连接成功后,同步离线消息
await this.syncManager.syncOfflineMessages();
}

/** 发送消息 */
async sendMessage(
conversationId: string,
content: string,
type: IMMessage['type'] = 'text'
): Promise<IMMessage> {
return this.msgManager.send(conversationId, content, type);
}

/** 标记已读 */
async markRead(conversationId: string, seqId: number): Promise<void> {
return this.msgManager.markRead(conversationId, seqId);
}

/** 监听事件 */
on(event: IMEvent, callback: Function): () => void {
if (!this.listeners.has(event)) {
this.listeners.set(event, new Set());
}
this.listeners.get(event)!.add(callback);
return () => this.listeners.get(event)?.delete(callback);
}

/** 断开连接 */
disconnect(): void {
this.conn.disconnect();
}

private setupEventForwarding(): void {
this.conn.on('message', (data: IMMessage) => {
this.emit('message', data);
});
this.conn.on('connected', () => this.emit('connected'));
this.conn.on('disconnected', () => this.emit('disconnected'));
}

private emit(event: string, ...args: unknown[]): void {
this.listeners.get(event)?.forEach((cb) => cb(...args));
}
}

4.2 ConnectionManager(连接管理)

connection-manager.ts
class ConnectionManager {
private ws: WebSocket | null = null;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts: number;
private readonly heartbeatInterval: number;
private listeners = new Map<string, Set<Function>>();
private pendingAcks = new Map<string, {
resolve: Function;
reject: Function;
timer: ReturnType<typeof setTimeout>;
}>();

constructor(private config: IMClientConfig) {
this.maxReconnectAttempts = config.reconnectMaxRetries ?? 10;
this.heartbeatInterval = config.heartbeatInterval ?? 30000;
}

async connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(`${this.config.url}?token=${this.config.token}`);

this.ws.onopen = () => {
this.reconnectAttempts = 0;
this.startHeartbeat();
this.emit('connected');
resolve();
};

this.ws.onmessage = (event: MessageEvent) => {
const packet = JSON.parse(event.data) as Packet;
this.handlePacket(packet);
};

this.ws.onclose = () => {
this.stopHeartbeat();
this.emit('disconnected');
this.attemptReconnect();
};

this.ws.onerror = () => reject(new Error('WebSocket connection failed'));
});
}

/** 发送数据包并等待 ACK */
async sendWithAck(packet: Packet, timeoutMs = 5000): Promise<Packet> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingAcks.delete(packet.id);
reject(new Error(`ACK timeout for packet ${packet.id}`));
}, timeoutMs);

this.pendingAcks.set(packet.id, { resolve, reject, timer });
this.ws?.send(JSON.stringify(packet));
});
}

disconnect(): void {
this.stopHeartbeat();
this.ws?.close();
this.ws = null;
}

on(event: string, callback: Function): void {
if (!this.listeners.has(event)) {
this.listeners.set(event, new Set());
}
this.listeners.get(event)!.add(callback);
}

/** 指数退避重连 */
private attemptReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.emit('reconnect:failed');
return;
}

const delay = Math.min(
1000 * Math.pow(2, this.reconnectAttempts) + Math.random() * 1000,
30000 // 最大 30 秒
);

setTimeout(() => {
this.reconnectAttempts++;
this.connect().catch(() => this.attemptReconnect());
}, delay);
}

private startHeartbeat(): void {
this.heartbeatTimer = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
}
}, this.heartbeatInterval);
}

private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}

private handlePacket(packet: Packet): void {
switch (packet.type) {
case 'pong':
break; // 心跳响应
case 'ack':
this.handleAck(packet);
break;
case 'message':
this.emit('message', packet.data);
// 发送接收 ACK
this.ws?.send(JSON.stringify({ type: 'ack', id: packet.id }));
break;
case 'read_receipt':
this.emit('read_receipt', packet.data);
break;
default:
this.emit(packet.type, packet.data);
}
}

private handleAck(packet: Packet): void {
const pending = this.pendingAcks.get(packet.ackId!);
if (pending) {
clearTimeout(pending.timer);
this.pendingAcks.delete(packet.ackId!);
pending.resolve(packet);
}
}

private emit(event: string, ...args: unknown[]): void {
this.listeners.get(event)?.forEach((cb) => cb(...args));
}
}

interface Packet {
id: string;
type: string;
ackId?: string;
data?: unknown;
timestamp: number;
}

4.3 MessageManager(消息管理)

message-manager.ts
class MessageManager {
private receivedMsgIds = new Set<string>(); // 消息去重
private localCache: MessageCache;

constructor(private conn: ConnectionManager) {
this.localCache = new MessageCache();
this.setupMessageHandler();
}

/** 发送消息 */
async send(
conversationId: string,
content: string,
type: IMMessage['type']
): Promise<IMMessage> {
const clientMsgId = this.generateClientMsgId();

// 1. 先写入本地(乐观更新)
const localMsg: IMMessage = {
msgId: '', // 服务端分配
clientMsgId,
conversationId,
senderId: 'self',
content,
type,
timestamp: Date.now(),
seqId: 0, // 服务端分配
};
this.localCache.addPending(localMsg);

// 2. 发送并等待 ACK
try {
const ackPacket = await this.conn.sendWithAck({
id: clientMsgId,
type: 'message',
data: { conversationId, content, type },
timestamp: Date.now(),
});

// 3. ACK 成功,更新本地消息
const serverData = ackPacket.data as { msgId: string; seqId: number };
const confirmedMsg: IMMessage = {
...localMsg,
msgId: serverData.msgId,
seqId: serverData.seqId,
};
this.localCache.confirmPending(clientMsgId, confirmedMsg);
return confirmedMsg;
} catch (error) {
// ACK 超时,标记发送失败,支持重试
this.localCache.markFailed(clientMsgId);
throw error;
}
}

/** 标记已读 */
async markRead(conversationId: string, seqId: number): Promise<void> {
await this.conn.sendWithAck({
id: this.generateClientMsgId(),
type: 'read_receipt',
data: { conversationId, seqId },
timestamp: Date.now(),
});
}

/** 接收消息处理(含去重) */
private setupMessageHandler(): void {
this.conn.on('message', (msg: IMMessage) => {
// 幂等去重:同一消息只处理一次
if (this.receivedMsgIds.has(msg.msgId)) {
return;
}
this.receivedMsgIds.add(msg.msgId);

// 写入本地缓存
this.localCache.insert(msg);
});
}

private generateClientMsgId(): string {
return `${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
}
}

4.4 SyncManager(消息同步)

sync-manager.ts
class SyncManager {
private lastSyncId: number = 0; // 上次同步的位点

constructor(private conn: ConnectionManager) {
this.lastSyncId = this.loadSyncIdFromLocal();
}

/** 上线后同步离线消息 */
async syncOfflineMessages(): Promise<IMMessage[]> {
const allMessages: IMMessage[] = [];
let hasMore = true;

// 分页拉取增量消息
while (hasMore) {
const response = await this.conn.sendWithAck({
id: `sync_${Date.now()}`,
type: 'sync',
data: {
lastSyncId: this.lastSyncId,
limit: 200, // 每次拉取 200 条
},
timestamp: Date.now(),
});

const { messages, nextSyncId, more } = response.data as {
messages: IMMessage[];
nextSyncId: number;
more: boolean;
};

allMessages.push(...messages);
this.lastSyncId = nextSyncId;
this.saveSyncIdToLocal(nextSyncId);
hasMore = more;
}

return allMessages;
}

private loadSyncIdFromLocal(): number {
const stored = localStorage.getItem('im_last_sync_id');
return stored ? parseInt(stored, 10) : 0;
}

private saveSyncIdToLocal(syncId: number): void {
localStorage.setItem('im_last_sync_id', String(syncId));
}
}

4.5 MessageQueue(客户端消息队列)

message-queue.ts
/**
* 客户端消息发送队列
* 保证消息按顺序发送,失败自动重试
*/
class MessageSendQueue {
private queue: QueueItem[] = [];
private processing = false;
private readonly maxRetries = 3;

constructor(private msgManager: MessageManager) {}

/** 入队发送 */
enqueue(
conversationId: string,
content: string,
type: IMMessage['type']
): Promise<IMMessage> {
return new Promise((resolve, reject) => {
this.queue.push({
conversationId,
content,
type,
retries: 0,
resolve,
reject,
});
this.process();
});
}

/** 按序处理队列 */
private async process(): Promise<void> {
if (this.processing || this.queue.length === 0) return;
this.processing = true;

while (this.queue.length > 0) {
const item = this.queue[0];
try {
const msg = await this.msgManager.send(
item.conversationId,
item.content,
item.type
);
this.queue.shift();
item.resolve(msg);
} catch (error) {
item.retries++;
if (item.retries >= this.maxRetries) {
this.queue.shift();
item.reject(error);
} else {
// 指数退避等待后重试
await this.delay(1000 * Math.pow(2, item.retries));
}
}
}

this.processing = false;
}

private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}

interface QueueItem {
conversationId: string;
content: string;
type: IMMessage['type'];
retries: number;
resolve: (msg: IMMessage) => void;
reject: (error: unknown) => void;
}

五、性能优化

5.1 消息压缩(Protobuf)

JSON 序列化体积大、解析慢。IM 系统推荐使用 Protocol Buffers 作为消息序列化协议:

对比项JSONProtobuf
体积100% 基准约 30-50%(体积减少 50-70%)
编解码速度基准快 2-10 倍
类型安全强类型 Schema
可读性人类可读二进制不可读
兼容性通用需要 protoc 工具链
protobuf-usage.ts
// .proto 定义
// message IMPacket {
// string id = 1;
// PacketType type = 2;
// bytes payload = 3;
// int64 timestamp = 4;
// }

// 使用 protobufjs 在前端编解码
import { IMPacket } from './generated/proto';

// 编码
const buffer: Uint8Array = IMPacket.encode({
id: 'msg_001',
type: PacketType.MESSAGE,
payload: new TextEncoder().encode(JSON.stringify(content)),
timestamp: Date.now(),
}).finish();

// 通过 WebSocket 发送二进制
ws.send(buffer);

// 解码
ws.onmessage = (event: MessageEvent<ArrayBuffer>) => {
const packet = IMPacket.decode(new Uint8Array(event.data));
};

5.2 本地缓存(IndexedDB)

消息历史存储在 IndexedDB 中,减少网络请求,加速消息列表渲染:

message-cache.ts
class MessageCache {
private dbPromise: Promise<IDBDatabase>;

constructor() {
this.dbPromise = this.openDB();
}

private openDB(): Promise<IDBDatabase> {
return new Promise((resolve, reject) => {
const request = indexedDB.open('im_cache', 1);

request.onupgradeneeded = (event) => {
const db = (event.target as IDBOpenDBRequest).result;
// 消息存储
const msgStore = db.createObjectStore('messages', { keyPath: 'msgId' });
msgStore.createIndex('conversation_seq', ['conversationId', 'seqId']);
// 会话存储
db.createObjectStore('conversations', { keyPath: 'conversationId' });
};

request.onsuccess = () => resolve(request.result);
request.onerror = () => reject(request.error);
});
}

/** 插入消息 */
async insert(msg: IMMessage): Promise<void> {
const db = await this.dbPromise;
const tx = db.transaction('messages', 'readwrite');
tx.objectStore('messages').put(msg);
}

/** 查询会话消息(分页) */
async queryMessages(
conversationId: string,
beforeSeqId: number,
limit = 20
): Promise<IMMessage[]> {
const db = await this.dbPromise;
const tx = db.transaction('messages', 'readonly');
const index = tx.objectStore('messages').index('conversation_seq');

// 使用游标反向遍历(最新消息在前)
const range = IDBKeyRange.bound(
[conversationId, 0],
[conversationId, beforeSeqId],
false,
true
);

return new Promise((resolve) => {
const results: IMMessage[] = [];
const cursor = index.openCursor(range, 'prev');

cursor.onsuccess = (event) => {
const cur = (event.target as IDBRequest<IDBCursorWithValue>).result;
if (cur && results.length < limit) {
results.push(cur.value);
cur.continue();
} else {
resolve(results);
}
};
});
}

/** 乐观更新:先插入待发送消息 */
addPending(msg: IMMessage): void {
this.insert(msg);
}

/** 发送成功后确认 */
async confirmPending(clientMsgId: string, confirmedMsg: IMMessage): Promise<void> {
const db = await this.dbPromise;
const tx = db.transaction('messages', 'readwrite');
const store = tx.objectStore('messages');
store.delete(clientMsgId);
store.put(confirmedMsg);
}

/** 标记发送失败 */
async markFailed(clientMsgId: string): Promise<void> {
const db = await this.dbPromise;
const tx = db.transaction('messages', 'readwrite');
const store = tx.objectStore('messages');
const msg = await new Promise<IMMessage>((resolve) => {
store.get(clientMsgId).onsuccess = (e) =>
resolve((e.target as IDBRequest).result);
});
if (msg) {
(msg as IMMessage & { status: string }).status = 'failed';
store.put(msg);
}
}
}

5.3 消息列表虚拟滚动

IM 聊天窗口可能有成千上万条消息,使用 虚拟列表 只渲染可见区域:

virtual-message-list.tsx
import { useRef, useState, useCallback, useEffect } from 'react';

interface VirtualListConfig {
itemHeight: number; // 预估消息高度
overscan: number; // 上下额外渲染条数
containerHeight: number; // 容器高度
}

function useVirtualMessageList(
messages: IMMessage[],
config: VirtualListConfig
) {
const [scrollTop, setScrollTop] = useState(0);
const containerRef = useRef<HTMLDivElement>(null);

const startIndex = Math.max(
0,
Math.floor(scrollTop / config.itemHeight) - config.overscan
);
const endIndex = Math.min(
messages.length,
Math.ceil((scrollTop + config.containerHeight) / config.itemHeight) + config.overscan
);

const visibleMessages = messages.slice(startIndex, endIndex);
const totalHeight = messages.length * config.itemHeight;
const offsetY = startIndex * config.itemHeight;

const handleScroll = useCallback((e: React.UIEvent<HTMLDivElement>) => {
setScrollTop(e.currentTarget.scrollTop);
}, []);

// 滚动到底部(新消息到达时)
const scrollToBottom = useCallback(() => {
if (containerRef.current) {
containerRef.current.scrollTop = containerRef.current.scrollHeight;
}
}, []);

return {
containerRef,
visibleMessages,
totalHeight,
offsetY,
startIndex,
handleScroll,
scrollToBottom,
};
}

5.4 图片与文件优化

优化策略实现方式
缩略图上传后服务端生成多尺寸缩略图,列表展示小图
懒加载图片消息使用 IntersectionObserver 进入视口后加载
断点续传大文件分片上传,断网后从断点继续
预览优化图片先显示模糊占位(BlurHash),加载完成后替换
CDN 加速静态资源走 CDN,就近访问

六、扩展设计

6.1 端到端加密(E2EE)

基于 Signal Protocol 实现端到端加密,服务端无法解密消息内容:

核心流程

  1. 密钥生成:每个用户生成身份密钥(Identity Key)、签名预密钥(Signed Pre Key)和一次性预密钥(One-Time Pre Key)
  2. X3DH 协商:首次通信时通过扩展三方 Diffie-Hellman 协商会话密钥
  3. Double Ratchet:每条消息使用不同的加密密钥(前向安全 + 后向安全)
端到端加密的代价
  • 服务端无法做消息搜索和内容审核
  • 换设备后历史消息无法恢复(除非备份密钥)
  • 群聊加密复杂度高(每个成员需要单独加密)

6.2 消息撤回与编辑

message-operations.ts
// 消息撤回
interface RecallMessage {
type: 'recall';
originalMsgId: string;
timeLimit: 120000; // 2 分钟内可撤回
}

// 消息编辑
interface EditMessage {
type: 'edit';
originalMsgId: string;
newContent: string;
editedAt: number;
editHistory: { content: string; editedAt: number }[]; // 编辑历史
}

// 服务端处理
// 1. 校验发送者身份和时间限制
// 2. 更新消息状态(撤回)或内容(编辑)
// 3. 向所有会话参与者推送撤回/编辑通知
// 4. 客户端收到通知后更新本地消息

6.3 @提醒

mention.ts
interface MentionInfo {
// @all 或 @特定用户
type: 'all' | 'specific';
userIds: string[]; // 被 @的用户 ID 列表
positions: { // @文本在消息中的位置
start: number;
end: number;
userId: string;
displayName: string;
}[];
}

// 存储在消息的 extra 字段中
// 被 @的用户即使开启了免打扰也会收到推送通知

6.4 消息搜索

message-search.ts
// 前端本地搜索(IndexedDB 全文搜索)
async function localSearch(keyword: string): Promise<IMMessage[]> {
// IndexedDB 不支持全文索引,需要遍历
// 适合小规模数据搜索
const db = await openDB();
const tx = db.transaction('messages', 'readonly');
const store = tx.objectStore('messages');
const results: IMMessage[] = [];

return new Promise((resolve) => {
store.openCursor().onsuccess = (event) => {
const cursor = (event.target as IDBRequest<IDBCursorWithValue>).result;
if (cursor) {
if (cursor.value.content?.includes(keyword)) {
results.push(cursor.value);
}
cursor.continue();
} else {
resolve(results);
}
};
});
}

// 服务端全文搜索(Elasticsearch)
// POST /api/messages/search
// { keyword: "xxx", conversationId?: "xxx", timeRange?: [start, end] }
// 返回匹配的消息列表 + 高亮片段

常见面试问题

Q1: IM 消息如何保证不丢失?

答案

通过 三级可靠性保障 确保消息不丢失:

第一级:发送方 ACK 确认

send-ack.ts
// 发送方发送消息后等待服务端 ACK
// 超时未收到 ACK → 自动重发(最多 3 次)
async function reliableSend(msg: IMMessage): Promise<void> {
for (let retry = 0; retry < 3; retry++) {
try {
await sendWithAck(msg, 5000); // 5 秒超时
return; // ACK 成功
} catch {
// 超时重试,使用相同的 clientMsgId(服务端幂等去重)
}
}
throw new Error('发送失败');
}

第二级:服务端持久化 + 接收方 ACK

服务端收到消息后先持久化到数据库,再推送给接收方。如果接收方未回 ACK,服务端会在接收方下次上线时重推。

第三级:客户端同步兜底

客户端定期或重连时,通过 Timeline 的 syncId 增量拉取所有未同步的消息,作为最后兜底。

层级机制保障范围
发送方ACK + 重试消息到达服务端
服务端持久化 + 重推消息到达接收端
接收方增量同步补全任何遗漏

Q2: 写扩散和读扩散如何选择?

答案

维度写扩散读扩散
写入每个接收者一份(写放大)只写一份
读取只读自己收件箱拉取所有会话(读放大)
适用场景单聊、小群(< 500)大群、频道
存储成本高(N 份副本)低(仅一份)
实时性高(直接推送到收件箱)中(需要拉取)

实践建议:混合模式。

dispatch-strategy.ts
function chooseDispatchStrategy(conversation: Conversation): 'write' | 'read' {
if (conversation.type === 'single') {
return 'write'; // 单聊用写扩散
}
if (conversation.memberCount <= 500) {
return 'write'; // 小群用写扩散
}
return 'read'; // 大群用读扩散
}

// Timeline 模型更进一步:
// 消息实体用读扩散(只存一份),同步索引用写扩散(每人一条 timeline 记录)
// 这是当前业界最佳实践

Q3: 如何保证消息的顺序性?

答案

消息乱序的根因是 网络延迟不一致多服务器时钟不同步。解决方案:

  1. 服务端分配自增序列号(seqId):每个会话维护一个自增计数器,消息到达服务端时分配 seqId
  2. 客户端按 seqId 排序:不依赖客户端时间戳
message-ordering.ts
// 服务端:使用 Redis INCRBY 生成会话级 seqId
// INCRBY conv_seq:{conversationId} 1

// 客户端:消息列表始终按 seqId 排序
function sortMessages(messages: IMMessage[]): IMMessage[] {
return messages.sort((a, b) => a.seqId - b.seqId);
}

// 检测消息缺口
function detectGap(messages: IMMessage[]): number[] {
const missing: number[] = [];
for (let i = 1; i < messages.length; i++) {
const expected = messages[i - 1].seqId + 1;
if (messages[i].seqId !== expected) {
for (let seq = expected; seq < messages[i].seqId; seq++) {
missing.push(seq);
}
}
}
return missing; // 返回缺失的 seqId,需要向服务端补拉
}
时间戳不能作为排序依据

客户端本地时间可能不准确(用户手动调整过),不同客户端的时间戳无法直接比较。必须使用 服务端统一分配的序列号 作为排序依据。


Q4: 万人大群的消息怎么设计?

答案

万人大群面临三大挑战:写扩散爆炸已读回执存储爆炸在线推送风暴

large-group.ts
interface LargeGroupConfig {
// 1. 消息投递:读扩散
dispatchMode: 'read'; // 消息只存一份,群成员主动拉取

// 2. 已读回执:只统计数量,不记录列表
readReceipt: {
mode: 'counter'; // 原子计数器,不记录具体谁已读
// INCR group_read_count:{conversationId}:{seqId}
};

// 3. 推送策略:分批推送 + 限流
pushStrategy: {
batchSize: 500; // 每批推送 500 人
intervalMs: 100; // 批次间隔 100ms
prioritize: 'active_first'; // 活跃用户优先推送
};

// 4. 消息频率限制
rateLimit: {
userLimit: 5; // 每人每 10 秒最多发 5 条
groupLimit: 100; // 全群每秒最多 100 条
// 超出限制排队或丢弃
};
}
优化策略说明
读扩散消息只存一份,避免写入放大
分批推送避免瞬间推送万条 WebSocket 消息
频率限制防止消息刷屏,保证用户体验
已读计数器替代精确已读列表,降低存储成本
消息分页拉取用户打开群聊时按需加载

Q5: 前端如何实现离线消息同步?

答案

离线消息同步的核心是 增量拉取,基于 Timeline 模型的 syncId 实现:

offline-sync.ts
class OfflineSync {
// 同步流程
async sync(): Promise<void> {
// 1. 从 IndexedDB 读取上次同步位点
const lastSyncId = await this.getLocalSyncId();

// 2. 向服务端请求增量消息
let currentSyncId = lastSyncId;
let hasMore = true;

while (hasMore) {
const response = await fetch('/api/sync', {
method: 'POST',
body: JSON.stringify({
lastSyncId: currentSyncId,
limit: 200,
}),
});
const data = await response.json();

// 3. 写入本地 IndexedDB
await this.batchInsertMessages(data.messages);

// 4. 更新会话列表
await this.updateConversations(data.messages);

// 5. 更新同步位点
currentSyncId = data.nextSyncId;
await this.saveLocalSyncId(currentSyncId);

hasMore = data.hasMore;
}

// 6. 通知 UI 层刷新
this.notifyUI();
}

private async batchInsertMessages(messages: IMMessage[]): Promise<void> {
const db = await this.openDB();
const tx = db.transaction('messages', 'readwrite');
const store = tx.objectStore('messages');

for (const msg of messages) {
store.put(msg); // put 自动去重(相同 msgId 覆盖)
}
}

private async updateConversations(messages: IMMessage[]): Promise<void> {
// 按 conversationId 分组,更新每个会话的最后一条消息和未读数
const convMap = new Map<string, IMMessage>();
for (const msg of messages) {
const existing = convMap.get(msg.conversationId);
if (!existing || msg.seqId > existing.seqId) {
convMap.set(msg.conversationId, msg);
}
}
// 写入 IndexedDB conversations store
}

private async getLocalSyncId(): Promise<number> {
return parseInt(localStorage.getItem('im_sync_id') ?? '0', 10);
}

private async saveLocalSyncId(syncId: number): Promise<void> {
localStorage.setItem('im_sync_id', String(syncId));
}

private notifyUI(): void {
window.dispatchEvent(new CustomEvent('im:synced'));
}

private openDB(): Promise<IDBDatabase> {
return new Promise((resolve, reject) => {
const req = indexedDB.open('im_cache', 1);
req.onsuccess = () => resolve(req.result);
req.onerror = () => reject(req.error);
});
}
}

关键设计点

  • syncId 是用户维度的全局递增 ID,不依赖时间戳
  • 分页拉取(每次 200 条),避免一次性加载过多数据
  • IndexedDB 的 put 操作天然幂等,重复消息自动覆盖
  • 同步完成后通知 UI 刷新,而不是每条消息触发一次渲染

Q6: WebSocket 断连后如何保证消息不丢?

答案

WebSocket 断连是 IM 系统最常见的异常场景。通过 四层防护 保证消息不丢:

reconnect-sync.ts
class ReconnectionHandler {
// 第 1 层:心跳检测 + 快速重连
// 30s 发一次心跳,90s 无响应判定断连
// 指数退避重连:1s → 2s → 4s → 8s → 16s → 30s(上限)

// 第 2 层:重连后增量同步
async onReconnected(): Promise<void> {
// 从上次 syncId 开始拉取所有遗漏的消息
await this.syncManager.syncOfflineMessages();
}

// 第 3 层:发送队列持久化
// 断连时未发送成功的消息保存在 IndexedDB
// 重连后按顺序重新发送
async retrySendQueue(): Promise<void> {
const pendingMessages = await this.loadPendingFromDB();
for (const msg of pendingMessages) {
await this.msgManager.send(msg.conversationId, msg.content, msg.type);
}
}

// 第 4 层:离线推送兜底
// 如果长时间未重连,服务端通过 APNs/FCM 推送通知
// 用户点击通知打开 App → 触发增量同步
}
防护层级触发条件恢复方式
心跳重连短暂断网(< 30s)自动重连 + 增量同步
增量同步断连较久(30s-数小时)基于 syncId 拉取遗漏消息
发送队列持久化发送时断网重连后重发未确认消息
离线推送长时间离线APNs/FCM 推送 + 用户主动打开

Q7: 如何设计消息的存储分层?

答案

IM 消息的访问具有明显的 时间局部性:最近的消息访问频率远高于历史消息。采用 冷热分层 存储:

storage-tiering.ts
interface StorageTier {
// 热数据:最近 3 个月
hot: {
storage: 'MySQL/TiDB'; // 关系数据库
cache: 'Redis'; // 最近 100 条消息缓存
feature: '快速查询、索引丰富';
};

// 温数据:3-12 个月
warm: {
storage: 'HBase/Cassandra'; // 列存数据库
feature: '高吞吐写入、范围查询';
};

// 冷数据:12 个月以上
cold: {
storage: 'S3/OSS'; // 对象存储
format: 'Parquet'; // 列式存储格式
feature: '成本极低、批量读取';
};
}
层级存储时间范围访问延迟成本
L0 缓存Redis最近 N 条< 1ms
L1 热数据MySQL/TiDB0-3 个月< 10ms
L2 温数据HBase3-12 个月< 100ms
L3 冷数据S3/OSS> 12 个月< 1s极低

数据迁移:后台定时任务按时间维度将热数据归档到温数据、冷数据层。查询时透明路由:先查热数据,未命中则查温数据,最后查冷数据。


Q8: IM 系统的前端架构如何设计?

答案

前端 IM SDK 通常采用 分层架构,将网络、存储、业务逻辑解耦:

关键设计原则

原则说明
SDK 与 UI 分离IM SDK 不依赖任何 UI 框架,可在 React/Vue/小程序中复用
事件驱动SDK 通过事件(EventEmitter)通知 UI 层更新
乐观更新发送消息立即显示在界面,ACK 失败再标记
本地优先消息列表优先从 IndexedDB 读取,异步与服务端同步
连接状态管理UI 层响应连接状态变化,展示"连接中"/"已断开"等提示

相关链接