Mysql连接管理从network_init()到connection_event_loop()

2022-05-12 10:39:17 浏览数 (1)

1 network setup简要流程

代码语言:javascript复制
[mysql.cc]network_init
  > Mysqld_socket_listener *mysqld_socket_listener=new (std::nothrow) Mysqld_socket_listener(bind_addr_str,...)
  > Connection_acceptor<Mysqld_socket_listener> *mysqld_socket_acceptor = new (std::nothrow) Connection_acceptor<Mysqld_socket_listener>(mysqld_socket_listener)
  > mysqld_socket_acceptor->init_connection_acceptor()
  |
  [connection_acceptor.h]init_connection_acceptor()
    > m_listener->setup_listener();
    |
    [socket_connection.cc]Mysqld_socket_listener::setup_listener()
      > setup_listener都做了什么? 跳转到“3 class Mysqld_socket_listene”

2 class Connection_acceptor

这里面涉及两个重要数据结构,Connection_acceptor和Mysqld_socket_listener。

  • Connection_acceptor模版类需要listener
  • Mysqld_socket_listener作为listener传入Connection_acceptor

Connection_acceptor较为简单,需要传入的listener有setup_listener()方法即可。

代码语言:javascript复制
template <typename Listener> class Connection_acceptor
{
  Listener *m_listener;

public:
  Connection_acceptor(Listener *listener)
  : m_listener(listener)
  { }

  ~Connection_acceptor()
  {
    delete m_listener;
  }

  /**
    Initialize a connection acceptor.

    @retval   return true if initialization failed, else false.
  */
  bool init_connection_acceptor()
  {
    return m_listener->setup_listener();
  }

  /**
    Connection acceptor loop to accept connections from clients.
  */
  void connection_event_loop()
  {
    Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
    while (!abort_loop)
    {
      Channel_info *channel_info= m_listener->listen_for_connection_event();
      if (channel_info != NULL)
        mgr->process_new_connection(channel_info);
    }
  }

  /**
    Close the listener.
  */
  void close_listener()
  {
    m_listener->close_listener();
  }

};

3 class Mysqld_socket_listener

Mysqld_socket_listener对连接进行具体的初始化操作

代码语言:javascript复制
class Mysqld_socket_listener
{
  std::string m_bind_addr_str; // IP address string
  uint m_tcp_port; // TCP port to bind to
  uint m_backlog; // backlog specifying length of pending connection queue
  uint m_port_timeout; // port timeout value
  std::string m_unix_sockname; // unix socket pathname to bind to
  bool m_unlink_sockname; // Unlink socket & lock file if true.
  /*
    Map indexed by MYSQL socket fds and correspoding bool to distinguish
    between unix and tcp socket.
  */
  socket_map_t m_socket_map; // map indexed by mysql socket fd and index

  uint m_error_count; // Internal variable for maintaining error count.

#ifdef HAVE_POLL
  static const int MAX_SOCKETS=2;
  struct poll_info_t
  {
    struct pollfd m_fds[MAX_SOCKETS];
    MYSQL_SOCKET m_pfs_fds[MAX_SOCKETS];
  };
  // poll related info. used in poll for listening to connection events.
  poll_info_t m_poll_info;
#else
  struct select_info_t
  {
    fd_set  m_read_fds,m_client_fds;
    my_socket m_max_used_connection;
    select_info_t() : m_max_used_connection(0)
    { FD_ZERO(&m_client_fds); }
  };
  // select info for used in select for listening to connection events.
  select_info_t m_select_info;
#endif // HAVE_POLL

...

public:
  static ulong get_connection_errors_select()
  {
    return connection_errors_select;
  }

  static ulong get_connection_errors_accept()
  {
    return connection_errors_accept;
  }

  static ulong get_connection_errors_tcpwrap()
  {
    return connection_errors_tcpwrap;
  }

  /**
    Constructor to setup a listener for listen to connect events from
    clients.

    @param   bind_addr_str  IP address used in bind
    @param   tcp_port       TCP port to bind to
    @param   backlog        backlog specifying length of pending
                            connection queue used in listen.
    @param   port_timeout   portname.
    @param   unix_sockname  pathname for unix socket to bind to
  */
  Mysqld_socket_listener(std::string bind_addr_str, uint tcp_port,
                         uint backlog, uint port_timeout,
                         std::string unix_sockname);

  /**
    Set up a listener - set of sockets to listen for connection events
    from clients.

    @retval false  listener sockets setup to be used to listen for connect events
            true   failure in setting up the listener.
  */
  bool setup_listener();

  /**
    The body of the event loop that listen for connection events from clients.

    @retval Channel_info   Channel_info object abstracting the connected client
                           details for processing this connection.
  */
  Channel_info* listen_for_connection_event();

