Redis 操作
问题
Go 中如何使用 Redis?go-redis 库的核心用法和最佳实践是什么?
答案
连接配置
import "github.com/redis/go-redis/v9"
// 单节点
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
PoolSize: 10, // 连接池大小
MinIdleConns: 5, // 最小空闲连接
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
})
// 集群
rdb := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{":7000", ":7001", ":7002"},
})
// 哨兵
rdb := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: "mymaster",
SentinelAddrs: []string{":26379"},
})
基本操作
ctx := context.Background()
// String
rdb.Set(ctx, "key", "value", time.Hour) // 设置,1小时过期
val, err := rdb.Get(ctx, "key").Result()
if errors.Is(err, redis.Nil) {
// key 不存在
}
// Hash
rdb.HSet(ctx, "user:1", "name", "Alice", "age", 25)
rdb.HGetAll(ctx, "user:1").Result()
// List
rdb.LPush(ctx, "queue", "task1", "task2")
rdb.BRPop(ctx, 0, "queue").Result() // 阻塞弹出
// Set
rdb.SAdd(ctx, "tags", "go", "redis")
rdb.SMembers(ctx, "tags").Result()
// Sorted Set
rdb.ZAdd(ctx, "rank", redis.Z{Score: 100, Member: "alice"})
rdb.ZRevRangeWithScores(ctx, "rank", 0, 9).Result() // Top 10
Pipeline 批量操作
Pipeline 将多个命令打包发送,减少网络往返:
// Pipeline(不保证原子性)
pipe := rdb.Pipeline()
incr := pipe.Incr(ctx, "counter")
pipe.Expire(ctx, "counter", time.Hour)
_, err := pipe.Exec(ctx) // 一次性发送
fmt.Println(incr.Val()) // 获取结果
// TxPipeline(MULTI/EXEC 原子执行)
pipe := rdb.TxPipeline()
pipe.Set(ctx, "key1", "val1", 0)
pipe.Set(ctx, "key2", "val2", 0)
_, err := pipe.Exec(ctx) // 在事务中执行
Lua 脚本
// 原子操作:先检查再设置
var incrIfExist = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 1 then
return redis.call("INCR", KEYS[1])
end
return 0
`)
result, err := incrIfExist.Run(ctx, rdb, []string{"counter"}).Int64()
分布式锁
// 使用 SetNX 实现简单锁
func TryLock(ctx context.Context, rdb *redis.Client, key string, ttl time.Duration) (bool, error) {
return rdb.SetNX(ctx, key, "locked", ttl).Result()
}
func Unlock(ctx context.Context, rdb *redis.Client, key string) error {
// Lua 脚本保证原子性:只删除自己设的锁
script := redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
_, err := script.Run(ctx, rdb, []string{key}, "locked").Result()
return err
}
生产环境建议
简单锁用上面的方式,高可用场景推荐 Redsync(Redlock 算法实现)。
发布订阅
// 订阅
sub := rdb.Subscribe(ctx, "chat")
ch := sub.Channel()
for msg := range ch {
fmt.Println(msg.Channel, msg.Payload)
}
// 发布
rdb.Publish(ctx, "chat", "hello")
常用模式
| 模式 | 实现方式 |
|---|---|
| 缓存 | Get → miss → 查 DB → Set |
| 计数器 | Incr、HIncrBy |
| 排行榜 | ZAdd、ZRevRange |
| 限流 | Lua 脚本 + 滑动窗口 |
| 延迟队列 | ZAdd(score=时间戳) + 轮询 ZRangeByScore |
| 分布式锁 | SetNX + TTL + Lua 释放 |
常见面试问题
Q1: Pipeline 和事务的区别?
答案:
| 维度 | Pipeline | TxPipeline |
|---|---|---|
| 命令 | 批量发送 | MULTI/EXEC 包裹 |
| 原子性 | 不保证 | 保证 |
| 性能 | 高 | 略低(事务开销) |
| 使用场景 | 批量读写 | 需要原子操作 |
Q2: Redis 连接池怎么配置?
答案:
&redis.Options{
PoolSize: runtime.GOMAXPROCS(0) * 10, // 每个 CPU 10 个
MinIdleConns: 5, // 最小空闲
MaxIdleTime: 5 * time.Minute, // 空闲超时
}
关键是 PoolSize 不宜过大,超过 Redis 处理能力反而增加延迟。
Q3: 如何防止缓存穿透?
答案:
- 空值缓存:查 DB 不存在时缓存空值,短 TTL
- 布隆过滤器:在缓存前加一层判断 key 是否可能存在
- singleflight:并发请求同一 key 只查一次 DB