# Designing a Log Collection System

## Question

How do you design a log collection and analysis system in Python, and how do you implement structured logging?

## Answer

### Architecture

The design below has three layers: structured logging in the application (via structlog), a tracing middleware that stamps every request with a trace ID, and a batching SDK that ships logs to a collection endpoint.

### Structured logging
**logger.py**

```python
import logging

import structlog

def setup_logger():
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    )

setup_logger()
logger = structlog.get_logger()

# Usage
logger.info("user.login", user_id=12345, ip="192.168.1.1")
# Output: {"event": "user.login", "user_id": 12345, "ip": "192.168.1.1",
#          "level": "info", "timestamp": "2024-01-01T00:00:00Z"}

# Bind request-scoped context
log = logger.bind(request_id="abc-123", trace_id="xyz-789")
log.info("order.created", order_id=1001)
```
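The same one-JSON-object-per-line shape can be approximated with the standard library alone, which is handy when structlog is unavailable. A minimal sketch, assuming nothing beyond stdlib `logging`; the `JsonFormatter` name and the `fields` convention are illustrative, not part of any library:

```python
import json
import logging
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line (stdlib only)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # Carry structured fields passed via logging's `extra=` mechanism
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)

logger = logging.getLogger("structured.demo")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("user.login", extra={"fields": {"user_id": 12345, "ip": "192.168.1.1"}})
```

Unlike structlog this has no processor pipeline or context binding; it only reproduces the output format.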
### Request trace middleware

**middleware/trace.py**

```python
import uuid

import structlog
from starlette.middleware.base import BaseHTTPMiddleware

logger = structlog.get_logger()

class TraceMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Reuse the caller's trace ID if present, otherwise generate one
        trace_id = request.headers.get("X-Trace-ID", str(uuid.uuid4()))
        # Bind the trace ID to the current context; contextvars are
        # task-local, so concurrent requests do not interfere
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            trace_id=trace_id,
            method=request.method,
            path=request.url.path,
        )
        logger.info("request.start")
        response = await call_next(request)
        logger.info("request.end", status=response.status_code)
        response.headers["X-Trace-ID"] = trace_id
        return response
```
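Under the hood, `structlog.contextvars` relies on the stdlib `contextvars` module, whose values are task-local under asyncio. A framework-free sketch of the same propagation (the `trace_id_var` name and `handle_request` coroutine are illustrative): each concurrent "request" sets its own trace ID, and later log lines in that task automatically pick it up.

```python
import asyncio
import contextvars
import uuid

# Task-local trace ID: each asyncio task sees its own value
trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="-")

def log(event: str) -> str:
    # Every log line automatically carries the current task's trace ID
    return f"trace_id={trace_id_var.get()} event={event}"

async def handle_request(name: str) -> list[str]:
    trace_id_var.set(str(uuid.uuid4()))  # analogous to the middleware's bind step
    lines = [log(f"{name}.start")]
    await asyncio.sleep(0)               # yield so the two tasks interleave
    lines.append(log(f"{name}.end"))
    return lines

async def main() -> list[list[str]]:
    # Two concurrent requests; their trace IDs must not leak into each other
    return await asyncio.gather(handle_request("a"), handle_request("b"))

results = asyncio.run(main())
```

This is why the middleware can bind once per request and have every subsequent log call in that request carry the trace ID, even under high concurrency.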
### Log collection SDK

**log_collector/sdk.py**

```python
from collections import deque
from threading import Event, Thread

import requests

class LogCollector:
    """Batching log-shipping SDK with a background flush thread."""

    def __init__(self, endpoint: str, batch_size: int = 50, flush_interval: float = 5):
        self.endpoint = endpoint
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        # Bounded buffer: the oldest entries are dropped under sustained backpressure
        self.buffer: deque[dict] = deque(maxlen=10000)
        self._stop = Event()
        self._thread = Thread(target=self._flush_loop, daemon=True)
        self._thread.start()

    def log(self, event: str, **kwargs):
        self.buffer.append({"event": event, **kwargs})
        if len(self.buffer) >= self.batch_size:
            self._flush()

    def _flush_loop(self):
        # Background thread: flush periodically even when traffic is low
        while not self._stop.is_set():
            self._stop.wait(self.flush_interval)
            if self.buffer:
                self._flush()

    def _flush(self):
        batch = []
        while self.buffer and len(batch) < self.batch_size:
            batch.append(self.buffer.popleft())
        if batch:
            self._send(batch)

    def _send(self, batch: list[dict]):
        try:
            requests.post(self.endpoint, json=batch, timeout=5)
        except requests.RequestException:
            # Send failed: requeue at the front, preserving original order
            self.buffer.extendleft(reversed(batch))

    def close(self):
        self._stop.set()
        self._flush()
        self._thread.join(timeout=1)
```
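The SDK's buffering and retry semantics can be exercised without a network by injecting the sender. `MiniBatcher` below is a simplified, synchronous stand-in (not part of the SDK above) whose `send` callable returns `False` on failure; it demonstrates the same size-triggered flush and the requeue-in-order behavior:

```python
from collections import deque
from typing import Callable

class MiniBatcher:
    """Simplified version of the SDK's buffering logic with an injected sender."""

    def __init__(self, send: Callable[[list[dict]], bool], batch_size: int = 3):
        self.send = send  # returns False to simulate a failed upload
        self.batch_size = batch_size
        self.buffer: deque[dict] = deque(maxlen=100)

    def log(self, event: str, **kwargs):
        self.buffer.append({"event": event, **kwargs})
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        # Drain at most one batch from the front of the buffer
        batch = [self.buffer.popleft() for _ in range(min(self.batch_size, len(self.buffer)))]
        if batch and not self.send(batch):
            # Failed: requeue at the front, preserving original order
            self.buffer.extendleft(reversed(batch))

sent: list[list[dict]] = []
batcher = MiniBatcher(lambda b: sent.append(b) or True)
for i in range(7):
    batcher.log("tick", n=i)
batcher.flush()  # 7 events arrive as batches of 3, 3, and 1

failing = MiniBatcher(lambda b: False, batch_size=2)
failing.log("a")
failing.log("b")  # triggers a flush whose send fails; both events are requeued
```

The `extendleft(reversed(batch))` idiom is the subtle part: `extendleft` appends one element at a time to the left, so without `reversed` the failed batch would be requeued backwards.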
### Common interview questions

**Q1: `logging` vs `loguru` vs `structlog`?**

Answer:

| Library | Characteristics | Best fit |
|---|---|---|
| `logging` | Standard library; verbose configuration | Simple projects |
| `loguru` | Zero configuration, concise API | Scripts, small projects |
| `structlog` | Structured logging, JSON output | Microservices, production systems |
**Q2: How should log levels be designed?**

Answer:

- DEBUG: development diagnostics; disabled in production
- INFO: key business events (user login, order creation)
- WARNING: abnormal but recoverable (retry succeeded, degradation triggered)
- ERROR: errors that need attention (API call failed, data anomaly)
- CRITICAL: severe failures (service unavailable, data loss)
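These levels map directly onto the standard `logging` module. A minimal sketch of a production-style threshold, where only WARNING and above reach the handler (the `ListHandler` here is a demo capture, not a real sink):

```python
import logging

records: list[str] = []

class ListHandler(logging.Handler):
    """Capture formatted messages in memory for the demo."""
    def emit(self, record: logging.LogRecord):
        records.append(f"{record.levelname}:{record.getMessage()}")

logger = logging.getLogger("levels.demo")
logger.addHandler(ListHandler())
logger.setLevel(logging.WARNING)  # production-style threshold

logger.debug("debug detail")       # dropped: below threshold
logger.info("user.login")          # dropped: below threshold
logger.warning("retry.succeeded")  # kept: abnormal but recovered
logger.error("api.call_failed")    # kept: needs attention
```

Lowering the level to `logging.DEBUG` in development re-enables the first two calls without touching any call site.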
**Q3: What if the log volume is too large?**

Answer:

- Sampling: sample high-frequency logs at a fixed ratio (e.g. 1%)
- Tiered retention: keep ERROR for 30 days, INFO for 7 days
- Asynchronous writes: never block the main thread
- Structured indexing: index only the key fields
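The sampling idea in the first bullet can be sketched with a stdlib `logging.Filter`. This version is counter-based (keep 1 in N) so the demo is deterministic; production code might instead use `random.random() < rate`. The `SampleFilter` and `KeepHandler` names are illustrative:

```python
import logging

class SampleFilter(logging.Filter):
    """Keep only 1 out of every N records (counter-based, deterministic)."""

    def __init__(self, n: int = 100):
        super().__init__()
        self.n = n
        self.count = 0

    def filter(self, record: logging.LogRecord) -> bool:
        self.count += 1
        return self.count % self.n == 1  # keep the 1st, (n+1)th, (2n+1)th, ...

kept: list[str] = []

class KeepHandler(logging.Handler):
    def emit(self, record: logging.LogRecord):
        kept.append(record.getMessage())

logger = logging.getLogger("sampling.demo")
logger.addHandler(KeepHandler())
logger.addFilter(SampleFilter(n=100))  # 1% sampling
logger.setLevel(logging.INFO)

for i in range(250):
    logger.info(f"hot.path {i}")  # only 3 of these 250 records survive
```

Attaching the filter to the logger (rather than a handler) drops sampled-out records before any formatting or I/O happens, which is where the cost saving comes from.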