golang redis 客户端源码阅读(2)连接池初始化

2022-08-02 16:08:05 浏览数 (1)

初始化连接池的核心代码如下

代码语言:javascript复制
  client.pool = &redis.Pool{
    MaxIdle:     client.MaxIdle,
    IdleTimeout: time.Duration(client.IdleTimeoutS) * time.Second,
    MaxActive:   client.MaxActive,
    Dial: func() (redis.Conn, error) {
      var c redis.Conn
      var err error
      for i := 0; i < len(client.Servers) 1; i   {
        //随机挑选一个IP
        index := common.RandIntn(len(client.Servers))
        client.current_index = index
        c, err = redis.DialTimeout("tcp", client.Servers[index],
          time.Duration(client.ConnTimeoutMs)*time.Millisecond,
          time.Duration(client.ReadTimeoutMs)*time.Millisecond,
          time.Duration(client.WriteTimeoutMs)*time.Millisecond)
          }
        }
        }

除了超时和最大活跃连接数最大空闲连接数外,最重要的就是指定连接函数。

连接函数只是定义,在用的时候才连接,连接的调用链路如下:

conn.go

redis.DialTimeout(){}

代码语言:javascript复制
func Dial(network, address string, options ...DialOption) (Conn, error) {

然后调用

代码语言:javascript复制
 net.Dial的dial函数进行tcp连接,接着
代码语言:javascript复制
"AUTH"验证和db选择
代码语言:javascript复制
"SELECT"

返回一个连接

连接池的使用:

代码语言:javascript复制
conn := client.pool.Get()
  defer conn.Close()
  _, err := conn.Do("SET", key, value)

1,从池中捞一个链接

2,发送

3,放回池子

连接池的定义

代码语言:javascript复制
type Pool struct {

  // Dial is an application supplied function for creating and configuring a
  // connection.
  //
  // The connection returned from Dial must not be in a special state
  // (subscribed to pubsub channel, transaction started, ...).
  Dial func() (Conn, error) //连接函数

  // TestOnBorrow is an optional application supplied function for checking
  // the health of an idle connection before the connection is used again by
  // the application. Argument t is the time that the connection was returned
  // to the pool. If the function returns an error, then the connection is
  // closed.
  TestOnBorrow func(c Conn, t time.Time) error 
  //每次从连接池取出连接的时候,检查连接的健康度,如果放回错误,则释放这个连接

  // Maximum number of idle connections in the pool.
  MaxIdle int  //最大空闲连接

  // Maximum number of connections allocated by the pool at a given time.
  // When zero, there is no limit on the number of connections in the pool.
  MaxActive int  //如果是0 无限制,非0 一定时间端内,池子则最大的连接数

  // Close connections after remaining idle for this duration. If the value
  // is zero, then idle connections are not closed. Applications should set
  // the timeout to a value less than the server's timeout.
  IdleTimeout time.Duration //如果是0 连接不关闭,非0 ,剩余关闭时间

  // If Wait is true and the pool is at the MaxActive limit, then Get() waits
  // for a connection to be returned to the pool before returning.
  Wait bool  //当Wait 为true 时,并且池子则最大活跃连接数达到最大限制,获取连接的方法需要等待,有连接被放回池子,才能使用

  // mu protects fields defined below.
  mu     sync.Mutex
  cond   *sync.Cond
  closed bool
  active int

  // Stack of idleConn with most recently used at the front.
  idle list.List  //存放空闲连接的链表
}

获取可用连接函数(放回的连接用完后,需要用户自己释放)

其实这里返回的连接不是最原始的连接,而是池化连接

代码语言:javascript复制
type pooledConnection struct {
  p     *Pool
  c     Conn
  state int
}

即对原始连接进行了包装

代码语言:javascript复制
// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get() Conn {
  c, err := p.get()
  if err != nil {
    return errorConnection{err}
  }
  return &pooledConnection{p: p, c: c}
}

