Log Analysis and Troubleshooting

Question

How do you quickly pinpoint production problems through logs? What are the best practices for Python logging?

Answer

Structured logging configuration

logging_config.py
import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "line": record.lineno,
        }
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # Attach extra fields
        if hasattr(record, "extra_data"):
            log_data.update(record.extra_data)
        return json.dumps(log_data, ensure_ascii=False)

# Configuration
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
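A minimal usage sketch (the logger name and field values are illustrative) showing how the extra_data hook surfaces extra fields in the JSON output:

logger = logging.getLogger("orders")
logger.info("order_created", extra={"extra_data": {"order_id": 42, "amount": 99.9}})
# One line of output, roughly:
# {"timestamp": "...", "level": "INFO", "logger": "orders", "message": "order_created",
#  "module": "...", "line": ..., "order_id": 42, "amount": 99.9}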

Request trace logging

middleware/request_log.py
import uuid
import time
import logging
from contextvars import ContextVar
from fastapi import Request

trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")

logger = logging.getLogger("api")

class RequestLogMiddleware:
    async def __call__(self, request: Request, call_next):
        trace_id = request.headers.get("X-Trace-ID", str(uuid.uuid4())[:8])
        trace_id_var.set(trace_id)

        start = time.perf_counter()
        logger.info("request_start", extra={"extra_data": {
            "trace_id": trace_id,
            "method": request.method,
            "path": request.url.path,
            "client_ip": request.client.host,
        }})

        try:
            response = await call_next(request)
            duration = time.perf_counter() - start
            logger.info("request_end", extra={"extra_data": {
                "trace_id": trace_id,
                "status": response.status_code,
                "duration_ms": round(duration * 1000, 2),
            }})
            return response
        except Exception as e:
            logger.error("request_error", exc_info=True, extra={"extra_data": {
                "trace_id": trace_id,
                "error": str(e),
            }})
            raise
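The middleware stores the trace ID in trace_id_var, but it still has to be registered on the app and picked up by downstream loggers. A minimal wiring sketch, assuming a logging.Filter and the app.middleware("http") registration style (both are assumptions, not part of the original middleware):

# hypothetical wiring example, not one of the original files
import logging
from fastapi import FastAPI
from middleware.request_log import RequestLogMiddleware, trace_id_var

class TraceIdFilter(logging.Filter):
    """Attach the current trace_id to every record via extra_data."""
    def filter(self, record: logging.LogRecord) -> bool:
        extra = getattr(record, "extra_data", {})
        extra.setdefault("trace_id", trace_id_var.get())
        record.extra_data = extra
        return True

app = FastAPI()
app.middleware("http")(RequestLogMiddleware())   # register the instance as an HTTP middleware

service_logger = logging.getLogger("service")
service_logger.addFilter(TraceIdFilter())

@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    # Carries the same trace_id as the request_start / request_end entries
    service_logger.info("order_lookup", extra={"extra_data": {"order_id": order_id}})
    return {"order_id": order_id}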

Command-line log troubleshooting

# Find error logs
grep '"level": "ERROR"' app.log | tail -20

# Trace a request by trace_id
grep '"trace_id": "abc123"' app.log

# Count requests per endpoint
cat app.log | jq -r 'select(.message == "request_start") | .path' | sort | uniq -c | sort -rn

# Find slow requests (>1s)
cat app.log | jq 'select(.duration_ms > 1000)'

# Error distribution
cat app.log | jq -r 'select(.level == "ERROR") | .message' | sort | uniq -c | sort -rn

# View logs within a time window
awk '/2024-01-15T10:00/,/2024-01-15T11:00/' app.log

Exception aggregation

error_aggregator.py
import traceback
import hashlib
from collections import defaultdict

class ErrorAggregator:
    """Aggregate identical exceptions to avoid duplicate alerts"""

    def __init__(self):
        self.errors: dict[str, list] = defaultdict(list)

    def record(self, exc: Exception):
        # Aggregate by a fingerprint of the last few stack frames
        tb = traceback.format_exception(type(exc), exc, exc.__traceback__)
        fingerprint = hashlib.md5("".join(tb[-3:]).encode()).hexdigest()
        self.errors[fingerprint].append({
            "error": str(exc),
            "count": 1,
            "traceback": "".join(tb),
        })

    def report(self) -> list[dict]:
        return [
            {"fingerprint": fp, "count": len(items), "sample": items[0]}
            for fp, items in sorted(self.errors.items(), key=lambda x: -len(x[1]))
        ]
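A short usage sketch (the failing call and the print format are illustrative):

aggregator = ErrorAggregator()

for _ in range(5):
    try:
        raise ValueError("upstream timeout")   # stand-in for a real failure
    except Exception as exc:
        aggregator.record(exc)

for entry in aggregator.report():
    # prints the fingerprint prefix, the occurrence count, and a sample message
    print(entry["fingerprint"][:8], entry["count"], entry["sample"]["error"])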

Common Interview Questions

Q1: How should log levels be used?

Answer

| Level    | Meaning                    | Examples                               |
|----------|----------------------------|----------------------------------------|
| DEBUG    | Debugging details          | Variable values, SQL statements        |
| INFO     | Normal business events     | User login, order created              |
| WARNING  | Abnormal but recoverable   | Retry succeeded, fallback triggered    |
| ERROR    | Errors that need attention | API call failed, data anomaly          |
| CRITICAL | Fatal failures             | Service down, data loss                |

Q2: What if logging volume starts to hurt performance?

Answer

  1. Use INFO level in production (turn off DEBUG)
  2. Write logs asynchronously (QueueHandler; see the sketch after this list)
  3. Sample high-frequency logs (e.g. record 1 out of every 100)
  4. Index on structure and log only the key fields
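
A minimal sketch of point 2, using QueueHandler/QueueListener from the standard library so request threads only enqueue records while a background thread does the actual I/O (the file handler and its target path are assumptions):

import logging
import logging.handlers
import queue
from logging_config import JSONFormatter   # the formatter defined above

log_queue: queue.Queue = queue.Queue(-1)               # unbounded in-memory queue

# Application code only enqueues records -- cheap and non-blocking
queue_handler = logging.handlers.QueueHandler(log_queue)
logging.basicConfig(level=logging.INFO, handlers=[queue_handler])

# A background thread drains the queue and writes the JSON lines
file_handler = logging.FileHandler("app.log")
file_handler.setFormatter(JSONFormatter())
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
# ... call listener.stop() on shutdown to flush remaining records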

Q3: How do you troubleshoot intermittent 500 errors?

Answer

  1. Filter ERROR logs by the relevant time window
  2. Use the trace_id to follow the full request chain
  3. Inspect the exception stack trace and request parameters
  4. Check for external dependency failures or resource exhaustion

Related Links