并发模式
问题
Go 有哪些经典的并发模式?如何选择合适的并发模式?
答案
1. Pipeline 管道模式
将数据处理拆分为多个阶段,每个阶段用 goroutine 处理,通过 Channel 串联:
// 阶段 1:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// 阶段 2:平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
// 阶段 3:过滤偶数
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
// 组装管道
func main() {
ch := generate(1, 2, 3, 4, 5)
ch = square(ch)
ch = filterEven(ch)
for v := range ch {
fmt.Println(v) // 4, 16
}
}
2. Fan-out / Fan-in(扇出/扇入)
Fan-out:多个 goroutine 从同一个 Channel 读取(并行处理)。
Fan-in:多个 Channel 的结果合并到一个 Channel。
// Fan-in:合并多个 Channel
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
// 使用
func main() {
in := generate(1, 2, 3, 4, 5, 6, 7, 8)
// Fan-out:启动 3 个 worker 并行处理
c1 := square(in) // 3 个 goroutine 竞争读取 in
c2 := square(in)
c3 := square(in)
// Fan-in:合并结果
for v := range merge(c1, c2, c3) {
fmt.Println(v)
}
}
3. Worker Pool 工作池
固定数量的 goroutine 处理任务队列:
func workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
// 处理任务
result := process(job)
results <- result
}
}(i)
}
// 所有 worker 完成后关闭结果 Channel
go func() {
wg.Wait()
close(results)
}()
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动 5 个 worker
workerPool(5, jobs, results)
// 发送任务
go func() {
for i := 0; i < 50; i++ {
jobs <- i
}
close(jobs)
}()
// 收集结果
for r := range results {
fmt.Println(r)
}
}
4. Semaphore 信号量
用带缓冲的 Channel 控制并发数:
func main() {
urls := []string{/* ... 100 个 URL ... */}
sem := make(chan struct{}, 10) // 最多 10 个并发
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
sem <- struct{}{} // 获取信号量(满了就阻塞)
go func(u string) {
defer wg.Done()
defer func() { <-sem }() // 释放信号量
fetch(u)
}(url)
}
wg.Wait()
}
也可以用 golang.org/x/sync/semaphore 包:
import "golang.org/x/sync/semaphore"
var sem = semaphore.NewWeighted(10)
func worker(ctx context.Context) error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
return doWork()
}
5. errgroup——带错误收集的并发
import "golang.org/x/sync/errgroup"
func fetchAll(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url // 变量捕获(Go 1.22 前需要)
g.Go(func() error {
resp, err := fetch(ctx, url)
if err != nil {
return err // 任一失败,ctx 自动取消
}
results[i] = resp
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err // 返回第一个错误
}
return results, nil
}
errgroup 可以配合 SetLimit 控制并发数(Go 1.20+):
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // 最多 10 个并发 goroutine
errgroup 是最常用的并发模式
实际项目中,errgroup 覆盖了大多数并发场景:并行执行 + 错误收集 + Context 取消 + 并发限制。推荐优先使用。
6. Or-Done Channel
从可能未关闭的 Channel 读取,同时监听 Context 取消:
func orDone(ctx context.Context, c <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case v, ok := <-c:
if !ok {
return
}
select {
case out <- v:
case <-ctx.Done():
return
}
}
}
}()
return out
}
7. Tee Channel
将一个 Channel 复制为两个(广播):
func tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) {
out1, out2 := make(chan int), make(chan int)
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(ctx, in) {
// 用局部变量避免重复发送
o1, o2 := out1, out2
for i := 0; i < 2; i++ {
select {
case o1 <- val:
o1 = nil // 已发送,置 nil 防止重复
case o2 <- val:
o2 = nil
}
}
}
}()
return out1, out2
}
模式选型
| 场景 | 推荐模式 |
|---|---|
| 多阶段数据处理 | Pipeline |
| 并行处理 + 合并结果 | Fan-out / Fan-in |
| 固定并发数处理大量任务 | Worker Pool |
| 限制并发数 | Semaphore |
| 并行 + 错误收集 | errgroup ⭐ |
| 超时取消 | Context + select |
常见面试问题
Q1: Worker Pool 的 worker 数量怎么确定?
答案:
- CPU 密集型:worker 数 = CPU 核心数(
runtime.NumCPU()) - IO 密集型:worker 数 > CPU 核心数,具体取决于 IO 耗时。通常
核心数 × 2 ~ 核心数 × 10 - 最佳实践:通过压测确定最优值
Q2: Go 和 Java 的线程池有什么区别?
答案:
| 对比 | Go Worker Pool | Java ThreadPoolExecutor |
|---|---|---|
| 实体 | goroutine(用户态) | OS 线程(内核态) |
| 创建成本 | 极低(~2KB 栈) | 高(~1MB 栈) |
| 调度 | Go 运行时调度 | OS 调度 |
| 是否需要池化 | 不是必须,但有用 | 必须(线程太贵) |
| 标准库支持 | 无内置,用 Channel + WaitGroup | java.util.concurrent |
Go 的 goroutine 本身就很廉价,Worker Pool 的主要目的不是复用 goroutine,而是控制并发数。
Q3: Pipeline 模式的关键注意事项?
答案:
- 每个阶段必须 close 输出 Channel——否则下游
range永远不会结束 - 注意 goroutine 泄漏——如果下游不再消费,上游阻塞在 send 上。需要用 Context 控制取消
- 错误处理——Pipeline 中某个阶段出错,需要通知其他阶段停止
Q4: errgroup 和 WaitGroup 怎么选?
答案:
- 不需要错误处理:
sync.WaitGroup - 需要错误收集:
errgroup(返回第一个错误,自动取消 Context) - 需要收集所有错误:自定义,或用第三方库(如
go.uber.org/multierr)