Golang实现协程池

2023-10-14 19:24:15 浏览数 (2)

go实现协程池,协程轻量但并不是越多越好。虽然golang底层实现了对协程的复用,协程(Goroutine)的创建和调度由底层的运行时系统(runtime)负责,它会自动管理和复用协程,但是一瞬间并发过高仍然会导致内存资源消耗过大。使用协程池可用对资源进行有效控制。在内存资源够用的情况,或者其他不用限制同时任务数的情况,请用原生go 协程,不必使用协程池

协程池的数量和CPU核数的关系 小于或者等于CPU核数: 适用于计算密集型的任务中,如果协程的执行时间较长且没有IO操作,可以将协程池的数量设置为小于CPU核数的值。这样做可以避免过多的协程竞争CPU资源,减少上下文切换的开销,如图像处理、数据分析等。 大于CPU核数: 如果任务需要进行大量的IO操作,可以考虑将协程池的数量设置为大于CPU核数的值。 这样做可以充分利用CPU等待IO操作的时间,如网络请求、数据库查询等。

代码语言:javascript复制
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

type Job struct {
	ID int
}

type Worker struct {
	ID        int
	JobQueue  <-chan Job
	QuitChan  <-chan bool
	WaitGroup *sync.WaitGroup
	f         func(job Job)
}

type Pool struct {
	WorkerSize int
	JobQueue   chan Job
	QuitChan   chan bool
	WaitGroup  *sync.WaitGroup
	workerFunc func(job Job)
}

func NewWorker(id int, wg *sync.WaitGroup, jobQueue chan Job, quitChan <-chan bool) *Worker {
	return &Worker{
		ID:        id,
		JobQueue:  jobQueue,
		QuitChan:  quitChan,
		WaitGroup: wg,
	}
}

func (worker *Worker) StartWork() {
	go func() {
		for {
			select {
			case job := <-worker.JobQueue:
				worker.f(job)
				worker.WaitGroup.Done()
			case <-worker.QuitChan:
				fmt.Printf("Worker %d: quittingn", worker.ID)
				return
			}
		}
	}()
}

func NewPool(workerSize int, f func(job Job)) *Pool {
	return &Pool{
		WorkerSize: workerSize,
		JobQueue:   make(chan Job),
		WaitGroup:  &sync.WaitGroup{},
		workerFunc: f,
	}
}

func (p *Pool) AddJob(job Job) {
	p.WaitGroup.Add(1)
	p.JobQueue <- job
}

func (p *Pool) Start() {
	for i := 0; i < p.WorkerSize; i   {
		worker := NewWorker(i, p.WaitGroup, p.JobQueue, p.QuitChan)
		worker.f = p.workerFunc
		worker.StartWork()
	}
}

func (p *Pool) Stop() {
	for i := 0; i < p.WorkerSize; i   {
		p.QuitChan <- true
	}
}
func (p *Pool) Close() {
	close(p.JobQueue)
	close(p.QuitChan)
}
func WorKerFunc(job Job) {
	fmt.Println("Job,id:", job.ID)
	time.Sleep(1 * time.Second)
	for i := 0; i < 1000000; i   {
		c := 100 * 1000
		_ = c
	}
}
func main() {
	fmt.Printf("CPU 内核数量:%d,协程数量:%d", runtime.NumCPU(), runtime.NumCPU()*2)
	start := time.Now().UnixMilli()
	pool := NewPool(runtime.NumCPU()*2, WorKerFunc)
	pool.Start()
	for i := 0; i < 100; i   {
		job := Job{
			ID: i,
		}
		pool.AddJob(job)
	}
	pool.WaitGroup.Wait()
	end := time.Now().UnixMilli()
	fmt.Println(end - start)
}

0 人点赞