跳到主要内容

数据同步方案设计

问题

如何设计一套可靠的数据同步系统,实现 MySQL 到 Elasticsearch / Redis / 数据仓库等异构数据源的实时同步?

答案

同步场景

场景说明
DB → ES搜索引擎索引同步
DB → Redis缓存预热与更新
DB → DB分库分表数据迁移
DB → 数据仓库离线/实时分析
DB → MQ事件驱动解耦

方案对比

方案实时性侵入性可靠性复杂度
双写实时高(业务耦合)低(易不一致)
MQ 异步秒级
Binlog 订阅 (CDC)秒级无侵入中高
定时全量/增量同步分钟~小时级
推荐方案

生产环境首选 Binlog CDC,对业务代码零侵入。Canal(阿里开源)和 Debezium 是最主流的两个工具。

Binlog CDC 原理

Canal 核心原理:

  1. 伪装为 MySQL Slave,向 Master 发送 dump 协议
  2. Master 推送 Binlog 事件流
  3. Canal 解析 Binlog(ROW 模式),提取变更数据
  4. 将变更事件投递到 MQ / 直接写入目标

Canal 使用示例

Canal 客户端消费 Binlog 事件
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("canal-server", 11111), "example", "", "");

connector.connect();
connector.subscribe("mydb\\.order.*"); // 订阅 mydb 的 order 相关表

while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();

if (batchId != -1) {
for (Entry entry : message.getEntries()) {
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();

for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.INSERT) {
syncToES(rowData.getAfterColumnsList());
} else if (eventType == EventType.UPDATE) {
updateES(rowData.getAfterColumnsList());
} else if (eventType == EventType.DELETE) {
deleteFromES(rowData.getBeforeColumnsList());
}
}
}
}
connector.ack(batchId); // 确认消费
}
}

Debezium 方案

Debezium MySQL Connector 配置
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1001",
"topic.prefix": "dbserver1",
"database.include.list": "mydb",
"table.include.list": "mydb.orders,mydb.users",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.mydb"
}
}
Canal vs Debezium
对比CanalDebezium
厂商阿里巴巴Red Hat
支持数据库MySQLMySQL/PostgreSQL/MongoDB/Oracle
传输自带 Client / MQKafka Connect
国内生态成熟较少
Schema 追踪不支持支持

数据一致性保障

策略说明
ACK 机制消费成功后才确认,失败重试
幂等处理基于主键做 upsert,重复消费不产生副作用
定时对账定期全量/增量对比,发现不一致则修复
监控告警同步延迟超阈值报警

常见面试问题

Q1: 为什么不用双写?

答案

双写(业务代码同时写 DB 和 ES)存在严重问题:

  1. 一致性难保证:DB 成功但 ES 失败,数据不一致
  2. 业务耦合高:每个写操作都要加同步逻辑
  3. 性能下降:写操作链路变长,RT 增加

Q2: Binlog 同步延迟怎么处理?

答案

  • 正常延迟在毫秒~秒级
  • 延迟过高时检查:从库同步延迟、Canal 消费积压、目标写入瓶颈
  • 降级策略:同步延迟时查主库兜底

Q3: 同步过程中目标库宕机怎么办?

答案

  • MQ 缓冲:Canal → MQ → 目标,MQ 持久化保障不丢
  • 位点管理:Canal 记录消费位点(Binlog position / GTID),恢复后从断点续传
  • 死信队列:多次重试失败进入死信,人工排查后重新投递

Q4: 如何做数据迁移的全量 + 增量?

答案

  1. 记录当前 Binlog 位点
  2. 全量导出存量数据(mysqldump / SELECT 分批)写入目标
  3. 从记录的位点开始增量订阅 Binlog
  4. 增量追上实时后,启动对账校验
  5. 校验通过,切换读流量到新库

相关链接