跳到主要内容

设计定时任务系统

问题

如何用 Go 实现定时任务?如何在分布式环境下保证定时任务只执行一次?

答案

单机定时任务

1. time.Ticker(最基础)

func StartTicker() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for range ticker.C {
doTask()
}
}

2. robfig/cron(Cron 表达式)

import "github.com/robfig/cron/v3"

func StartCron() {
c := cron.New(cron.WithSeconds()) // 支持秒级

// 每 30 秒执行
c.AddFunc("*/30 * * * * *", func() {
log.Println("每30秒执行")
})

// 每天凌晨 2 点执行
c.AddFunc("0 0 2 * * *", func() {
log.Println("每日 2:00 执行")
})

c.Start()
defer c.Stop()

select {} // 阻塞主 goroutine
}

3. 时间轮(高效延迟任务)

// 简化版时间轮:O(1) 添加和触发
type TimeWheel struct {
interval time.Duration
slots int
current int
buckets [][]func()
ticker *time.Ticker
}

func NewTimeWheel(interval time.Duration, slots int) *TimeWheel {
tw := &TimeWheel{
interval: interval,
slots: slots,
buckets: make([][]func(), slots),
}
for i := range tw.buckets {
tw.buckets[i] = []func(){}
}
return tw
}

// 添加延迟任务
func (tw *TimeWheel) AddTask(delay time.Duration, task func()) {
steps := int(delay / tw.interval)
pos := (tw.current + steps) % tw.slots
tw.buckets[pos] = append(tw.buckets[pos], task)
}

func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.interval)
go func() {
for range tw.ticker.C {
tw.current = (tw.current + 1) % tw.slots
// 执行当前槽的所有任务
tasks := tw.buckets[tw.current]
tw.buckets[tw.current] = nil
for _, task := range tasks {
go task()
}
}
}()
}

分布式定时任务

核心问题:多实例部署时,同一任务只能执行一次。

方案一:分布式锁抢占

func DistributedCronTask(rdb *redis.Client) {
c := cron.New()
c.AddFunc("0 */5 * * *", func() {
lockKey := "cron:report:daily"
// 抢锁(TTL = 任务最大执行时间)
ok, _ := rdb.SetNX(context.Background(), lockKey, "node-1", 5*time.Minute).Result()
if !ok {
log.Println("其他节点已在执行,跳过")
return
}
defer rdb.Del(context.Background(), lockKey)

// 执行任务
generateDailyReport()
})
c.Start()
}

方案二:Leader 选举(etcd)

func LeaderElection(cli *clientv3.Client) {
session, _ := concurrency.NewSession(cli, concurrency.WithTTL(10))
defer session.Close()

election := concurrency.NewElection(session, "/election/cron-leader")

// 竞选 Leader(阻塞直到成为 Leader)
if err := election.Campaign(context.Background(), "node-1"); err != nil {
log.Fatal(err)
}
log.Println("成为 Leader,开始执行定时任务")

// Leader 运行 cron
c := cron.New()
c.AddFunc("@every 1m", doTask)
c.Start()

// 监听 session 过期
select {
case <-session.Done():
log.Println("Leader 身份丢失")
c.Stop()
}
}

方案对比

方案适用场景优点缺点
time.Ticker简单定时零依赖不支持 Cron 表达式
robfig/cron单机 Cron功能完善多实例重复执行
Redis 锁简单分布式实现简单可靠性一般
etcd Leader强一致自动主备切换引入 etcd
时间轮大量延迟任务O(1) 添加精度受限

常见面试问题

Q1: 时间轮和 Timer 堆有什么区别?

答案

  • Timer 堆(Go 标准库):基于四叉堆,添加 O(logn)O(\log n),触发 O(logn)O(\log n)
  • 时间轮:添加 O(1)O(1),触发 O(1)O(1),适合大量延迟任务
  • Go 1.14+ 的 Timer 已优化为 per-P 堆,性能很好,通常不需要自己实现时间轮

Q2: 分布式环境下如何保证任务不丢失?

答案

  • 任务持久化到数据库/Redis
  • 任务执行前标记为"执行中",完成后标记"已完成"
  • 超时未完成的任务由其他节点接管
  • 关键任务加告警监控

相关链接