协程
Goroutine 是 Golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 Golang 开发的应用出现上千个协程并发的场景。
协程池
高并发场景下,会启动大量协程进行业务处理,此时如果使用协程池可以复用对象,减少协程池内存分配的效率与创建协程池点创建开销,提高协程的执行效率。字节官方开源了gopkg库提供的 gopool
协程池实现。
协程池实现原理
线程池设计
代码语言:javascript复制type pool struct {
// pool 的名字,打 metrics 和打 log 时用到
name string
// pool 的容量,也就是最大的真正在工作的 goroutine 的数量
// 为了性能考虑,可能会有小误差
cap int32
// 配置信息
config *Config
// 任务链表
taskHead *task
taskTail *task
taskLock sync.Mutex
taskCount int32
// 记录正在运行的 worker 数量
workerCount int32
// 用来标记是否关闭
closed int32
// worker panic 的时候会调用这个方法
panicHandler func(context.Context, interface{})
}
gopool最核心的方法
代码语言:javascript复制func (p *pool) CtxGo(ctx context.Context, f func()) {
// 首先是一个对象池,避免task对象反复分配
t := taskPool.Get().(*task)
// 拿到的对象做一些变量的设置
t.ctx = ctx
t.f = f
// 这里的加上task锁,把生成的task绑定到task链表上
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t
}
p.taskLock.Unlock()
// 解锁之后做一些统计信息变更
atomic.AddInt32(&p.taskCount, 1)
// 这个地方的逻辑是不是放到一开头判断一下也可以,
// 或者上下都判断一下;
// 这里写在下面可能考虑到上面的lock和taskpool的get可能会有一点时间的等待,
// 所以如果我来写可能方法的入口和这里都会判断一下吧。不过这里也不是什么重点问题。
// 如果 pool 已经被关闭了,就 panic
if atomic.LoadInt32(&p.closed) == 1 {
panic("use closed pool")
}
// 下面的注释作者的意图都已经说明了,
// 解决的问题就是协程池goroutine需不需要加的问题;
// 以及协程池是不是啥都没有的问题
// 满足以下两个条件:
// 1. task 数量大于阈值
// 2. 目前的 worker 数量小于上限 p.cap
// 或者目前没有 worker
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
p.incWorkerCount() // 加一个工作协程
w := workerPool.Get().(*worker) // 对象池的优化
w.pool = p
w.run() // 让worker跑起来。
}
}
worker跑起来是如何执行的
代码语言:javascript复制 func (w *worker) run() {
// 通过goroutine异步执行
go func() {
for {
//select {
//case <-w.stopChan:
// w.close()
// return
//default:
// 这里是从task池中获取一个task
var t *task
w.pool.taskLock.Lock()
if w.pool.taskHead != nil {
t = w.pool.taskHead
w.pool.taskHead = w.pool.taskHead.next
atomic.AddInt32(&w.pool.taskCount, -1)
}
if t == nil {
// 如果没有任务要做了,就释放资源,退出
w.close()
w.pool.taskLock.Unlock()
w.Recycle()
return
}
w.pool.taskLock.Unlock()
// 这里就是成功的获取一个任务,然后准备开始执行这个任务
func() {
// 防止任务panic做的一些打点逻辑
defer func() {
if r := recover(); r != nil {
logs.CtxFatal(t.ctx, "GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
if w.pool.config.EnablePanicMetrics {
panicMetricsClient.EmitCounter(panicKey, 1, metrics.T{Name: "pool", Value: w.pool.name})
}
// 这里如果没有设置panicHandler可能会有空指针
w.pool.panicHandler(t.ctx, r)
}
}()
// 执行函数
t.f()
}()
// 这个task已经做完了,回收对象
t.Recycle()
//}
}
}()
}
- 来一个任务,放到任务链表中,使用锁控制保证并发安全
- 如果 task 数量大于阈值且当前的 worker 数量小于上限 p.cap 或者目前没有 worker,那么新建一个工作协程来执行任务。
- 任务执行过程中,使用 for 循环不断遍历 task 链表,如果链表不为空,则从链表中拿任务执行。链表为空,协程关闭,工作协程数减一。