优雅的控制协程(goroutine)的并发数量

2024-01-25 15:12:33 浏览数 (3)

对golang熟悉的小伙伴都知道,在go中开启go协程是一件简单的事,只需要一个关键字”go“。

并且相比于线程,所需要的系统资源非常少。于是在程序中我们总会开启协程去并发获取数据。

例如:

商城首页,每个商品需要获取图片、价格、销量、店铺、优惠等等一系列信息。

如果单个单个的请求,肯定会由于响应太慢,流失用户。

于是我们自然的会想到使用并发去获取数据,组装后在返回给前端展示。

不过在微服务中,一般信息不会都存在自己这里,会有下游服务进行提供,为了保护自己的系统不会被高流量打垮,下游一般都会限制请求的qps。

比如你有50个商品,但是下游限制10个并发。那么我们就需要一种控制并发数量的手段去请求下游。

在golang中,channel 和 waitgroup 就是常用的控制并发请求的手段。下面我们就来实现一个通用的并发控制方法。

代码语言:javascript复制
//定义通用函数,包装用户的业务函数,返回单次执行的result和error
type fcWarp func(interface{}) (interface{}, error)


/**
   goNum := 一共需要开启的协程数
   data  := 业务请求参数,必须是slice
   fc:定义通用函数,包装用户的业务函数,返回单次执行的result和error
   返回:
   result:slice类型,包含多次请求的结果
   resErrs :slice类型,包含多次请求中出现的err
 */
func concurrent(goNum uint, data interface{}, fc fcWarp)(result []interface{},resErrs []error) {
	if goNum <= 0 {
		panic("goNum must positive")
	}
	vType := reflect.TypeOf(data)
	if vType.Kind() != reflect.Slice{
		panic("data must slice")
	}
	vValue := reflect.ValueOf(data)

	limiter := make(chan struct{},goNum)
	result = make([]interface{},vValue.Len())
	resErrs = make([]error,vValue.Len())

	var wg sync.WaitGroup
	wg.Add(vValue.Len())
	for i := 0; i < vValue.Len(); i   {
		limiter <- struct{}{}
		go func(i int,d interface{}) {
			defer func() {
				<-limiter
				wg.Done()
				if err := recover();err != nil{
					resErrs[i] = fmt.Errorf("panic:%v",err)
				}
			}()
			res,err := fc(d)
			result[i] = res
			resErrs[i] = err
		}(i,vValue.Index(i).Interface())
	}
	wg.Wait()
	return result, resErrs
}

实际使用效果:

整体使用非常简单,只需要进行简单的断言就可以转化为原始数据进行处理。

并且可以通过下标获取到每次的请求res 和 err。

不需要去关心协程控制、错误处理

代码语言:javascript复制
func main()  {
	data := []int{1,2,3}
	res,errs := concurrent(2,data, func(idata interface{}) (interface{}, error) {
		tmp := idata.(int)
		if tmp == 2 {
			panic("is 2")
		}
		tmp  = tmp
		return tmp,nil
	})

	for i := 0; i < len(data); i   {
		if errs[i] != nil{
			fmt.Println(errs[i])
			continue
		}
		fmt.Println(res[i])
	}
}

输出:

代码语言:javascript复制
2
panic:is 2
6

0 人点赞