Go协程-使用和泄漏

2024-08-25 17:30:32 浏览数 (3)

Go协程的基本概念和使用

go的协程作为一种更为轻量级的执行单元,与传统的线程相比,goroutine的创建和切换开销很小(这里主要是基于GMP模型,goroutine定义在用户态,只需要P将其队列中的G调度到Machine上,从下图中可以看出协程的调度过程时在用户态进行的,避免了频繁的系统调用过程)因此可以创建大量的goroutine来并行执行任务,而不会造成太大的系统负担

使用方法:只需要通过go关键字,即可创建一个goroutine:例如 go func1()

但是goroutine使用简单并且具备轻量级的特点会导致我们容易忽略可能存在的问题

goRoutine在使用时需要注意的

goroutine使用中的三个核心点

1.协程什么时候应该退出,生命周期管理应该搞清楚,goroutine什么时候退出,管控起生命周期

2.函数方法的调用者来决定要不要在后台执行,并发交给调用者

3.能够控制goroutine什么时候退出,不管是用channel还是context超时处理

即:go协程启动时需要注意的“永远不要在不知道如何停止的情况下开始一个goroutine”,即对于所起的goroutine的生命周期管理要十分明确

go协程泄漏

如果启动了一个goroutine,希望他终止时并未终止,这就出现了泄漏;特点是:应用程序的生命周期内存在,为goroutine分配的任何内存都不能释放

go协程泄漏的情况

1.没有发送者,导致协程始终等待

首先go是“通过channel来共享内存”,而无缓冲的channel是同步的,因此如果有个消费者通过goroutine去channel中读取(消费)数据,则就需要等待生产者向这个channel中去存放(生产)数据

如下面的例子list1中的leak函数:

代码语言:go复制
// list1
func leak() {
	ch := make(chan int) // 定义一个无缓冲的通道
	go func() { // 消费者起一个协程尝试从ch中读取数据
		val := <-ch // 当ch中被填入数据后,触发同步操作,否则该协程会始终阻塞在这里
		fmt.Println(val)
	}()
}

上面的例子中由于该协程始终阻塞且无法释放,导致该协程泄漏

如果在某个函数中需要顺序调用另一个函数(例如查找功能),则需要等待查找search函数返回给结果后,才能继续执行后续操作,这就会导致函数的执行耗时增加,例如下面的list2:

代码语言:go复制
func search(x int) (int, error) {
	time.Sleep(1 * time.Second) // 这里模拟查找所需的耗时
	return 0, nil
}
func process(x int)error{
	record,err := search(x) // process调用查询函数
	if err!=nil{
		return err
	}
	fmt.Println("record:",record)
	return nil
}

由于顺序调用某个函数导致耗时增加,所以可能会通过起一个协程的方式来规避这种耗时

设计实现的思路是:定义一个上下文用来做超时机制context.WithTimeOut,使用无缓冲的chan来作为接受方和发送方的同步接受数据的操作ch:=make(chan result),使用go起一个协程来执行某个函数search,并返回result将其放到ch中。这里相当于作为发送者向通道中发送了数据。使用select来做接收处理,使用ctx.Done()做超时结束机制,使用result:=<-ch做接收者操作以及打印。代码具体如下:

代码语言:go复制
type result struct {
	record int
	err    error
}

func process(x int) error {
	// 先定义一个上下文context实现超时机制
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// 定义一个无缓冲的通道用来做两者的共享内存(即返回给调用者的结果)
	ch := make(chan result)
	// 起一个协程用来异步调用search函数
	go func() {
		record, err := search(x)
		ch <- result{record: record, err: err}
	}()
	// 使用select会一直阻塞到goroutine收到ch的值,或者是超时
	select {
	case <-ctx.Done(): // 3s后触发超时
		fmt.Println("time out")
		return errors.New("time out")
	case result := <-ch:
		if result.err != nil {
			return result.err
		}
		fmt.Println("record:", result.record)
		return nil
	}
}

但是这里也会有潜在的Goroutine泄漏的情况,因为通道是无缓冲的,需要等到chan中的值被接收者取走后,在go协程中的发送者则会导致无法在通道chan上发送。因此在超时的情况下,接收者会停止等待Goroutine的接收,这将会导致协程始终阻塞在ch <- result{record: record, err: err}这里,从而发生了跟list1相同的问题,即go协程泄漏

