之前写了Go语言协程池的实践以及动态QPS的实现,本来计划就是开始做一些测试了,但是发现协程池的实现有些问题也有一些BUG,所以连夜修改了部分功能。
为了不咋不明真相的读者造成困扰,赶紧写篇文章报告一下。
缺陷&BUG
这里先把测试中遇到的问题和BUG梳理一下:
- 活跃协程数计算错误
- 执行数量和收到计数错误
- QPS陡增和陡降的时候,无法及时增加压力和回收协程
- 协程回收存在问题不够优雅,效率太低
BUG分析
活跃协程数
这里计数错误的原因是因为在原来的实现中多次使用了ReduceWorker()
和AddWorker()
方法,导致没有将添加和减少的功能收拢到某一个时机统一处理,有两处重复使用的问题,导致Active
计算错误。
执行数量和收到计数错误
这里问题出现在这个方法,本意是设计一个执行固定次数任务的方法。通过将一部分任务合并到同一个任务丢给协程池。但是在实现过程中并没有将原始的任务单独计数。同时在记录到所有收到的任务ReceiveTotal
中也存在这个问题。
// ExecuteQps
// @Description: 执行任务固定次数
// @receiver pool
// @param t
// @param qps
//
func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
mutiple := qps / pool.SingleTimes
remainder := qps % pool.SingleTimes
for i := 0; i < pool.SingleTimes; i {
pool.Execute(func() {
for i := 0; i < mutiple; i {
t()
}
})
}
pool.Execute(func() {
for i := 0; i < remainder; i {
t()
}
})
}
新建回收协程不及时
在QPS陡增陡降的场景测试中,存在2个,需要增加协程数的时候增加较慢,因为每1秒扫描一次的等待channel
,增多增加1个,同样的也减少1个。这个效率非常低,设计太蠢了。
线程回收不优雅
在旧实现中用的是扫描等待channel
判断是否增加还是减少协程。除了效率低以外,还存在无法及时回收和过度回收(正在运行的协程也会收到一个终止信号)。
旧的方法内容如下(现在看简直不忍直视):
代码语言:javascript复制
// balance
// @Description: 平衡活跃协程数
// @receiver pool
//
func (pool *GorotinesPool) balance() {
if pool.status {
if len(pool.tasks) > 0 && pool.active < int32(pool.Max) {
pool.AddWorker()
}
if len(pool.tasks) == 0 && pool.active > int32(pool.Min) {
pool.ReduceWorker()
}
}
}
解决方法
重写回收方法
这里参照了Java线程池的实现,通过设置一个最大空闲时间,通过这个设置来回收协程,这样即方便又能照顾到协程运行状态,避免过度回收。这里改造了一下worker()
方法。
实现内容如下:
代码语言:javascript复制
// worker
// @Description: 开始执行协程
// @receiver pool
//
func (pool *GorotinesPool) worker() {
defer func() {
if p := recover(); p != nil {
log.Printf("execute task fail: %v", p)
}
}()
Fun:
for {
select {
case t := <-pool.tasks:
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
case <-time.After(pool.MaxIdle):
if pool.Active > int32(pool.Min) {
atomic.AddInt32(&pool.Active, -1)
break Fun
}
}
}
}
这里顺道解决了协程回收效率不高的问题。
协程增加效率
这里做了2项修改:
- 增加扫描后单次增加协程数量,增加等待
channel
数量等量的协程数 - 调整
func (pool *GorotinesPool) Execute(t func()) error
合并任务的算法,采取固定的数量合并方案
增加协程数量:
代码语言:javascript复制// balance
// @Description: 平衡活跃协程数
// @receiver pool
//
func (pool *GorotinesPool) balance() {
if pool.status {
if len(pool.tasks) > 0 && pool.Active < int32(pool.Max) {
for i := 0; i < len(pool.tasks); i {
if int(pool.Active) < pool.Max {
pool.AddWorker()
}
}
}
}
}
调整合并任务方案:
代码语言:javascript复制// ExecuteQps
// @Description: 执行任务固定次数
// @receiver pool
// @param t
// @param qps
//
func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
mutiple := qps / pool.SingleTimes
remainder := qps % pool.SingleTimes
for i := 0; i < mutiple; i {
pool.Execute(func() {
atomic.AddInt32(&pool.ExecuteTotal, -1)
for i := 0; i < pool.SingleTimes; i {
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
}
})
}
pool.Execute(func() {
atomic.AddInt32(&pool.ExecuteTotal, -1)
for i := 0; i < remainder; i {
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
}
})
}
计数不准确
这个纯属BUG,改掉计数错误的地方,修复BUG,代码已经在上述修复代码中有所体现了。
完整代码
代码语言:javascript复制package execute
import (
"errors"
"funtester/ftool" "log" "sync/atomic" "time")
type GorotinesPool struct {
Max int
Min int
tasks chan func()
status bool
Active int32
ExecuteTotal int32
SingleTimes int
addTimeout time.Duration
MaxIdle time.Duration
}
type taskType int
const (
normal taskType = 0
reduce taskType = 1
)
// GetPool
// @Description: 创建线程池
// @param max 最大协程数
// @param min 最小协程数
// @param maxWaitTask 最大任务等待长度
// @param timeout 添加任务超时时间,单位s
// @return *GorotinesPool
//
func GetPool(max, min, maxWaitTask, timeout, maxIdle int) *GorotinesPool {
p := &GorotinesPool{
Max: max,
Min: min,
tasks: make(chan func(), maxWaitTask),
status: true,
Active: 0,
ExecuteTotal: 0,
SingleTimes: 10,
addTimeout: time.Duration(timeout) * time.Second,
MaxIdle: time.Duration(maxIdle) * time.Second,
}
for i := 0; i < min; i {
p.AddWorker()
}
go func() {
for {
if !p.status {
break
}
ftool.Sleep(1000)
p.balance()
}
}()
return p
}
// worker
// @Description: 开始执行协程
// @receiver pool
//
func (pool *GorotinesPool) worker() {
defer func() {
if p := recover(); p != nil {
log.Printf("execute task fail: %v", p)
}
}()
Fun:
for {
select {
case t := <-pool.tasks:
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
case <-time.After(pool.MaxIdle):
if pool.Active > int32(pool.Min) {
atomic.AddInt32(&pool.Active, -1)
break Fun
}
}
}
}
// Execute
// @Description: 执行任务
// @receiver pool
// @param t
// @return error
//
func (pool *GorotinesPool) Execute(t func()) error {
if pool.status {
select {
case pool.tasks <- func() {
t()
}:
return nil
case <-time.After(pool.addTimeout):
return errors.New("add tasks timeout")
}
} else {
return errors.New("pools is down")
}
}
// Wait
// @Description: 结束等待任务完成
// @receiver pool
//
func (pool *GorotinesPool) Wait() {
pool.status = false
Fun:
for {
if len(pool.tasks) == 0 || pool.Active == 0 {
break Fun
}
ftool.Sleep(1000)
}
defer close(pool.tasks)
log.Printf("execute: %d", pool.ExecuteTotal)
}
// AddWorker
// @Description: 添加worker,协程数加1
// @receiver pool
//
func (pool *GorotinesPool) AddWorker() {
atomic.AddInt32(&pool.Active, 1)
go pool.worker()
}
// balance
// @Description: 平衡活跃协程数
// @receiver pool
//
func (pool *GorotinesPool) balance() {
if pool.status {
if len(pool.tasks) > 0 && pool.Active < int32(pool.Max) {
for i := 0; i < len(pool.tasks); i {
if int(pool.Active) < pool.Max {
pool.AddWorker()
}
}
}
}
}
// ExecuteQps
// @Description: 执行任务固定次数
// @receiver pool
// @param t
// @param qps
//
func (pool *GorotinesPool) ExecuteQps(t func(), qps int) {
mutiple := qps / pool.SingleTimes
remainder := qps % pool.SingleTimes
for i := 0; i < mutiple; i {
pool.Execute(func() {
atomic.AddInt32(&pool.ExecuteTotal, -1)
for i := 0; i < pool.SingleTimes; i {
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
}
})
}
pool.Execute(func() {
atomic.AddInt32(&pool.ExecuteTotal, -1)
for i := 0; i < remainder; i {
atomic.AddInt32(&pool.ExecuteTotal, 1)
t()
}
})
}