//释放
func (pc *pooledConnection) Close() error {
  c := pc.c
  if _, ok := c.(errorConnection); ok {
    return nil
  }
  pc.c = errorConnection{errConnClosed}

  if pc.state&internal.MultiState != 0 {
    c.Send("DISCARD")
    pc.state &^= (internal.MultiState | internal.WatchState)
  } else if pc.state&internal.WatchState != 0 {
    c.Send("UNWATCH")
    pc.state &^= internal.WatchState
  }
  if pc.state&internal.SubscribeState != 0 {
    c.Send("UNSUBSCRIBE")
    c.Send("PUNSUBSCRIBE")
    // To detect the end of the message stream, ask the server to echo
    // a sentinel value and read until we see that value.
    sentinelOnce.Do(initSentinel)
    c.Send("ECHO", sentinel)
    c.Flush()
    for {
      p, err := c.Receive()
      if err != nil {
        break
      }
      if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
        pc.state &^= internal.SubscribeState
        break
      }
    }
  }
  c.Do("")
  pc.p.put(c, pc.state != 0)
  return nil
}
代码语言:javascript复制

// get prunes stale connections and returns a connection from the idle list or
// creates a new connection.
func (p *Pool) get() (Conn, error) {
  p.mu.Lock()

  // Prune stale connections.

  if timeout := p.IdleTimeout; timeout > 0 {
    for i, n := 0, p.idle.Len(); i < n; i   {
      e := p.idle.Back()
      if e == nil {//没有空闲连接了
        break
      }
      ic := e.Value.(idleConn)
      if ic.t.Add(timeout).After(nowFunc()) {//连接已经超时
        break
      }
      p.idle.Remove(e) //从空闲连接中移除
      p.release()//1,空闲连接数减一  2,给所有等待获取连接的协程发信号
      p.mu.Unlock()
      ic.c.Close()//以下几种,状态特殊处理,
      //最后将连接   重新放回连接池头部   ,如果达到最大连接数,则挤掉尾部连接,并放回
        //const (
        //WatchState = 1 << iota
        //MultiState
        //SubscribeState
        //MonitorState
        //)
      p.mu.Lock()
    }
  }

  for {

    // Get idle connection.

    for i, n := 0, p.idle.Len(); i < n; i   {
      e := p.idle.Front()
      if e == nil {
        break
      }
      ic := e.Value.(idleConn)
      p.idle.Remove(e)//从空闲连接中取出
      test := p.TestOnBorrow //检查连接是否可用
      p.mu.Unlock()
      if test == nil || test(ic.c, ic.t) == nil {
        return ic.c, nil  //可用就直接放回
      }
      ic.c.Close()  //关闭不可用连接,(放回链表头部)
      p.mu.Lock()
      p.release()
    }

    // Check for pool closed before dialing a new connection.

    if p.closed {
      p.mu.Unlock()
      return nil, errors.New("redigo: get on closed pool")
    }

    // Dial new connection if under limit.

    if p.MaxActive == 0 || p.active < p.MaxActive {
      dial := p.Dial // 没有达到最大活跃连接数,重新生成一个连接,并返回
      p.active  = 1
      p.mu.Unlock()
      c, err := dial()
      if err != nil {
        p.mu.Lock()
        p.release()
        p.mu.Unlock()
        c = nil
      }
      return c, err
    }

    if !p.Wait {
      p.mu.Unlock()
      return nil, ErrPoolExhausted
    }

    if p.cond == nil {
      p.cond = sync.NewCond(&p.mu)
    }
    p.cond.Wait() //循环中等待事件发生
  }
}

func (p *Pool) put(c Conn, forceClose bool) error {
  err := c.Err()
  p.mu.Lock()
  if !p.closed && err == nil && !forceClose {
    p.idle.PushFront(idleConn{t: nowFunc(), c: c})
    if p.idle.Len() > p.MaxIdle {//达到最大空闲连接数,将队列尾部的连接,弹出
      c = p.idle.Remove(p.idle.Back()).(idleConn).c
    } else {
      c = nil
    }
  }

  if c == nil {
    if p.cond != nil {
      p.cond.Signal()   //没有达到最大空闲连接数,发信号,重新生成一个连接
    }
    p.mu.Unlock()
    return nil
  }

  p.release()
  p.mu.Unlock()  
  return c.Close() //达到最大连接数,关闭连接
}

0 人点赞