跳到主要内容

设计日志收集系统

问题

如何用 Go 设计一个高吞吐、低延迟的日志收集系统,支持结构化日志、多级采集和查询?

答案

整体架构

日志模型

type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"` // DEBUG, INFO, WARN, ERROR
Service string `json:"service"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
Message string `json:"message"`
Fields map[string]any `json:"fields"` // 结构化字段
Caller string `json:"caller"` // 调用位置
Host string `json:"host"`
}

Agent 采集端

Agent 部署在每台机器上,负责本地日志采集和批量转发:

type Agent struct {
buffer chan *LogEntry // 缓冲通道
producer *kafka.Writer
batch []*LogEntry
mu sync.Mutex
}

func NewAgent(kafkaBrokers []string, topic string) *Agent {
a := &Agent{
buffer: make(chan *LogEntry, 10000), // 缓冲区
producer: &kafka.Writer{
Addr: kafka.TCP(kafkaBrokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
// 批量发送配置
BatchSize: 1000,
BatchTimeout: 100 * time.Millisecond,
},
}
go a.flushLoop()
return a
}

// 接收日志
func (a *Agent) Collect(entry *LogEntry) {
select {
case a.buffer <- entry:
default:
// 缓冲满了,丢弃或写本地磁盘兜底
log.Println("日志缓冲区满,丢弃日志")
}
}

// 批量刷写到 Kafka
func (a *Agent) flushLoop() {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()

batch := make([]kafka.Message, 0, 1000)

for {
select {
case entry := <-a.buffer:
data, _ := json.Marshal(entry)
batch = append(batch, kafka.Message{Value: data})
// 达到批量大小立即发送
if len(batch) >= 1000 {
a.sendBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
a.sendBatch(batch)
batch = batch[:0]
}
}
}
}

func (a *Agent) sendBatch(batch []kafka.Message) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := a.producer.WriteMessages(ctx, batch...); err != nil {
log.Printf("发送 Kafka 失败: %v", err)
// 降级:写本地磁盘
}
}

消费处理服务

type LogConsumer struct {
reader *kafka.Reader
es *elasticsearch.Client
}

func (c *LogConsumer) Consume(ctx context.Context) {
for {
msg, err := c.reader.ReadMessage(ctx)
if err != nil {
log.Printf("读取消息失败: %v", err)
continue
}

var entry LogEntry
if err := json.Unmarshal(msg.Value, &entry); err != nil {
continue
}

// 写入 Elasticsearch
indexName := fmt.Sprintf("logs-%s-%s", entry.Service, entry.Timestamp.Format("2006.01.02"))
data, _ := json.Marshal(entry)

c.es.Index(indexName, bytes.NewReader(data))
}
}

日志采样

高流量场景对 DEBUG/INFO 日志做采样,降低存储量:

type Sampler struct {
counter atomic.Int64
rate int64 // 每 N 条采 1 条
}

func (s *Sampler) ShouldLog(level string) bool {
switch level {
case "ERROR", "WARN":
return true // 错误日志全量采集
default:
n := s.counter.Add(1)
return n%s.rate == 0
}
}

关键设计决策

问题方案
日志丢失Agent 本地磁盘兜底 + Kafka 持久化
高吞吐批量发送 + 异步 Channel
存储膨胀日志分级、采样、冷热分层(7天ES→归档到 S3)
查询性能ES 按日期分索引 + 服务维度分片
敏感信息消费端脱敏处理(手机号、身份证等)

常见面试问题

Q1: Agent 挂了日志会丢吗?

答案

  • Agent 使用内存 Channel 缓冲,如果 Agent 进程崩溃,Channel 中的数据会丢失
  • 可以加磁盘 WAL(Write-Ahead Log):先写本地文件,再异步发 Kafka
  • Kafka 本身有持久化保障,发送成功的不会丢

Q2: 如何处理日志量激增(如故障时大量 ERROR)?

答案

  • Kafka 分区扩容 + 消费者水平扩展
  • Agent 端限速(令牌桶限流)
  • DEBUG/INFO 自动采样降级
  • 告警触发后临时调整采样率

Q3: ES 索引策略如何设计?

答案

  • service + 日期 分索引:logs-user-service-2024.01.15
  • 热数据用 SSD 节点,7 天以上转冷节点
  • 30 天以上归档到对象存储
  • 使用 ILM(Index Lifecycle Management)自动管理

Q4: Go 的日志库怎么选?

答案

特点
log/slogGo 1.21 标准库,结构化日志,推荐新项目
zapUber 出品,高性能,零分配
zerolog零分配 JSON 日志
logrus老牌库,API 友好但性能一般

生产环境推荐 zapslog,配合 JSON 输出方便采集。

相关链接