设计定时任务系统
问题
如何用 Go 实现定时任务?如何在分布式环境下保证定时任务只执行一次?
答案
单机定时任务
1. time.Ticker(最基础)
func StartTicker() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
doTask()
}
}
2. robfig/cron(Cron 表达式)
import "github.com/robfig/cron/v3"
func StartCron() {
c := cron.New(cron.WithSeconds()) // 支持秒级
// 每 30 秒执行
c.AddFunc("*/30 * * * * *", func() {
log.Println("每30秒执行")
})
// 每天凌晨 2 点执行
c.AddFunc("0 0 2 * * *", func() {
log.Println("每日 2:00 执行")
})
c.Start()
defer c.Stop()
select {} // 阻塞主 goroutine
}
3. 时间轮(高效延迟任务)
// 简化版时间轮:O(1) 添加和触发
type TimeWheel struct {
interval time.Duration
slots int
current int
buckets [][]func()
ticker *time.Ticker
}
func NewTimeWheel(interval time.Duration, slots int) *TimeWheel {
tw := &TimeWheel{
interval: interval,
slots: slots,
buckets: make([][]func(), slots),
}
for i := range tw.buckets {
tw.buckets[i] = []func(){}
}
return tw
}
// 添加延迟任务
func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
steps := int(delay / tw.interval)
pos := (tw.current + steps) % tw.slots
tw.buckets[pos] = append(tw.buckets[pos], task)
}
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.interval)
go func() {
for range tw.ticker.C {
tw.current = (tw.current + 1) % tw.slots
// 执行当前槽的所有任务
tasks := tw.buckets[tw.current]
tw.buckets[tw.current] = nil
for _, task := range tasks {
go task()
}
}
}()
}
分布式定时任务
核心问题:多实例部署时,同一任务只能执行一次。
方案一:分布式锁抢占
func DistributedCronTask(rdb *redis.Client) {
c := cron.New()
c.AddFunc("0 */5 * * *", func() {
lockKey := "cron:report:daily"
// 抢锁(TTL = 任务最大执行时间)
ok, _ := rdb.SetNX(context.Background(), lockKey, "node-1", 5*time.Minute).Result()
if !ok {
log.Println("其他节点已在执行,跳过")
return
}
defer rdb.Del(context.Background(), lockKey)
// 执行任务
generateDailyReport()
})
c.Start()
}
方案二:Leader 选举(etcd)
func LeaderElection(cli *clientv3.Client) {
session, _ := concurrency.NewSession(cli, concurrency.WithTTL(10))
defer session.Close()
election := concurrency.NewElection(session, "/election/cron-leader")
// 竞选 Leader(阻塞直到成为 Leader)
if err := election.Campaign(context.Background(), "node-1"); err != nil {
log.Fatal(err)
}
log.Println("成为 Leader,开始执行定时任务")
// Leader 运行 cron
c := cron.New()
c.AddFunc("@every 1m", doTask)
c.Start()
// 监听 session 过期
select {
case <-session.Done():
log.Println("Leader 身份丢失")
c.Stop()
}
}
方案对比
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| time.Ticker | 简单定时 | 零依赖 | 不支持 Cron 表达式 |
| robfig/cron | 单机 Cron | 功能完善 | 多实例重复执行 |
| Redis 锁 | 简单分布式 | 实现简单 | 可靠性一般 |
| etcd Leader | 强一致 | 自动主备切换 | 引入 etcd |
| 时间轮 | 大量延迟任务 | O(1) 添加 | 精度受限 |
常见面试问题
Q1: 时间轮和 Timer 堆有什么区别?
答案:
- Timer 堆(Go 标准库):基于四叉堆,添加 ,触发
- 时间轮:添加 ,触发 ,适合大量延迟任务
- Go 1.14+ 的 Timer 已优化为 per-P 堆,性能很好,通常不需要自己实现时间轮
Q2: 分布式环境下如何保证任务不丢失?
答案:
- 任务持久化到数据库/Redis
- 任务执行前标记为"执行中",完成后标记"已完成"
- 超时未完成的任务由其他节点接管
- 关键任务加告警监控