2020-06-30 一次极大的优化cpu和内存使用的记录

2022-12-28 14:09:28 浏览数 (2)

CPU 的优化

未优化前的代码:

代码语言:javascript复制
func (s *Schedule) handleLoopQueue(ctx context.Context, lq *loopQueue) {
    defer lq.Close()
    goroutineFlag := fmt.Sprintf("pipeline[%v-%v]", ctx.Value(common.PipelineKey), ctx.Value(common.ConcurrencyKey))
    for {
        select {
        case <-s.ctx.Done():
            s.logger.Debug("Schedule context done")
            return
        case <-ctx.Done():
            s.logger.Debug("Context done %s", goroutineFlag)
            return
        case task, ok := <-lq.Pop(): // 关闭后,读取时,task是空,但ok是false
            if ok {
                task.Execute(ctx)
                if err := lq.Push(task); err != nil {
                    s.logger.Error(errors.Wrapf(err, "LoopQueue push %s", goroutineFlag).Error())
                    return
                }
            }
        }
    }
}

调用代码:

代码语言:javascript复制
for i, lq := 0, newLoopQueue(tasks); i < concurrency; i   {
            go s.handleLoopQueue(context.WithValue(ctx, common.ConcurrencyKey, ip "-" strconv.Itoa(i)), lq)
}

逻辑是,生成一个任务队列,然后根据并发数共同消费这个队列,每次任务执行完毕后,都会将任务重新放回队列中,这样循环使用。同时,每个任务执行完毕后,若失败,或空执行(即,任务无实际数据的处理),会延长其下次执行时间。每次执行时,若未到执行时间,则直接执行完毕。

然而根据实测效果,这段代码几乎能跑满cpu,原因是cpu几乎都耗在select等待上,并且总能等到队列出来任务(因为总有其他的任务执行完了然后被放回去,或未到执行时间立即被放回去)。

后来优化了半天没有好的思路,后来灵机一动,给外层for循环增加一个ticker,以增加任务间的等待时间,降低cpu空转率。

优化后代码如下:

代码语言:javascript复制
func (s *Schedule) handleLoopQueue(ctx context.Context, lq *loopQueue) {
    defer lq.Close()
    goroutineFlag := fmt.Sprintf("pipeline[%v-%v]", ctx.Value(common.PipelineKey), ctx.Value(common.ConcurrencyKey))
    for range time.Tick(time.Second) { // 每个任务执行间隔一秒钟,极大降低cpu空转率
        select {
        case <-s.ctx.Done():
            s.logger.Debug("Schedule context done")
            return
        case <-ctx.Done():
            s.logger.Debug("Context done %s", goroutineFlag)
            return
        case task, ok := <-lq.Pop(): // 关闭后,读取时,task是空,但ok是false
            if ok {
                task.Execute(ctx)
                if err := lq.Push(task); err != nil {
                    s.logger.Error(errors.Wrapf(err, "LoopQueue push %s", goroutineFlag).Error())
                    return
                }
            }
        }
    }
}

即将for {改成for range time.Tick(time.Second) {,部署后,比原先cpu使用率缩水10倍以上。

内存的优化

优化前的代码:

代码语言:javascript复制
var (
    _timedOut time.Duration = 5 * time.Second
)

func (lq *loopQueue) Push(task task.ScheduleTask) error {
    lq.RLock()
    defer lq.RUnlock()
    // 如果队列关闭,则停止入队
    if lq.closed == 1 {
        return nil
    }
    select {
    case lq.queue <- task:
        return nil
    case time.After(_timedOut):
        return errors.New("push timed out")
    }
}

在大并发的情况下,time.After非常消耗内存,因为每次都会创建一个chan。每秒6000个并发情况下,几秒钟就花费了几十M内存,几分钟内存上升至G级别。因此考虑采用内存池(sync.Pool)的方式,尽量将计时器回收利用,以节省内存空间。

优化后的代码:

代码语言:javascript复制
func newLoopQueue(tasks map[string]task.ScheduleTask) *loopQueue {
    queue := make(chan task.ScheduleTask, len(tasks))
    for _, task := range tasks {
        queue <- task
    }
    pool := &sync.Pool{
        New: func() interface{} {
            return time.NewTimer(_timedOut)
        },
    }
    return &loopQueue{tasks: tasks, queue: queue, pool: pool}
}

func (lq *loopQueue) Push(task task.ScheduleTask) error {
    lq.RLock()
    defer lq.RUnlock()
    // 如果队列关闭,则停止入队
    if lq.closed == 1 {
        return nil
    }
    // 从池中取出计时器
    timer, ok := lq.pool.Get().(*time.Timer)
    if !ok {
        return errors.Errorf("Invalid pool type, want[*time.Timer], actul[%s]", reflect.TypeOf(timer))
    }
    // 使用前重置计时器的超时时间
    if !timer.Reset(_timedOut) {
        return errors.New("Reset timer")
    }
    select {
    case lq.queue <- task:
        // 计时器未被使用,放回池中重复利用
        lq.pool.Put(timer)
        return nil
    case <-timer.C:
        return errors.New("push timed out")
    }
}

注意:如果timer被使用(<-time.C)或者被手动停止后,time.Reset是会报错的。 部署后,运行3个小时 查看,内存使用前30名已然看不到人影,实际占用内存不足1M可以忽略不计~

0 人点赞