go实现协程池,协程轻量但并不是越多越好。虽然golang底层实现了对协程的复用,协程(Goroutine)的创建和调度由底层的运行时系统(runtime)负责,它会自动管理和复用协程,但是一瞬间并发过高仍然会导致内存资源消耗过大。使用协程池可用对资源进行有效控制。在内存资源够用的情况,或者其他不用限制同时任务数的情况,请用原生go 协程,不必使用协程池
协程池的数量和CPU核数的关系 小于或者等于CPU核数: 适用于计算密集型的任务中,如果协程的执行时间较长且没有IO操作,可以将协程池的数量设置为小于CPU核数的值。这样做可以避免过多的协程竞争CPU资源,减少上下文切换的开销,如图像处理、数据分析等。 大于CPU核数: 如果任务需要进行大量的IO操作,可以考虑将协程池的数量设置为大于CPU核数的值。 这样做可以充分利用CPU等待IO操作的时间,如网络请求、数据库查询等。
代码语言:javascript复制package main
import (
"fmt"
"runtime"
"sync"
"time"
)
type Job struct {
ID int
}
type Worker struct {
ID int
JobQueue <-chan Job
QuitChan <-chan bool
WaitGroup *sync.WaitGroup
f func(job Job)
}
type Pool struct {
WorkerSize int
JobQueue chan Job
QuitChan chan bool
WaitGroup *sync.WaitGroup
workerFunc func(job Job)
}
func NewWorker(id int, wg *sync.WaitGroup, jobQueue chan Job, quitChan <-chan bool) *Worker {
return &Worker{
ID: id,
JobQueue: jobQueue,
QuitChan: quitChan,
WaitGroup: wg,
}
}
func (worker *Worker) StartWork() {
go func() {
for {
select {
case job := <-worker.JobQueue:
worker.f(job)
worker.WaitGroup.Done()
case <-worker.QuitChan:
fmt.Printf("Worker %d: quittingn", worker.ID)
return
}
}
}()
}
func NewPool(workerSize int, f func(job Job)) *Pool {
return &Pool{
WorkerSize: workerSize,
JobQueue: make(chan Job),
WaitGroup: &sync.WaitGroup{},
workerFunc: f,
}
}
func (p *Pool) AddJob(job Job) {
p.WaitGroup.Add(1)
p.JobQueue <- job
}
func (p *Pool) Start() {
for i := 0; i < p.WorkerSize; i {
worker := NewWorker(i, p.WaitGroup, p.JobQueue, p.QuitChan)
worker.f = p.workerFunc
worker.StartWork()
}
}
func (p *Pool) Stop() {
for i := 0; i < p.WorkerSize; i {
p.QuitChan <- true
}
}
func (p *Pool) Close() {
close(p.JobQueue)
close(p.QuitChan)
}
func WorKerFunc(job Job) {
fmt.Println("Job,id:", job.ID)
time.Sleep(1 * time.Second)
for i := 0; i < 1000000; i {
c := 100 * 1000
_ = c
}
}
func main() {
fmt.Printf("CPU 内核数量:%d,协程数量:%d", runtime.NumCPU(), runtime.NumCPU()*2)
start := time.Now().UnixMilli()
pool := NewPool(runtime.NumCPU()*2, WorKerFunc)
pool.Start()
for i := 0; i < 100; i {
job := Job{
ID: i,
}
pool.AddJob(job)
}
pool.WaitGroup.Wait()
end := time.Now().UnixMilli()
fmt.Println(end - start)
}