发生go协程泄漏的情况:当go协程中的发送到无缓冲通道中时,要在接收者接收之前都会进行阻塞,但是当出现超时的情况时,则select则会通过ctx.Done()的方式结束,使得接收器停止接收,而导致go协程始终处于阻塞状态,就发生了go协程泄漏

修复方法:准备一些空间,将无缓冲的通道改为容量cap为1的有缓冲通道

代码语言:go复制
ch := make(chan result,1)

这样操作后,即使在超时的情况下发送者所在的协程中仍然可以将search函数返回的result放到ch中然后结束,从而使得该协程的内存以及通道ch的内存被回收掉,避免了协程泄漏

2.不完整的工作

如下例子中,因为main函数其实在go语言中也是作为一个协程(主协程)存在的,在main中再起了一个协程,而主函数对应的协程没有等该协程执行完成就结束了。即下面的例子执行后并不会打印

代码语言:go复制
func main(){
	fmt.Println("hello")
	go fmt.Println("Goodbye")
}

即:主函数中其了一个go协程,但是主函数并没有等待该协程执行完成就结束了

这里体现了go语言中的一条规则:

程序的执行开始于main包的初始化,并执行main函数。当main函数执行结束时这个程序就退出了,它不会等待其他非主协程的结束

在真实的场景中,我们通常需要调用某个功能函数去实现对某个事件的追踪,但是如果是同步执行例如需要调用下面的函数,则会导致调用该函数会增加更多不必要的响应延迟

代码语言:go复制
type Tracker struct {}

func (t *Tracker) Event(data string) {
	time.Sleep(1 * time.Second) // 模拟网络写入延迟
	log.Println(data)
}

因此我们需要改进该函数的调用,并尝试把它作为异步执行的方式,如下面的例子

代码语言:go复制
type App struct {
	track Tracker
}

func (a *App) Handle(w http.ResponseWriter, r *http.Request) {

	w.WriteHeader(http.StatusCreated)

	go a.track.Event("this event") // 这里异步调用了Event事件追踪函数
}

我们在上面的例子中通过起一个新的goroutine的方式来达到异步追踪Event函数而不增加请求延迟的效果。但是这里就会导致上面提到的不完整工作的trap,即起的协程不能保证运行或完成,这可能会导致执行上报的结果不完整,因为服务器关闭时事件可能会丢失,因此我们需要对goroutine进行管理,保证所有的goroutine都执行结束后才可以退出,这里可以考虑使用sync包中的WaitGroup,来维持一个计数器,wg.Add每次保证加入一个goroutine,执行结束一个goroutine则通过wg.Done(),最终通过wg.Wait来等待所有的goroutine都完成

这里我们将起goroutine的部分放到Event中,并通过sync.WaitGroup进行管理,因此我们做了如下的修改:

代码语言:go复制
func (a *App) Handle(w http.ResponseWriter, r *http.Request) {

	w.WriteHeader(http.StatusCreated)

	a.track.Event("this event") // 不再在这里进行异步调用
}

然后我们要将原来的Tracker结构体进行重写,加入wg

代码语言:go复制
type Tracker struct{
	wg sync.WaitGroup
}

func (t *Tracker) Event(data string) {
	t.wg.Add(1) // 计数器 1,表示goroutine数量增1
	go func() {
		defer t.wg.Done() // 该协程执行完一个后,数量-1
		time.Sleep(1 * time.Second) // 模拟网络写入延迟
		log.Println(data)
	}()
}

func (t *Tracker)Shutdown(){
	t.wg.Wait() // 这里一直会等待到wg=0(即所有的goroutine都执行完)
}

这里的Shutdown函数会等待所有的goroutine完成,但如果没有等待的时间限制,在真实的生产环境中可能不能接受,因此可以通过设置一个截止时间,结合context和select进行优化

代码语言:go复制
func (t *Tracker) Shutdown(ctx context.Context) error {
	ch := make(chan struct{}) // 用作同步触发select,
	go func() {
		defer close(ch) // 由于ch是同步通道,close执行后会返回通道的0值
		t.wg.Wait()     // 始终等待所有协程的执行结束
	}()
	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return errors.New("time out")
	}
}
// 调用方
func (a *App) Handle(w http.ResponseWriter, r *http.Request) {

	w.WriteHeader(http.StatusCreated)

	a.track.Event("this event")

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel() // 这里通过context定义了超时机制
	if err := a.track.Shutdown(ctx); err != nil {
		log.Println(err)
	}
}

0 人点赞