Go语言协程池实现第二弹

2023-09-10 09:43:15 浏览数 (2)

之前写了Go语言协程池的实践以及动态QPS的实现,本来计划就是开始做一些测试了,但是发现协程池的实现有些问题也有一些BUG,所以连夜修改了部分功能。

为了不咋不明真相的读者造成困扰,赶紧写篇文章报告一下。

缺陷&BUG

这里先把测试中遇到的问题和BUG梳理一下:

  1. 活跃协程数计算错误
  2. 执行数量和收到计数错误
  3. QPS陡增和陡降的时候,无法及时增加压力和回收协程
  4. 协程回收存在问题不够优雅,效率太低

BUG分析

活跃协程数

这里计数错误的原因是因为在原来的实现中多次使用了ReduceWorker()AddWorker()方法,导致没有将添加和减少的功能收拢到某一个时机统一处理,有两处重复使用的问题,导致Active计算错误。

执行数量和收到计数错误

这里问题出现在这个方法,本意是设计一个执行固定次数任务的方法。通过将一部分任务合并到同一个任务丢给协程池。但是在实现过程中并没有将原始的任务单独计数。同时在记录到所有收到的任务ReceiveTotal中也存在这个问题。

代码语言:javascript复制
// ExecuteQps
//  @Description: 执行任务固定次数
//  @receiver pool
//  @param t
//  @param qps
//
func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
 mutiple := qps / pool.SingleTimes
 remainder := qps % pool.SingleTimes
 for i := 0; i < pool.SingleTimes; i   {
  pool.Execute(func() {
   for i := 0; i < mutiple; i   {
    t()
   }
  })
 }
 pool.Execute(func() {
  for i := 0; i < remainder; i   {
   t()
  }
 })
}

新建回收协程不及时

在QPS陡增陡降的场景测试中,存在2个,需要增加协程数的时候增加较慢,因为每1秒扫描一次的等待channel,增多增加1个,同样的也减少1个。这个效率非常低,设计太蠢了。

线程回收不优雅

在旧实现中用的是扫描等待channel判断是否增加还是减少协程。除了效率低以外,还存在无法及时回收和过度回收(正在运行的协程也会收到一个终止信号)。

旧的方法内容如下(现在看简直不忍直视):

代码语言:javascript复制

// balance  
//  @Description: 平衡活跃协程数  
//  @receiver pool  
//  
func (pool *GorotinesPool) balance() {  
   if pool.status {  
      if len(pool.tasks) > 0 && pool.active < int32(pool.Max) {  
         pool.AddWorker()  
      }  
      if len(pool.tasks) == 0 && pool.active > int32(pool.Min) {  
         pool.ReduceWorker()  
      }  
   }  
}

解决方法

重写回收方法

这里参照了Java线程池的实现,通过设置一个最大空闲时间,通过这个设置来回收协程,这样即方便又能照顾到协程运行状态,避免过度回收。这里改造了一下worker()方法。

实现内容如下:

代码语言:javascript复制
  
// worker  
//  @Description: 开始执行协程  
//  @receiver pool  
//  
func (pool *GorotinesPool) worker() {  
   defer func() {  
      if p := recover(); p != nil {  
         log.Printf("execute task fail: %v", p)  
      }  
   }()  
Fun:  
   for {  
      select {  
      case t := <-pool.tasks:  
         atomic.AddInt32(&pool.ExecuteTotal, 1)  
         t()  
      case <-time.After(pool.MaxIdle):  
         if pool.Active > int32(pool.Min) {  
            atomic.AddInt32(&pool.Active, -1)  
            break Fun  
         }  
      }  
   }  
}

这里顺道解决了协程回收效率不高的问题。

协程增加效率

这里做了2项修改:

  1. 增加扫描后单次增加协程数量,增加等待channel数量等量的协程数
  2. 调整func (pool *GorotinesPool) Execute(t func()) error合并任务的算法,采取固定的数量合并方案

增加协程数量:

代码语言:javascript复制
// balance  
//  @Description: 平衡活跃协程数  
//  @receiver pool  
//  
func (pool *GorotinesPool) balance() {  
   if pool.status {  
      if len(pool.tasks) > 0 && pool.Active < int32(pool.Max) {  
         for i := 0; i < len(pool.tasks); i   {  
            if int(pool.Active) < pool.Max {  
               pool.AddWorker()  
            }  
         }  
      }  
   }  
}

调整合并任务方案:

代码语言:javascript复制
// ExecuteQps
//  @Description: 执行任务固定次数
//  @receiver pool
//  @param t
//  @param qps
//
func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
 mutiple := qps / pool.SingleTimes
 remainder := qps % pool.SingleTimes
 for i := 0; i < mutiple; i   {
  pool.Execute(func() {
   atomic.AddInt32(&pool.ExecuteTotal, -1)
   for i := 0; i < pool.SingleTimes; i   {
    atomic.AddInt32(&pool.ExecuteTotal, 1)
    t()
   }
  })
 }
 pool.Execute(func() {
  atomic.AddInt32(&pool.ExecuteTotal, -1)
  for i := 0; i < remainder; i   {
   atomic.AddInt32(&pool.ExecuteTotal, 1)
   t()
  }
 })
}

