Go-Redis Client 配置

2022-02-13 14:36:13 浏览数 (1)

前言

Go Redis 采用的是 new Client 方式初始化一个 client 实例, 一个 client 示例会给每个后端 proxy(codis-proxy) 建立一个连接池 connPool, 每次调用 client 的方法时, client 会轮训选择一个 connPool, 然后再选择一个 conn 来请求真正的 redis proxy

创建一个 Redis 客户端
代码语言:javascript复制
rdb := redis.NewClient(&redis.Options{
  Addr:     "172.31.1.135:7000",
  Password: "",
  DB:       0,
 })
 
func NewClient(opt *Options) *Client {
 opt.init()

 c := Client{
  baseClient: newBaseClient(opt, newConnPool(opt)),
  ctx:        context.Background(),
 }
 c.cmdable = c.Process

 return &c
}

Option 说明

goredis 目录下 optiion.go

opt.init()options 初始化方法,PoolSize, ReadTimeOut, WriteTimeOut 有初始化设置值

代码语言:javascript复制
func (opt *Options) init() {
 if opt.Addr == "" {
  opt.Addr = "localhost:6379"
 }
 if opt.Network == "" {
  if strings.HasPrefix(opt.Addr, "/") {
   opt.Network = "unix"
  } else {
   opt.Network = "tcp"
  }
 }
 if opt.DialTimeout == 0 {
  opt.DialTimeout = 5 * time.Second
 }
 if opt.Dialer == nil {
  opt.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
   netDialer := &net.Dialer{
    Timeout:   opt.DialTimeout,
    KeepAlive: 5 * time.Minute,
   }
   if opt.TLSConfig == nil {
    return netDialer.DialContext(ctx, network, addr)
   }
   return tls.DialWithDialer(netDialer, network, addr, opt.TLSConfig)
  }
 }
 if opt.PoolSize == 0 {
  opt.PoolSize = 10 * runtime.NumCPU()
 }
 switch opt.ReadTimeout {
 case -1:
  opt.ReadTimeout = 0
 case 0:
  opt.ReadTimeout = 3 * time.Second
 }
 switch opt.WriteTimeout {
 case -1:
  opt.WriteTimeout = 0
 case 0:
  opt.WriteTimeout = opt.ReadTimeout
 }
 if opt.PoolTimeout == 0 {
  opt.PoolTimeout = opt.ReadTimeout   time.Second
 }
 if opt.IdleTimeout == 0 {
  opt.IdleTimeout = 5 * time.Minute
 }
 if opt.IdleCheckFrequency == 0 {
  opt.IdleCheckFrequency = time.Minute
 }

 if opt.MaxRetries == -1 {
  opt.MaxRetries = 0
 } else if opt.MaxRetries == 0 {
  opt.MaxRetries = 3
 }
 switch opt.MinRetryBackoff {
 case -1:
  opt.MinRetryBackoff = 0
 case 0:
  opt.MinRetryBackoff = 8 * time.Millisecond
 }
 switch opt.MaxRetryBackoff {
 case -1:
  opt.MaxRetryBackoff = 0
 case 0:
  opt.MaxRetryBackoff = 512 * time.Millisecond
 }
}

option 结构体如下:

代码语言:javascript复制
type Options struct {
 Dialer  func(context.Context) (net.Conn, error)
 OnClose func(*Conn) error

 PoolSize           int
 MinIdleConns       int
 MaxConnAge         time.Duration
 PoolTimeout        time.Duration
 IdleTimeout        time.Duration
 IdleCheckFrequency time.Duration
}
option 参数说明
  1. PoolSize: 单个connPool最大连接数。建议根据业务qps计算合理值配置。可以简单计算如:单业务实例需要抗瞬时最大请求数量 500、访问耗时常态10ms、超时50ms、后端10个proxy。则合理配置PoolSize为:500 / 10 proxy / (50ms / 10ms) = 10。
  2. SetPoolInitSize。每个connPool初始化时创建的连接数,脉冲型业务建议为(proxy数量 * PoolSize),即初始化时就创建全部连接,避免尖峰流量到来时突发建连(会导致一波超时) 3.LiveTimeout。单个连接建立后超过liveTimeout时client会主动断链。建议使用默认值0。
  3. IdleTimeout。单个连接空闲IdleTimeout时长后client会主动断链。建议值为25min。
  4. DialTimeoutReadTimeoutWriteTimeout。分别为建连、读、写超时时间。建议值均为100ms。
  5. PoolTimeout。连接数达到上线PoolSize时等待的最长时间,建议设置为(业务超时时间 - 访问abase耗时)
  6. MaxRetries。最大重试次数,会对连接错误以及timeout错误做重试。建议使用默认配置0。
