Log Analysis and Troubleshooting
Question
How do you quickly locate production issues from logs? What are the best practices for Python logging?
Answer
Structured logging configuration
logging_config.py
```python
import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            # datetime.utcnow() is deprecated; use an explicit UTC timestamp
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "line": record.lineno,
        }
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # Attach extra fields passed via `extra={"extra_data": {...}}`
        if hasattr(record, "extra_data"):
            log_data.update(record.extra_data)
        return json.dumps(log_data, ensure_ascii=False)

# Configuration
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
```
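A quick sanity check of the formatter's output shape, using a trimmed copy of the class above applied to a hand-built LogRecord:

```python
import json
import logging

# Trimmed copy of the JSONFormatter above: one JSON object per log line.
class JSONFormatter(logging.Formatter):
    def format(self, record):
        data = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # `extra` keys become attributes on the LogRecord, so "extra_data"
        # surfaces as record.extra_data inside the formatter.
        if hasattr(record, "extra_data"):
            data.update(record.extra_data)
        return json.dumps(data, ensure_ascii=False)

# Build a record by hand and format it to inspect the JSON shape:
record = logging.LogRecord("api", logging.INFO, __file__, 1, "user_login", None, None)
record.extra_data = {"user_id": 42}
line = JSONFormatter().format(record)
print(line)
parsed = json.loads(line)
```

Each log line is then grep- and jq-friendly, which is what the command-line queries later in this section rely on.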
Request trace logging
middleware/request_log.py
```python
import uuid
import time
import logging
from contextvars import ContextVar
from fastapi import Request

trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")
logger = logging.getLogger("api")

class RequestLogMiddleware:
    async def __call__(self, request: Request, call_next):
        trace_id = request.headers.get("X-Trace-ID", str(uuid.uuid4())[:8])
        trace_id_var.set(trace_id)
        start = time.perf_counter()
        logger.info("request_start", extra={"extra_data": {
            "trace_id": trace_id,
            "method": request.method,
            "path": request.url.path,
            "client_ip": request.client.host,
        }})
        try:
            response = await call_next(request)
            duration = time.perf_counter() - start
            logger.info("request_end", extra={"extra_data": {
                "trace_id": trace_id,
                "path": request.url.path,
                "status": response.status_code,
                "duration_ms": round(duration * 1000, 2),
            }})
            return response
        except Exception as e:
            logger.error("request_error", exc_info=True, extra={"extra_data": {
                "trace_id": trace_id,
                "error": str(e),
            }})
            raise
```
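The trace_id stored in the ContextVar can be stamped onto every log record via a logging.Filter, so code deeper in the request never has to pass it explicitly. A minimal sketch (the `TraceIDFilter` name is ours, not from the source):

```python
import logging
from contextvars import ContextVar

trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")

class TraceIDFilter(logging.Filter):
    """Stamp every record with the current request's trace_id."""
    def filter(self, record):
        record.trace_id = trace_id_var.get()
        return True  # never drop the record, only annotate it

# Attach to a handler so all records passing through pick up the field:
handler = logging.StreamHandler()
handler.addFilter(TraceIDFilter())

# Simulate a request context set by the middleware:
trace_id_var.set("abc123")
record = logging.LogRecord("api", logging.INFO, __file__, 1, "request_end", None, None)
handler.filter(record)  # runs the attached filters
print(record.trace_id)
```

The JSONFormatter can then emit `record.trace_id` on every line, making `grep '"trace_id": ...'` work for any log produced during the request, not just the middleware's own lines.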
Command-line log analysis
```bash
# Find recent error logs
grep '"level": "ERROR"' app.log | tail -20

# Follow one request by its trace_id
grep '"trace_id": "abc123"' app.log

# Count requests per endpoint
jq -r 'select(.message == "request_end") | .path' app.log | sort | uniq -c | sort -rn

# Find slow requests (>1s)
jq 'select(.duration_ms > 1000)' app.log

# Error distribution
jq -r 'select(.level == "ERROR") | .message' app.log | sort | uniq -c | sort -rn

# Logs within a time window
awk '/2024-01-15T10:00/,/2024-01-15T11:00/' app.log
```
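When jq is unavailable, the same queries are easy in Python. A sketch of the slow-request filter over JSON lines (the sample data and threshold here are made up; the field names follow the middleware above):

```python
import json

# Hand-made sample lines in the shape the middleware emits:
log_lines = [
    '{"message": "request_end", "path": "/users", "duration_ms": 120.5}',
    '{"message": "request_end", "path": "/users", "duration_ms": 80.0}',
    '{"message": "request_end", "path": "/orders", "duration_ms": 1500.0}',
]

def slow_requests(lines, threshold_ms=1000):
    """Equivalent of: jq 'select(.duration_ms > 1000)'."""
    out = []
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines (stray tracebacks, etc.)
        if entry.get("duration_ms", 0) > threshold_ms:
            out.append(entry)
    return out

slow = slow_requests(log_lines)
print(len(slow), slow[0]["path"])
```

The `json.JSONDecodeError` guard matters on real files: a log mixing JSON and plain-text lines would otherwise abort the whole scan.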
Exception aggregation
error_aggregator.py
```python
import traceback
import hashlib
from collections import defaultdict

class ErrorAggregator:
    """Aggregate identical exceptions to avoid duplicate alerts."""

    def __init__(self):
        self.errors: dict[str, list] = defaultdict(list)

    def record(self, exc: Exception):
        # Fingerprint on the last few traceback lines to group identical stacks
        tb = traceback.format_exception(type(exc), exc, exc.__traceback__)
        fingerprint = hashlib.md5("".join(tb[-3:]).encode()).hexdigest()
        self.errors[fingerprint].append({
            "error": str(exc),
            "traceback": "".join(tb),
        })

    def report(self) -> list[dict]:
        # Most frequent error groups first
        return [
            {"fingerprint": fp, "count": len(items), "sample": items[0]}
            for fp, items in sorted(self.errors.items(), key=lambda x: -len(x[1]))
        ]
```
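A usage sketch: three identical ZeroDivisionErrors raised from the same call site collapse into a single group (the class is repeated here so the snippet runs standalone):

```python
import traceback
import hashlib
from collections import defaultdict

class ErrorAggregator:
    """Same fingerprinting scheme as above."""
    def __init__(self):
        self.errors: dict[str, list] = defaultdict(list)

    def record(self, exc: Exception):
        tb = traceback.format_exception(type(exc), exc, exc.__traceback__)
        fingerprint = hashlib.md5("".join(tb[-3:]).encode()).hexdigest()
        self.errors[fingerprint].append({"error": str(exc), "traceback": "".join(tb)})

    def report(self) -> list[dict]:
        return [
            {"fingerprint": fp, "count": len(items), "sample": items[0]}
            for fp, items in sorted(self.errors.items(), key=lambda x: -len(x[1]))
        ]

agg = ErrorAggregator()

def flaky(n):
    return 1 / n  # raises ZeroDivisionError when n == 0

for n in [0, 0, 0]:
    try:
        flaky(n)
    except ZeroDivisionError as e:
        agg.record(e)

report = agg.report()
print(report[0]["count"])
```

Because the traceback frames are identical on each iteration, the fingerprint matches and only one alert-worthy group comes out of `report()`.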
Common interview questions
Q1: How should log levels be used?
Answer:
| Level | Meaning | Examples |
|---|---|---|
| DEBUG | Debugging detail | variable values, SQL statements |
| INFO | Normal business events | user login, order created |
| WARNING | Abnormal but recoverable | retry succeeded, fallback triggered |
| ERROR | Errors needing attention | failed API call, data anomaly |
| CRITICAL | Fatal errors | service down, data loss |
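The level threshold is enforced by the logger itself, so calls below the configured level are cheap no-ops:

```python
import logging

# With the level set to WARNING, DEBUG and INFO calls are discarded
# before any formatting work happens.
logger = logging.getLogger("levels.demo")
logger.setLevel(logging.WARNING)

print(logger.isEnabledFor(logging.INFO))     # filtered out
print(logger.isEnabledFor(logging.WARNING))  # passes through
```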
Q2: What if heavy logging hurts performance?
Answer:
- Run production at INFO level (disable DEBUG)
- Write logs asynchronously (QueueHandler)
- Sample high-frequency logs (e.g. record 1 in every 100)
- Index structured fields and log only the key ones
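The async-write point can be sketched with the stdlib QueueHandler/QueueListener pair; a StringIO target is used here only to make the effect visible:

```python
import io
import logging
import logging.handlers
import queue

log_queue: queue.Queue = queue.Queue(-1)  # unbounded

logger = logging.getLogger("async.demo")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(logging.handlers.QueueHandler(log_queue))

# The listener drains the queue into the real handler on a background
# thread, so the logging call itself only pays for an enqueue:
stream = io.StringIO()
listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler(stream))
listener.start()

logger.info("non-blocking write")
listener.stop()  # joins the worker thread, flushing remaining records
print(stream.getvalue())
```

In production the StreamHandler would be a file or network handler; the request path never blocks on that I/O.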
Q3: How do you troubleshoot an intermittent 500 error?
Answer:
- Filter ERROR logs by the relevant time window
- Follow the full request chain via its trace_id
- Inspect the exception stack trace and request parameters
- Check for external dependency failures or resource exhaustion