计数不准确

这个纯属BUG,改掉计数错误的地方,修复BUG,代码已经在上述修复代码中有所体现了。

完整代码

代码语言:javascript复制
package execute  
  
import (  
   "errors"  
   "funtester/ftool"   "log"   "sync/atomic"   "time")  
  
type GorotinesPool struct {  
   Max          int  
   Min          int  
   tasks        chan func()  
   status       bool  
   Active       int32  
   ExecuteTotal int32  
   SingleTimes  int  
   addTimeout   time.Duration  
   MaxIdle      time.Duration  
}  
  
type taskType int  
  
const (  
   normal taskType = 0  
   reduce taskType = 1  
)  
  
// GetPool  
//  @Description: 创建线程池  
//  @param max 最大协程数  
//  @param min 最小协程数  
//  @param maxWaitTask 最大任务等待长度  
//  @param timeout 添加任务超时时间,单位s  
//  @return *GorotinesPool  
//  
func GetPool(max, min, maxWaitTask, timeout, maxIdle int) *GorotinesPool {  
   p := &GorotinesPool{  
      Max:          max,  
      Min:          min,  
      tasks:        make(chan func(), maxWaitTask),  
      status:       true,  
      Active:       0,  
      ExecuteTotal: 0,  
      SingleTimes:  10,  
      addTimeout:   time.Duration(timeout) * time.Second,  
      MaxIdle:      time.Duration(maxIdle) * time.Second,  
   }  
   for i := 0; i < min; i   {  
      p.AddWorker()  
   }  
   go func() {  
      for {  
         if !p.status {  
            break  
         }  
         ftool.Sleep(1000)  
         p.balance()  
      }  
   }()  
   return p  
}  
  
// worker  
//  @Description: 开始执行协程  
//  @receiver pool  
//  
func (pool *GorotinesPool) worker() {  
   defer func() {  
      if p := recover(); p != nil {  
         log.Printf("execute task fail: %v", p)  
      }  
   }()  
Fun:  
   for {  
      select {  
      case t := <-pool.tasks:  
         atomic.AddInt32(&pool.ExecuteTotal, 1)  
         t()  
      case <-time.After(pool.MaxIdle):  
         if pool.Active > int32(pool.Min) {  
            atomic.AddInt32(&pool.Active, -1)  
            break Fun  
         }  
      }  
   }  
}  
  
// Execute  
//  @Description: 执行任务  
//  @receiver pool  
//  @param t  
//  @return error  
//  
func (pool *GorotinesPool) Execute(t func()) error {  
   if pool.status {  
      select {  
      case pool.tasks <- func() {  
         t()  
      }:  
         return nil  
      case <-time.After(pool.addTimeout):  
         return errors.New("add tasks timeout")  
      }  
   } else {  
      return errors.New("pools is down")  
   }  
}  
  
// Wait  
//  @Description: 结束等待任务完成  
//  @receiver pool  
//  
func (pool *GorotinesPool) Wait() {  
   pool.status = false  
Fun:  
   for {  
      if len(pool.tasks) == 0 || pool.Active == 0 {  
         break Fun  
      }  
      ftool.Sleep(1000)  
   }  
   defer close(pool.tasks)  
   log.Printf("execute: %d", pool.ExecuteTotal)  
}  
  
// AddWorker  
//  @Description: 添加worker,协程数加1  
//  @receiver pool  
//  
func (pool *GorotinesPool) AddWorker() {  
   atomic.AddInt32(&pool.Active, 1)  
   go pool.worker()  
}  
  
// balance  
//  @Description: 平衡活跃协程数  
//  @receiver pool  
//  
func (pool *GorotinesPool) balance() {  
   if pool.status {  
      if len(pool.tasks) > 0 && pool.Active < int32(pool.Max) {  
         for i := 0; i < len(pool.tasks); i   {  
            if int(pool.Active) < pool.Max {  
               pool.AddWorker()  
            }  
         }  
      }  
   }  
}  
  
// ExecuteQps  
//  @Description: 执行任务固定次数  
//  @receiver pool  
//  @param t  
//  @param qps  
//  
func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {  
   mutiple := qps / pool.SingleTimes  
   remainder := qps % pool.SingleTimes  
   for i := 0; i < mutiple; i   {  
      pool.Execute(func() {  
         atomic.AddInt32(&pool.ExecuteTotal, -1)  
         for i := 0; i < pool.SingleTimes; i   {  
            atomic.AddInt32(&pool.ExecuteTotal, 1)  
            t()  
         }  
      })  
   }  
   pool.Execute(func() {  
      atomic.AddInt32(&pool.ExecuteTotal, -1)  
      for i := 0; i < remainder; i   {  
         atomic.AddInt32(&pool.ExecuteTotal, 1)  
         t()  
      }  
   })  
}

0 人点赞