创建连接池代码
代码语言:javascript复制
func newConnPool(opt *Options) *pool.ConnPool {
 return pool.NewConnPool(&pool.Options{
  Dialer: func(ctx context.Context) (net.Conn, error) {
   ctx, span := internal.StartSpan(ctx, "redis.dial")
   defer span.End()

   if span.IsRecording() {
    span.SetAttributes(
     attribute.String("db.connection_string", opt.Addr),
    )
   }

   cn, err := opt.Dialer(ctx, opt.Network, opt.Addr)
   if err != nil {
    return nil, internal.RecordError(ctx, span, err)
   }

   return cn, nil
  },
  PoolSize:           opt.PoolSize,
  MinIdleConns:       opt.MinIdleConns,
  MaxConnAge:         opt.MaxConnAge,
  PoolTimeout:        opt.PoolTimeout,
  IdleTimeout:        opt.IdleTimeout,
  IdleCheckFrequency: opt.IdleCheckFrequency,
 })
}
Redis 保活

发现问题没,goredis 中,并没有最小连接数的设置,也没有 KeepAlive 。 业务如果长时间没有流量,会导致 和 Redis 实例断开链接,因为已经到达了 IdelTimeout 。 当尖峰流量到来的时候,也会导致突发建联, 导致一波超时,因为建立链接是比较耗时的。典型场景: 红包雨。直播间喊麦倒计时开始 ... 等,此时流量会突刺上涨。

如何解决上面说的问题: 保活

保活方法:

后台增加一个定时周期通过 ping 命令来保持链接, ping 是从链接池队尾获取链接, ping 完之后放回队尾, 这样需要足够的并发来实现保活,建议设置并发数 PoolInitSize*实例个数 , 周期设置为 IdleTimeOut/2

原理:定时去 ping 即可

代码语言:javascript复制
//  ping function
go func() {
    ping := func() {
        cli, err := libabase.GetABaseClient(psm)
        if err == nil {
            cli.Ping()
        }
    }

    var funcs = make([]func(), parallelism)
    for i := int64(0); i < parallelism; i   {
        funcs[i] = ping
    }

    for {
        libs.Dispatch(context.Background(), funcs, parallelism, false)
        time.Sleep(time.Minute   time.Duration(libs.RandInt63()0)*5*time.Second)
    }
}()

//  libs.dispatch
func Dispatch(ctx context.Context, funcs []func(), parallelism int64, isBlock bool) error {
 if parallelism <= 0 {
  return liberror.InvalidValueError("parallelism should > 0, but", parallelism)
 }

 var runningCnt = int64(0)
 var wg = sync.WaitGroup{}
 var lock = sync.Mutex{}
 var cond = sync.NewCond(&lock)

 for _, f := range funcs {
  wg.Add(1)
  go func(f func()) {
   defer func() {
    if r := recover(); r != nil {
     logs.CtxWarn(ctx, "Dispatch, recover, % v", r)
    }
    cond.L.Lock()
    runningCnt--
    cond.L.Unlock()
    cond.Signal()
    wg.Done()
   }()

   cond.L.Lock()
   for runningCnt >= parallelism { // 按照设定的并发度执行
    cond.Wait()
   }
   runningCnt  
   cond.L.Unlock()

   f()
  }(f)
 }

 // 需要阻塞的话, 需要wait
 if isBlock {
  wg.Wait()
 }

 return nil
}

可以看下简单的写法

启动一个协程,定时去 ping 即可保活

代码语言:javascript复制
//  ping function
go func() {
    ping := func() {
        cli, err := libabase.GetABaseClient(psm)
        if err == nil {
            cli.Ping()
        }
    }

    var funcs = make([]func(), parallelism)
    for i := int64(0); i < parallelism; i   {
        funcs[i] = ping
    }

    for {
        libs.Dispatch(context.Background(), funcs, parallelism, false)
        time.Sleep(time.Minute   time.Duration(libs.RandInt63()0)*5*time.Second)
    }
}()

//  libs.dispatch
func Dispatch(ctx context.Context, funcs []func(), parallelism int64, isBlock bool) error {
 if parallelism <= 0 {
  return liberror.InvalidValueError("parallelism should > 0, but", parallelism)
 }

 var runningCnt = int64(0)
 var wg = sync.WaitGroup{}
 var lock = sync.Mutex{}
 var cond = sync.NewCond(&lock)

 for _, f := range funcs {
  wg.Add(1)
  go func(f func()) {
   defer func() {
    if r := recover(); r != nil {
     logs.CtxWarn(ctx, "Dispatch, recover, % v", r)
    }
    cond.L.Lock()
    runningCnt--
    cond.L.Unlock()
    cond.Signal()
    wg.Done()
   }()

   cond.L.Lock()
   for runningCnt >= parallelism { // 按照设定的并发度执行
    cond.Wait()
   }
   runningCnt  
   cond.L.Unlock()

   f()
  }(f)
 }

 // 需要阻塞的话, 需要wait
 if isBlock {
  wg.Wait()
 }

 return nil
}

0 人点赞