# Implementing a Log Collector
## Problem

How do you implement a high-performance log collector in Go, with asynchronous writes and batched sending?

## Answer
### Core design

An asynchronous logger built on a buffered channel, with a background consumer goroutine that batches writes:
```go
package main

import (
    "encoding/json"
    "fmt"
    "io"
    "os"
    "sync"
    "time"
)

type LogEntry struct {
    Level     string                 `json:"level"`
    Message   string                 `json:"message"`
    Fields    map[string]interface{} `json:"fields,omitempty"`
    Timestamp time.Time              `json:"timestamp"`
}

type AsyncLogger struct {
    ch            chan LogEntry
    writer        io.Writer
    wg            sync.WaitGroup
    batchSize     int
    flushInterval time.Duration
}

func NewAsyncLogger(writer io.Writer, bufSize, batchSize int, flushInterval time.Duration) *AsyncLogger {
    l := &AsyncLogger{
        ch:            make(chan LogEntry, bufSize),
        writer:        writer,
        batchSize:     batchSize,
        flushInterval: flushInterval,
    }
    l.wg.Add(1)
    go l.consumeLoop()
    return l
}

func (l *AsyncLogger) Log(level, message string, fields map[string]interface{}) {
    entry := LogEntry{
        Level:     level,
        Message:   message,
        Fields:    fields,
        Timestamp: time.Now(),
    }
    select {
    case l.ch <- entry:
    default:
        // Buffer full: drop or block depending on policy; here we drop (see Q1 below).
        fmt.Fprintf(os.Stderr, "log buffer full, dropping: %s\n", message)
    }
}

// consumeLoop drains the channel in batches.
func (l *AsyncLogger) consumeLoop() {
    defer l.wg.Done()
    batch := make([]LogEntry, 0, l.batchSize)
    ticker := time.NewTicker(l.flushInterval)
    defer ticker.Stop()
    for {
        select {
        case entry, ok := <-l.ch:
            if !ok {
                // Channel closed: flush whatever is left, then exit.
                l.flush(batch)
                return
            }
            batch = append(batch, entry)
            if len(batch) >= l.batchSize {
                l.flush(batch)
                batch = batch[:0] // reuse the slice's backing array
            }
        case <-ticker.C:
            // Periodic flush so logs are not delayed too long under low traffic.
            if len(batch) > 0 {
                l.flush(batch)
                batch = batch[:0]
            }
        }
    }
}

func (l *AsyncLogger) flush(batch []LogEntry) {
    for _, entry := range batch {
        data, err := json.Marshal(entry)
        if err != nil {
            continue // skip entries that cannot be serialized
        }
        data = append(data, '\n')
        if _, err := l.writer.Write(data); err != nil {
            fmt.Fprintf(os.Stderr, "log write failed: %v\n", err)
        }
    }
}

func (l *AsyncLogger) Close() {
    close(l.ch) // signals consumeLoop to drain and exit; Log must not be called after this
    l.wg.Wait() // block until the remaining entries are written
}
```
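The drain-on-close behavior is easy to verify. A minimal test sketch (assumes the standard `bytes`, `strings`, and `testing` packages; only the consumer goroutine writes to the buffer, and `Close` establishes the happens-before edge for reading it):

```go
func TestCloseDrains(t *testing.T) {
    var buf bytes.Buffer
    logger := NewAsyncLogger(&buf, 16, 4, 100*time.Millisecond)
    logger.Log("INFO", "hello", nil)
    logger.Log("INFO", "world", nil)
    logger.Close() // blocks until the consumer flushes everything

    if got := strings.Count(buf.String(), "\n"); got != 2 {
        t.Fatalf("expected 2 log lines, got %d", got)
    }
}
```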
### Usage example
```go
func main() {
    file, err := os.OpenFile("app.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        panic(err)
    }
    defer file.Close()

    logger := NewAsyncLogger(
        file,
        10000,       // buffer up to 10k entries
        100,         // flush every 100 entries
        time.Second, // or at least once per second
    )
    defer logger.Close() // defers run LIFO: logger flushes before file.Close()

    logger.Log("INFO", "service started", map[string]interface{}{"port": 8080})
    logger.Log("ERROR", "request failed", map[string]interface{}{"url": "/api/user", "status": 500})
}
```
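Each entry is serialized as one JSON object per line, so the calls above produce lines like the following (timestamps illustrative):

```json
{"level":"INFO","message":"service started","fields":{"port":8080},"timestamp":"2024-01-01T12:00:00Z"}
{"level":"ERROR","message":"request failed","fields":{"status":500,"url":"/api/user"},"timestamp":"2024-01-01T12:00:01Z"}
```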
### Sending to Kafka

Because AsyncLogger writes through a plain io.Writer, pointing it at Kafka only requires a writer that wraps a producer:
```go
// sarama lives at github.com/IBM/sarama (formerly github.com/Shopify/sarama).
func NewKafkaLogger(brokers []string, topic string) (*AsyncLogger, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // SyncProducer requires this
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return nil, err
    }
    writer := &kafkaWriter{producer: producer, topic: topic}
    return NewAsyncLogger(writer, 10000, 200, time.Second), nil
}

type kafkaWriter struct {
    producer sarama.SyncProducer
    topic    string
}

func (w *kafkaWriter) Write(p []byte) (int, error) {
    msg := &sarama.ProducerMessage{
        Topic: w.topic,
        Value: sarama.ByteEncoder(p),
    }
    if _, _, err := w.producer.SendMessage(msg); err != nil {
        return 0, err
    }
    return len(p), nil
}
```
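Wiring it up (broker address and topic name are placeholders):

```go
logger, err := NewKafkaLogger([]string{"localhost:9092"}, "app-logs")
if err != nil {
    panic(err)
}
defer logger.Close()
```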
### Log rotation
import "gopkg.in/natefinch/lumberjack.v2"
// lumberjack 自动处理日志文件轮转
writer := &lumberjack.Logger{
Filename: "/var/log/app.log",
MaxSize: 100, // 100MB 后轮转
MaxBackups: 7, // 保留 7 个旧文件
MaxAge: 30, // 保留 30 天
Compress: true, // gzip 压缩旧文件
}
logger := NewAsyncLogger(writer, 10000, 100, time.Second)
### Common interview questions
**Q1: What happens when the log buffer is full?**

Answer: three strategies (the third is sketched below):

- Drop (`select` with a `default` case): never blocks business code
- Block (a plain `ch <- entry` send): applies backpressure, but may stall the business path
- Degrade: drop low-severity logs (DEBUG/INFO) and keep ERROR
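A minimal sketch of the degrade strategy, assuming levels are plain strings as above; `len`/`cap` on a channel give a cheap, approximate fill ratio:

```go
func (l *AsyncLogger) LogDegrading(level, message string, fields map[string]interface{}) {
    // Once the buffer is ~90% full, accept only WARN and above.
    nearlyFull := len(l.ch)*10 >= cap(l.ch)*9
    if nearlyFull && (level == "DEBUG" || level == "INFO") {
        return // silently shed low-severity entries
    }
    l.Log(level, message, fields)
}
```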
**Q2: How do you guarantee that logs are not lost when the process exits?**

Answer:

- `Close()` closes the channel, and `wg.Wait()` blocks until the consumer goroutine has drained and written everything
- Pair it with graceful shutdown (signal handling) so that `logger.Close()` runs last, as sketched below
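A graceful-shutdown sketch using the standard os/signal package (imports `os`, `os/signal`, `syscall`, `time`; server startup elided):

```go
func main() {
    logger := NewAsyncLogger(os.Stdout, 10000, 100, time.Second)

    // Trap SIGINT/SIGTERM so we control the shutdown order.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    // ... start servers and business logic here ...

    <-sigCh // block until a shutdown signal arrives
    // ... stop accepting requests, finish in-flight work ...
    logger.Close() // final step: drain the channel and flush remaining logs
}
```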