数据管道问题排查
问题
Python 数据处理管道中常见的性能和准确性问题有哪些?如何排查和优化?
答案
常见问题
大数据内存优化
pipeline/memory.py
import pandas as pd
# ❌ 一次加载全量(内存爆炸)
df = pd.read_csv("huge_file.csv") # 10GB 文件直接 OOM
# ✅ 分块读取
def process_in_chunks(filepath: str, chunk_size: int = 10_000):
results = []
for chunk in pd.read_csv(filepath, chunksize=chunk_size):
# 每个 chunk 是 DataFrame
processed = transform(chunk)
results.append(processed)
return pd.concat(results, ignore_index=True)
# ✅ 指定数据类型减少内存
dtypes = {
"user_id": "int32", # 默认 int64,节省 50%
"amount": "float32",
"status": "category", # 枚举值用 category
}
df = pd.read_csv("data.csv", dtype=dtypes)
# ✅ 用 Polars 替代 Pandas(更快更省内存)
import polars as pl
df = pl.scan_csv("huge_file.csv") # 延迟加载
result = (
df.filter(pl.col("status") == "active")
.group_by("category")
.agg(pl.col("amount").sum())
.collect() # 最终执行
)
数据质量检查
pipeline/quality.py
import pandas as pd
from dataclasses import dataclass
@dataclass
class QualityReport:
total_rows: int
null_counts: dict
duplicate_count: int
anomalies: list[str]
def check_quality(df: pd.DataFrame) -> QualityReport:
anomalies = []
# 空值检查
null_counts = df.isnull().sum().to_dict()
for col, count in null_counts.items():
if count > len(df) * 0.1: # 空值超过 10%
anomalies.append(f"列 {col} 空值率 {count/len(df):.1%}")
# 重复行检查
dup_count = df.duplicated().sum()
if dup_count > 0:
anomalies.append(f"发现 {dup_count} 行重复数据")
# 异常值检查(IQR 方法)
for col in df.select_dtypes(include="number"):
q1, q3 = df[col].quantile([0.25, 0.75])
iqr = q3 - q1
outliers = ((df[col] < q1 - 1.5 * iqr) | (df[col] > q3 + 1.5 * iqr)).sum()
if outliers > 0:
anomalies.append(f"列 {col} 有 {outliers} 个异常值")
return QualityReport(
total_rows=len(df),
null_counts=null_counts,
duplicate_count=dup_count,
anomalies=anomalies,
)
幂等性保证
pipeline/idempotent.py
import hashlib
def idempotent_load(df: pd.DataFrame, table: str, engine):
"""幂等写入:按日期分区覆盖"""
date = df["date"].iloc[0]
# 先删除该分区
engine.execute(f"DELETE FROM {table} WHERE date = :date", {"date": date})
# 再插入
df.to_sql(table, engine, if_exists="append", index=False)
def dedup_by_key(df: pd.DataFrame, keys: list[str]) -> pd.DataFrame:
"""按业务主键去重(保留最新)"""
return df.sort_values("updated_at").drop_duplicates(subset=keys, keep="last")
常见面试问题
Q1: Pandas vs Polars vs Dask?
答案:
| 库 | 数据量 | 特点 |
|---|---|---|
| Pandas | <1GB | 最通用,生态最好 |
| Polars | <100GB | 快、延迟执行、Rust 实现 |
| Dask | >100GB | 分布式 Pandas,集群计算 |
| PySpark | TB级 | Spark 生态,集群必选 |
Q2: 管道失败如何重试?
答案:
- 幂等设计:重跑不会产生重复数据
- 检查点:每一步完成后记录状态,失败从断点恢复
- Airflow 重试:
retries=3, retry_delay=timedelta(minutes=5)
Q3: 如何监控数据管道?
答案:
- 处理行数、成功率指标(Prometheus)
- 执行时间趋势(Grafana 看板)
- 数据质量报告(自动邮件)
- 异常告警(Slack/钉钉通知)