跳到主要内容

并发模式

问题

Go 有哪些经典的并发模式?如何选择合适的并发模式?

答案

1. Pipeline 管道模式

将数据处理拆分为多个阶段,每个阶段用 goroutine 处理,通过 Channel 串联:

// 阶段 1:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}

// 阶段 2:平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}

// 阶段 3:过滤偶数
func filterEven(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}

// 组装管道
func main() {
ch := generate(1, 2, 3, 4, 5)
ch = square(ch)
ch = filterEven(ch)

for v := range ch {
fmt.Println(v) // 4, 16
}
}

2. Fan-out / Fan-in(扇出/扇入)

Fan-out:多个 goroutine 从同一个 Channel 读取(并行处理)。
Fan-in:多个 Channel 的结果合并到一个 Channel。

// Fan-in:合并多个 Channel
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)

for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
merged <- v
}
}(ch)
}

go func() {
wg.Wait()
close(merged)
}()

return merged
}

// 使用
func main() {
in := generate(1, 2, 3, 4, 5, 6, 7, 8)

// Fan-out:启动 3 个 worker 并行处理
c1 := square(in) // 3 个 goroutine 竞争读取 in
c2 := square(in)
c3 := square(in)

// Fan-in:合并结果
for v := range merge(c1, c2, c3) {
fmt.Println(v)
}
}

3. Worker Pool 工作池

固定数量的 goroutine 处理任务队列:

func workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
var wg sync.WaitGroup

for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
// 处理任务
result := process(job)
results <- result
}
}(i)
}

// 所有 worker 完成后关闭结果 Channel
go func() {
wg.Wait()
close(results)
}()
}

func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)

// 启动 5 个 worker
workerPool(5, jobs, results)

// 发送任务
go func() {
for i := 0; i < 50; i++ {
jobs <- i
}
close(jobs)
}()

// 收集结果
for r := range results {
fmt.Println(r)
}
}

4. Semaphore 信号量

用带缓冲的 Channel 控制并发数:

func main() {
urls := []string{/* ... 100 个 URL ... */}
sem := make(chan struct{}, 10) // 最多 10 个并发
var wg sync.WaitGroup

for _, url := range urls {
wg.Add(1)
sem <- struct{}{} // 获取信号量(满了就阻塞)

go func(u string) {
defer wg.Done()
defer func() { <-sem }() // 释放信号量

fetch(u)
}(url)
}

wg.Wait()
}

也可以用 golang.org/x/sync/semaphore 包:

import "golang.org/x/sync/semaphore"

var sem = semaphore.NewWeighted(10)

func worker(ctx context.Context) error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
return doWork()
}

5. errgroup——带错误收集的并发

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))

for i, url := range urls {
i, url := i, url // 变量捕获(Go 1.22 前需要)
g.Go(func() error {
resp, err := fetch(ctx, url)
if err != nil {
return err // 任一失败,ctx 自动取消
}
results[i] = resp
return nil
})
}

if err := g.Wait(); err != nil {
return nil, err // 返回第一个错误
}
return results, nil
}

errgroup 可以配合 SetLimit 控制并发数(Go 1.20+):

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // 最多 10 个并发 goroutine
errgroup 是最常用的并发模式

实际项目中,errgroup 覆盖了大多数并发场景:并行执行 + 错误收集 + Context 取消 + 并发限制。推荐优先使用。

6. Or-Done Channel

从可能未关闭的 Channel 读取,同时监听 Context 取消:

func orDone(ctx context.Context, c <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case v, ok := <-c:
if !ok {
return
}
select {
case out <- v:
case <-ctx.Done():
return
}
}
}
}()
return out
}

7. Tee Channel

将一个 Channel 复制为两个(广播):

func tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) {
out1, out2 := make(chan int), make(chan int)
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(ctx, in) {
// 用局部变量避免重复发送
o1, o2 := out1, out2
for i := 0; i < 2; i++ {
select {
case o1 <- val:
o1 = nil // 已发送,置 nil 防止重复
case o2 <- val:
o2 = nil
}
}
}
}()
return out1, out2
}

模式选型

场景推荐模式
多阶段数据处理Pipeline
并行处理 + 合并结果Fan-out / Fan-in
固定并发数处理大量任务Worker Pool
限制并发数Semaphore
并行 + 错误收集errgroup
超时取消Context + select

常见面试问题

Q1: Worker Pool 的 worker 数量怎么确定?

答案

  • CPU 密集型:worker 数 = CPU 核心数(runtime.NumCPU()
  • IO 密集型:worker 数 > CPU 核心数,具体取决于 IO 耗时。通常 核心数 × 2 ~ 核心数 × 10
  • 最佳实践:通过压测确定最优值

Q2: Go 和 Java 的线程池有什么区别?

答案

对比Go Worker PoolJava ThreadPoolExecutor
实体goroutine(用户态)OS 线程(内核态)
创建成本极低(~2KB 栈)高(~1MB 栈)
调度Go 运行时调度OS 调度
是否需要池化不是必须,但有用必须(线程太贵)
标准库支持无内置,用 Channel + WaitGroupjava.util.concurrent

Go 的 goroutine 本身就很廉价,Worker Pool 的主要目的不是复用 goroutine,而是控制并发数

Q3: Pipeline 模式的关键注意事项?

答案

  1. 每个阶段必须 close 输出 Channel——否则下游 range 永远不会结束
  2. 注意 goroutine 泄漏——如果下游不再消费,上游阻塞在 send 上。需要用 Context 控制取消
  3. 错误处理——Pipeline 中某个阶段出错,需要通知其他阶段停止

Q4: errgroup 和 WaitGroup 怎么选?

答案

  • 不需要错误处理sync.WaitGroup
  • 需要错误收集errgroup(返回第一个错误,自动取消 Context)
  • 需要收集所有错误:自定义,或用第三方库(如 go.uber.org/multierr

相关链接