设计即时通讯系统
问题
如何用 Go 设计一个即时通讯(IM)系统?
答案
核心架构
连接管理(Hub)
type Client struct {
ID string
Conn *websocket.Conn
Send chan []byte
}
type Hub struct {
mu sync.RWMutex
clients map[string]*Client // userID → Client
}
func NewHub() *Hub {
return &Hub{clients: make(map[string]*Client)}
}
func (h *Hub) Register(client *Client) {
h.mu.Lock()
h.clients[client.ID] = client
h.mu.Unlock()
}
func (h *Hub) Unregister(clientID string) {
h.mu.Lock()
if c, ok := h.clients[clientID]; ok {
close(c.Send)
delete(h.clients, clientID)
}
h.mu.Unlock()
}
// 发送给指定用户
func (h *Hub) SendTo(userID string, msg []byte) bool {
h.mu.RLock()
client, ok := h.clients[userID]
h.mu.RUnlock()
if !ok {
return false // 用户不在线
}
select {
case client.Send <- msg:
return true
default:
return false // 发送缓冲满
}
}
WebSocket 处理
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
func HandleWebSocket(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
userID := r.URL.Query().Get("user_id")
client := &Client{ID: userID, Conn: conn, Send: make(chan []byte, 256)}
hub.Register(client)
// 读写分离
go client.readPump(hub)
go client.writePump()
}
// 读取客户端消息
func (c *Client) readPump(hub *Hub) {
defer func() {
hub.Unregister(c.ID)
c.Conn.Close()
}()
c.Conn.SetReadLimit(4096)
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, message, err := c.Conn.ReadMessage()
if err != nil {
break
}
// 解析并路由消息
var msg ChatMessage
json.Unmarshal(message, &msg)
msg.From = c.ID
msg.Timestamp = time.Now().UnixMilli()
// 投递给目标用户
data, _ := json.Marshal(msg)
if !hub.SendTo(msg.To, data) {
// 用户不在线,存离线消息
saveOfflineMessage(msg)
}
}
}
// 写回客户端
func (c *Client) writePump() {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case msg, ok := <-c.Send:
if !ok {
c.Conn.WriteMessage(websocket.CloseMessage, nil)
return
}
c.Conn.WriteMessage(websocket.TextMessage, msg)
case <-ticker.C:
// 心跳 Ping
c.Conn.WriteMessage(websocket.PingMessage, nil)
}
}
}
消息模型
type ChatMessage struct {
ID string `json:"id"`
From string `json:"from"`
To string `json:"to"` // 私聊对象 / 群ID
Type string `json:"type"` // text/image/file
Content string `json:"content"`
Timestamp int64 `json:"timestamp"`
}
群聊设计
type Group struct {
ID string
Members map[string]bool
}
func (h *Hub) SendToGroup(group *Group, msg []byte, excludeID string) {
for memberID := range group.Members {
if memberID != excludeID { // 不发给自己
h.SendTo(memberID, msg)
}
}
}
已读回执 & 消息状态
// 客户端发送已读回执
type ReadReceipt struct {
ConversationID string `json:"conversation_id"`
LastReadMsgID string `json:"last_read_msg_id"`
}
多节点扩展
单节点的 Hub 只管理本节点连接。多节点需要通过 Redis Pub/Sub 或消息队列广播:
// 发送消息时:先查本地 Hub,不在则发到 Redis
func SendMessage(hub *Hub, rdb *redis.Client, msg ChatMessage) {
data, _ := json.Marshal(msg)
// 本地投递
if hub.SendTo(msg.To, data) {
return
}
// 通过 Redis Pub/Sub 广播到其他节点
rdb.Publish(context.Background(), "im:messages", data)
}
// 每个节点订阅 Redis 并投递
func SubscribeMessages(hub *Hub, rdb *redis.Client) {
sub := rdb.Subscribe(context.Background(), "im:messages")
for msg := range sub.Channel() {
var chatMsg ChatMessage
json.Unmarshal([]byte(msg.Payload), &chatMsg)
hub.SendTo(chatMsg.To, []byte(msg.Payload))
}
}
常见面试问题
Q1: 如何保证消息不丢失?
答案:
- 客户端发送后等待服务端 ACK
- 服务端先持久化再投递
- 投递失败存离线消息表
- 客户端拉取离线消息(上线时同步)
Q2: 消息顺序如何保证?
答案:
- 服务端为每条消息分配单调递增的序列号(如 Snowflake ID)
- 客户端按序列号排序展示
- 同一会话的消息由同一节点处理(一致性哈希路由)
Q3: 万人群聊如何优化?
答案:
- 消息写扩散改为读扩散:只存一份消息,各成员读取时拉取
- 分层推送:在线成员 WebSocket 推,离线成员攒批推送
- 热点群限流:消息合并发送