  /**
    Close the listener.
  */
  void close_listener();

  ~Mysqld_socket_listener()
  {
    if (!m_socket_map.empty())
      close_listener();
  }
};

3.1 Mysqld_socket_listener::setup_listener

比较重要的就是这里了,实现具体的socket初始化流程:

[socket_connection.cc]bool Mysqld_socket_listener::setup_listener()

第一步,如果有配置端口监听,拿到第一个网络套接字

代码语言:javascript复制
  if (m_tcp_port)
  {
    TCP_socket tcp_socket(m_bind_addr_str, m_tcp_port,
                          m_backlog, m_port_timeout);

    MYSQL_SOCKET mysql_socket= tcp_socket.get_listener_socket();
    if (mysql_socket.fd == INVALID_SOCKET)
      return true;

    m_socket_map.insert(std::pair<MYSQL_SOCKET,bool>(mysql_socket, false));
  }

第二步,如果有域套接字,拿到第二个域套接字

代码语言:javascript复制
  if (m_unix_sockname != "")
  {
    Unix_socket unix_socket(&m_unix_sockname, m_backlog);

    MYSQL_SOCKET mysql_socket= unix_socket.get_listener_socket();
    if (mysql_socket.fd == INVALID_SOCKET)
      return true;

    m_socket_map.insert(std::pair<MYSQL_SOCKET,bool>(mysql_socket, true));
    m_unlink_sockname= true;
  }

现在socket_map里面有两个socket:

第三步,使用poll监听相关的socket

代码语言:javascript复制
  for (socket_map_iterator_t sock_map_iter=  m_socket_map.begin();
       sock_map_iter != m_socket_map.end();   sock_map_iter)
  {
    MYSQL_SOCKET listen_socket= sock_map_iter->first;
    mysql_socket_set_thread_owner(listen_socket);
    m_poll_info.m_fds[count].fd= mysql_socket_getfd(listen_socket);
    m_poll_info.m_fds[count].events= POLLIN;
    m_poll_info.m_pfs_fds[count]= listen_socket;
    count  ;
  }

装入两个socket后:

4 进入监听循环

进入监听循环的还是Mysqld_socket_listener类的方法:listen_for_connection_event

代码语言:javascript复制
mysqld_main
  |
  [mysql.cc]network_init
  ...
  [mysql.cc]mysqld_socket_acceptor->connection_event_loop();
    > Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
    > while (!abort_loop)
      > [socket_connection.cc]Channel_info* Mysqld_socket_listener::listen_for_connection_event()
        > 等待连接:poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);
      > 如果有连接进来:mgr->process_new_connection(channel_info);
        > check_and_incr_conn_count()
        > m_connection_handler->add_connection(channel_info)
          > Per_thread_connection_handler::add_connection()
            > 这就到另外一篇的内容了《Mysql连接建立与thread cache唤醒原理
              > https://blog.csdn.net/jackgo73/article/details/113118822

5 poll支持拓展

主线程在监听socket时使用的poll似乎只传入了第一个socket?看下代码:

代码语言:javascript复制
poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);

m_poll_info.m_fds默认会有两个socket,第一个是3306端口绑定的网络套接字,第二个是给定路径的域套接字。poll全部都会监听的:

int poll ( struct pollfd *fdarray , unsigned long nfds , int timeout );

参数:

  • fdarray :指向数组的指针,这个数组是由 struct pollfd 作为元素构成的
  • nfds :与 select 相同,对应的是所监控的最大文件描述符的个数,使用的时候传入当前最大的文件描述符号 1 即可
  • timeout : 在这里可以用来设定 poll 的工作模式 : 阻塞模式, 非阻塞模式, 介于阻塞非阻塞之间的在限定时间内阻塞模式
    • INFTIME(-1) 永远等待
    • 0 立即返回,不阻塞进程
    • >0 等待指定数目的毫秒数

返回值:

  • 0 : 没有任何套接字上没有变化
  • n(n>0) : 有 n 个套接字上面有变化(来可读数据,有了可写的数据)
  • -1 : poll 执行过程中出错

6 总结

  1. mysqld初始化,主进程先network_init() (1)创建connection_accepter,传入mysqld_socket_listener (2)执行mysqld_socket_listener的setup操作,对网络和域套接字分别初始化,按poll能识别的格式记录到m_poll_info数组中
  2. mysqld继续初始化,主进程先connection_event_loop() (3)开始监听,执行mysqld_socket_listener的eventloop动作,poll传入上面构造好的结构体m_poll_info开始监听。 (4)监听到了之后,用mgr->process_new_connection(channel_info);处理 (5)进入add_connection()里面优先使用thread cache处理连接,发信号唤醒线程继续处理。

0 人点赞