跳到主要内容

实现 Goroutine 池

问题

如何用 Go 实现一个 goroutine 池?为什么需要控制 goroutine 数量?

答案

为什么需要 Goroutine 池

虽然 goroutine 轻量(~2KB 栈),但无限创建仍有问题:

  • 调度开销随数量增长
  • 内存占用不可控
  • 文件描述符/连接数可能耗尽

基础实现:Channel + Worker

type Pool struct {
taskQueue chan func()
wg sync.WaitGroup
}

func NewPool(workerCount int) *Pool {
p := &Pool{
taskQueue: make(chan func(), workerCount*2), // 缓冲队列
}

// 启动固定数量的 worker
for i := 0; i < workerCount; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for task := range p.taskQueue {
task()
}
}()
}
return p
}

// 提交任务
func (p *Pool) Submit(task func()) {
p.taskQueue <- task
}

// 关闭池:等待所有任务完成
func (p *Pool) Shutdown() {
close(p.taskQueue)
p.wg.Wait()
}

// 使用
func main() {
pool := NewPool(10)
defer pool.Shutdown()

for i := 0; i < 100; i++ {
i := i
pool.Submit(func() {
fmt.Printf("处理任务 %d\n", i)
time.Sleep(100 * time.Millisecond)
})
}
}

进阶:带超时和错误收集

type TaskPool struct {
taskQueue chan func() error
results chan error
wg sync.WaitGroup
}

func NewTaskPool(workers, queueSize int) *TaskPool {
p := &TaskPool{
taskQueue: make(chan func() error, queueSize),
results: make(chan error, queueSize),
}

for i := 0; i < workers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for task := range p.taskQueue {
if err := task(); err != nil {
p.results <- err
}
}
}()
}
return p
}

// 带超时的提交
func (p *TaskPool) SubmitWithTimeout(task func() error, timeout time.Duration) error {
select {
case p.taskQueue <- task:
return nil
case <-time.After(timeout):
return fmt.Errorf("submit timeout")
}
}

func (p *TaskPool) Shutdown() {
close(p.taskQueue)
p.wg.Wait()
close(p.results)
}

func (p *TaskPool) Errors() <-chan error {
return p.results
}

使用 semaphore 控制并发

import "golang.org/x/sync/semaphore"

func ProcessWithSemaphore(items []string, maxConcurrency int64) {
sem := semaphore.NewWeighted(maxConcurrency)
var wg sync.WaitGroup

for _, item := range items {
wg.Add(1)
sem.Acquire(context.Background(), 1)
go func(item string) {
defer wg.Done()
defer sem.Release(1)
process(item)
}(item)
}

wg.Wait()
}

使用 errgroup

import "golang.org/x/sync/errgroup"

func ProcessWithErrgroup(items []string, maxConcurrency int) error {
g, _ := errgroup.WithContext(context.Background())
g.SetLimit(maxConcurrency)

for _, item := range items {
item := item
g.Go(func() error {
return process(item)
})
}

return g.Wait() // 返回第一个非 nil 错误
}

三方库 ants

import "github.com/panjf2000/ants/v2"

func UseAnts() {
pool, _ := ants.NewPoolWithFunc(10, func(payload interface{}) {
n := payload.(int)
fmt.Printf("处理: %d\n", n)
})
defer pool.Release()

for i := 0; i < 100; i++ {
pool.Invoke(i)
}
}

方案对比

方案复杂度特点
Channel + Worker最基础,适合简单场景
semaphore控制并发数,不复用 goroutine
errgroup自带错误处理和限流
ants功能最全,goroutine 复用
实际项目推荐

简单并发控制用 errgroup.SetLimit(),复杂场景用 ants。手写 Pool 主要用于面试和理解原理。


常见面试问题

Q1: goroutine 泄漏怎么排查?

答案

  • runtime.NumGoroutine() 监控 goroutine 数量
  • pprof 的 goroutine profile 查看阻塞位置
  • 常见原因:channel 无消费者、忘记关闭 channel、Context 未取消

Q2: goroutine 池和直接创建 goroutine 的区别?

答案:池复用 goroutine,减少创建销毁开销,控制最大并发数。直接创建更简单但可能导致资源耗尽。Go 的 goroutine 足够轻量,中小规模直接创建即可,只有大量任务时才需要池化。

相关链接