Go 协程池

2023-03-01 15:58:16 浏览数 (1)

协程

Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。

协程池

高并发场景下,会启动大量协程进行业务处理,此时如果使用协程池可以复用对象,减少协程池内存分配的效率与创建协程池点创建开销,提高协程的执行效率。字节官方开源了gopkg库提供的 gopool 协程池实现。

协程池实现原理

线程池设计

代码语言:javascript复制
type pool struct {
   // pool 的名字,打 metrics 和打 log 时用到
   name string
   // pool 的容量,也就是最大的真正在工作的 goroutine 的数量
   // 为了性能考虑,可能会有小误差
   cap int32
   // 配置信息
   config *Config
   // 任务链表
   taskHead  *task
   taskTail  *task
   taskLock  sync.Mutex
   taskCount int32
 
   // 记录正在运行的 worker 数量
   workerCount int32
 
   // 用来标记是否关闭
   closed int32
 
   // worker panic 的时候会调用这个方法
   panicHandler func(context.Context, interface{})
 }

gopool最核心的方法

代码语言:javascript复制
func (p *pool) CtxGo(ctx context.Context, f func()) {
   // 首先是一个对象池,避免task对象反复分配
   t := taskPool.Get().(*task)
   // 拿到的对象做一些变量的设置
   t.ctx = ctx
   t.f = f
   
   // 这里的加上task锁,把生成的task绑定到task链表上
   p.taskLock.Lock()
   if p.taskHead == nil {
     p.taskHead = t
     p.taskTail = t
   } else {
     p.taskTail.next = t
     p.taskTail = t
   }
   p.taskLock.Unlock()
   // 解锁之后做一些统计信息变更
   atomic.AddInt32(&p.taskCount, 1)
   
   // 这个地方的逻辑是不是放到一开头判断一下也可以,
   // 或者上下都判断一下;
   // 这里写在下面可能考虑到上面的lock和taskpool的get可能会有一点时间的等待,
   // 所以如果我来写可能方法的入口和这里都会判断一下吧。不过这里也不是什么重点问题。

   // 如果 pool 已经被关闭了,就 panic
   if atomic.LoadInt32(&p.closed) == 1 {
     panic("use closed pool")
   }
   
   // 下面的注释作者的意图都已经说明了,
   // 解决的问题就是协程池goroutine需不需要加的问题;
   // 以及协程池是不是啥都没有的问题
 
   // 满足以下两个条件:
   // 1. task 数量大于阈值
   // 2. 目前的 worker 数量小于上限 p.cap
   // 或者目前没有 worker
   if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
     p.incWorkerCount() // 加一个工作协程
     w := workerPool.Get().(*worker) // 对象池的优化
     w.pool = p
     w.run() // 让worker跑起来。
   }
 }

worker跑起来是如何执行的

代码语言:javascript复制
 func (w *worker) run() {
   // 通过goroutine异步执行
   go func() {
     for {
       //select {
       //case <-w.stopChan:
       // w.close()
       // return
       //default:
       
       // 这里是从task池中获取一个task
       var t *task
       w.pool.taskLock.Lock()
       if w.pool.taskHead != nil {
         t = w.pool.taskHead
         w.pool.taskHead = w.pool.taskHead.next
         atomic.AddInt32(&w.pool.taskCount, -1)
       }
       if t == nil {
         // 如果没有任务要做了,就释放资源,退出
         w.close()
         w.pool.taskLock.Unlock()
         w.Recycle()
         return
       }
       w.pool.taskLock.Unlock()
       // 这里就是成功的获取一个任务,然后准备开始执行这个任务
       func() {
         // 防止任务panic做的一些打点逻辑
         defer func() {
           if r := recover(); r != nil {
             logs.CtxFatal(t.ctx, "GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
             if w.pool.config.EnablePanicMetrics {
               panicMetricsClient.EmitCounter(panicKey, 1, metrics.T{Name: "pool", Value: w.pool.name})
             }
             // 这里如果没有设置panicHandler可能会有空指针
             w.pool.panicHandler(t.ctx, r)
           }
         }()
         // 执行函数
         t.f()
       }()
       // 这个task已经做完了,回收对象
       t.Recycle()
       //}
     }
   }()
 }
  1. 来一个任务,放到任务链表中,使用锁控制保证并发安全
  2. 如果 task 数量大于阈值且当前的 worker 数量小于上限 p.cap 或者目前没有 worker,那么新建一个工作协程来执行任务。
  3. 任务执行过程中,使用 for 循环不断遍历 task 链表,如果链表不为空,则从链表中拿任务执行。链表为空,协程关闭,工作协程数减一。

0 人点赞