前言
Go Redis 采用的是 new Client
方式初始化一个 client
实例, 一个 client 示例会给每个后端 proxy(codis-proxy) 建立一个连接池 connPool
, 每次调用 client 的方法时, client 会轮训选择一个 connPool
, 然后再选择一个 conn
来请求真正的 redis proxy
创建一个 Redis
客户端
代码语言:javascript复制rdb := redis.NewClient(&redis.Options{
Addr: "172.31.1.135:7000",
Password: "",
DB: 0,
})
func NewClient(opt *Options) *Client {
opt.init()
c := Client{
baseClient: newBaseClient(opt, newConnPool(opt)),
ctx: context.Background(),
}
c.cmdable = c.Process
return &c
}
Option 说明
goredis
目录下 optiion.go
opt.init()
是 options
初始化方法,PoolSize, ReadTimeOut, WriteTimeOut
有初始化设置值
func (opt *Options) init() {
if opt.Addr == "" {
opt.Addr = "localhost:6379"
}
if opt.Network == "" {
if strings.HasPrefix(opt.Addr, "/") {
opt.Network = "unix"
} else {
opt.Network = "tcp"
}
}
if opt.DialTimeout == 0 {
opt.DialTimeout = 5 * time.Second
}
if opt.Dialer == nil {
opt.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
netDialer := &net.Dialer{
Timeout: opt.DialTimeout,
KeepAlive: 5 * time.Minute,
}
if opt.TLSConfig == nil {
return netDialer.DialContext(ctx, network, addr)
}
return tls.DialWithDialer(netDialer, network, addr, opt.TLSConfig)
}
}
if opt.PoolSize == 0 {
opt.PoolSize = 10 * runtime.NumCPU()
}
switch opt.ReadTimeout {
case -1:
opt.ReadTimeout = 0
case 0:
opt.ReadTimeout = 3 * time.Second
}
switch opt.WriteTimeout {
case -1:
opt.WriteTimeout = 0
case 0:
opt.WriteTimeout = opt.ReadTimeout
}
if opt.PoolTimeout == 0 {
opt.PoolTimeout = opt.ReadTimeout time.Second
}
if opt.IdleTimeout == 0 {
opt.IdleTimeout = 5 * time.Minute
}
if opt.IdleCheckFrequency == 0 {
opt.IdleCheckFrequency = time.Minute
}
if opt.MaxRetries == -1 {
opt.MaxRetries = 0
} else if opt.MaxRetries == 0 {
opt.MaxRetries = 3
}
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
case 0:
opt.MinRetryBackoff = 8 * time.Millisecond
}
switch opt.MaxRetryBackoff {
case -1:
opt.MaxRetryBackoff = 0
case 0:
opt.MaxRetryBackoff = 512 * time.Millisecond
}
}
option 结构体如下:
代码语言:javascript复制type Options struct {
Dialer func(context.Context) (net.Conn, error)
OnClose func(*Conn) error
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}
option 参数说明
PoolSize
: 单个connPool
最大连接数。建议根据业务qps计算合理值配置。可以简单计算如:单业务实例需要抗瞬时最大请求数量 500、访问耗时常态10ms、超时50ms、后端10个proxy。则合理配置PoolSize为:500 / 10 proxy / (50ms / 10ms) = 10。SetPoolInitSize
。每个connPool初始化时创建的连接数,脉冲型业务建议为(proxy数量 * PoolSize),即初始化时就创建全部连接,避免尖峰流量到来时突发建连(会导致一波超时) 3.LiveTimeout
。单个连接建立后超过liveTimeout时client会主动断链。建议使用默认值0。IdleTimeout
。单个连接空闲IdleTimeout时长后client会主动断链。建议值为25min。DialTimeout
、ReadTimeout
、WriteTimeout
。分别为建连、读、写超时时间。建议值均为100ms。PoolTimeout
。连接数达到上线PoolSize时等待的最长时间,建议设置为(业务超时时间 - 访问abase耗时)MaxRetries
。最大重试次数,会对连接错误以及timeout错误做重试。建议使用默认配置0。
创建连接池代码
代码语言:javascript复制func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool(&pool.Options{
Dialer: func(ctx context.Context) (net.Conn, error) {
ctx, span := internal.StartSpan(ctx, "redis.dial")
defer span.End()
if span.IsRecording() {
span.SetAttributes(
attribute.String("db.connection_string", opt.Addr),
)
}
cn, err := opt.Dialer(ctx, opt.Network, opt.Addr)
if err != nil {
return nil, internal.RecordError(ctx, span, err)
}
return cn, nil
},
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
})
}
Redis 保活
发现问题没,goredis 中,并没有最小连接数的设置,也没有 KeepAlive 。 业务如果长时间没有流量,会导致 和 Redis 实例断开链接,因为已经到达了 IdelTimeout
。 当尖峰流量到来的时候,也会导致突发建联, 导致一波超时,因为建立链接是比较耗时的。典型场景: 红包雨。直播间喊麦倒计时开始 ... 等,此时流量会突刺上涨。
如何解决上面说的问题: 保活
保活方法:
后台增加一个定时周期通过 ping
命令来保持链接, ping
是从链接池队尾获取链接, ping
完之后放回队尾, 这样需要足够的并发来实现保活,建议设置并发数 PoolInitSize*实例个数 , 周期设置为 IdleTimeOut/2
原理:定时去 ping 即可
代码语言:javascript复制// ping function
go func() {
ping := func() {
cli, err := libabase.GetABaseClient(psm)
if err == nil {
cli.Ping()
}
}
var funcs = make([]func(), parallelism)
for i := int64(0); i < parallelism; i {
funcs[i] = ping
}
for {
libs.Dispatch(context.Background(), funcs, parallelism, false)
time.Sleep(time.Minute time.Duration(libs.RandInt63()0)*5*time.Second)
}
}()
// libs.dispatch
func Dispatch(ctx context.Context, funcs []func(), parallelism int64, isBlock bool) error {
if parallelism <= 0 {
return liberror.InvalidValueError("parallelism should > 0, but", parallelism)
}
var runningCnt = int64(0)
var wg = sync.WaitGroup{}
var lock = sync.Mutex{}
var cond = sync.NewCond(&lock)
for _, f := range funcs {
wg.Add(1)
go func(f func()) {
defer func() {
if r := recover(); r != nil {
logs.CtxWarn(ctx, "Dispatch, recover, % v", r)
}
cond.L.Lock()
runningCnt--
cond.L.Unlock()
cond.Signal()
wg.Done()
}()
cond.L.Lock()
for runningCnt >= parallelism { // 按照设定的并发度执行
cond.Wait()
}
runningCnt
cond.L.Unlock()
f()
}(f)
}
// 需要阻塞的话, 需要wait
if isBlock {
wg.Wait()
}
return nil
}
可以看下简单的写法
启动一个协程,定时去 ping
即可保活
// ping function
go func() {
ping := func() {
cli, err := libabase.GetABaseClient(psm)
if err == nil {
cli.Ping()
}
}
var funcs = make([]func(), parallelism)
for i := int64(0); i < parallelism; i {
funcs[i] = ping
}
for {
libs.Dispatch(context.Background(), funcs, parallelism, false)
time.Sleep(time.Minute time.Duration(libs.RandInt63()0)*5*time.Second)
}
}()
// libs.dispatch
func Dispatch(ctx context.Context, funcs []func(), parallelism int64, isBlock bool) error {
if parallelism <= 0 {
return liberror.InvalidValueError("parallelism should > 0, but", parallelism)
}
var runningCnt = int64(0)
var wg = sync.WaitGroup{}
var lock = sync.Mutex{}
var cond = sync.NewCond(&lock)
for _, f := range funcs {
wg.Add(1)
go func(f func()) {
defer func() {
if r := recover(); r != nil {
logs.CtxWarn(ctx, "Dispatch, recover, % v", r)
}
cond.L.Lock()
runningCnt--
cond.L.Unlock()
cond.Signal()
wg.Done()
}()
cond.L.Lock()
for runningCnt >= parallelism { // 按照设定的并发度执行
cond.Wait()
}
runningCnt
cond.L.Unlock()
f()
}(f)
}
// 需要阻塞的话, 需要wait
if isBlock {
wg.Wait()
}
return nil
}