1 network setup简要流程
代码语言:javascript复制[mysql.cc]network_init
> Mysqld_socket_listener *mysqld_socket_listener=new (std::nothrow) Mysqld_socket_listener(bind_addr_str,...)
> Connection_acceptor<Mysqld_socket_listener> *mysqld_socket_acceptor = new (std::nothrow) Connection_acceptor<Mysqld_socket_listener>(mysqld_socket_listener)
> mysqld_socket_acceptor->init_connection_acceptor()
|
[connection_acceptor.h]init_connection_acceptor()
> m_listener->setup_listener();
|
[socket_connection.cc]Mysqld_socket_listener::setup_listener()
> setup_listener都做了什么? 跳转到“3 class Mysqld_socket_listene”
2 class Connection_acceptor
这里面涉及两个重要数据结构,Connection_acceptor和Mysqld_socket_listener。
- Connection_acceptor模版类需要listener
- Mysqld_socket_listener作为listener传入Connection_acceptor
Connection_acceptor较为简单,需要传入的listener有setup_listener()方法即可。
代码语言:javascript复制template <typename Listener> class Connection_acceptor
{
Listener *m_listener;
public:
Connection_acceptor(Listener *listener)
: m_listener(listener)
{ }
~Connection_acceptor()
{
delete m_listener;
}
/**
Initialize a connection acceptor.
@retval return true if initialization failed, else false.
*/
bool init_connection_acceptor()
{
return m_listener->setup_listener();
}
/**
Connection acceptor loop to accept connections from clients.
*/
void connection_event_loop()
{
Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
while (!abort_loop)
{
Channel_info *channel_info= m_listener->listen_for_connection_event();
if (channel_info != NULL)
mgr->process_new_connection(channel_info);
}
}
/**
Close the listener.
*/
void close_listener()
{
m_listener->close_listener();
}
};
3 class Mysqld_socket_listener
Mysqld_socket_listener对连接进行具体的初始化操作
代码语言:javascript复制class Mysqld_socket_listener
{
std::string m_bind_addr_str; // IP address string
uint m_tcp_port; // TCP port to bind to
uint m_backlog; // backlog specifying length of pending connection queue
uint m_port_timeout; // port timeout value
std::string m_unix_sockname; // unix socket pathname to bind to
bool m_unlink_sockname; // Unlink socket & lock file if true.
/*
Map indexed by MYSQL socket fds and correspoding bool to distinguish
between unix and tcp socket.
*/
socket_map_t m_socket_map; // map indexed by mysql socket fd and index
uint m_error_count; // Internal variable for maintaining error count.
#ifdef HAVE_POLL
static const int MAX_SOCKETS=2;
struct poll_info_t
{
struct pollfd m_fds[MAX_SOCKETS];
MYSQL_SOCKET m_pfs_fds[MAX_SOCKETS];
};
// poll related info. used in poll for listening to connection events.
poll_info_t m_poll_info;
#else
struct select_info_t
{
fd_set m_read_fds,m_client_fds;
my_socket m_max_used_connection;
select_info_t() : m_max_used_connection(0)
{ FD_ZERO(&m_client_fds); }
};
// select info for used in select for listening to connection events.
select_info_t m_select_info;
#endif // HAVE_POLL
...
public:
static ulong get_connection_errors_select()
{
return connection_errors_select;
}
static ulong get_connection_errors_accept()
{
return connection_errors_accept;
}
static ulong get_connection_errors_tcpwrap()
{
return connection_errors_tcpwrap;
}
/**
Constructor to setup a listener for listen to connect events from
clients.
@param bind_addr_str IP address used in bind
@param tcp_port TCP port to bind to
@param backlog backlog specifying length of pending
connection queue used in listen.
@param port_timeout portname.
@param unix_sockname pathname for unix socket to bind to
*/
Mysqld_socket_listener(std::string bind_addr_str, uint tcp_port,
uint backlog, uint port_timeout,
std::string unix_sockname);
/**
Set up a listener - set of sockets to listen for connection events
from clients.
@retval false listener sockets setup to be used to listen for connect events
true failure in setting up the listener.
*/
bool setup_listener();
/**
The body of the event loop that listen for connection events from clients.
@retval Channel_info Channel_info object abstracting the connected client
details for processing this connection.
*/
Channel_info* listen_for_connection_event();
/**
Close the listener.
*/
void close_listener();
~Mysqld_socket_listener()
{
if (!m_socket_map.empty())
close_listener();
}
};
3.1 Mysqld_socket_listener::setup_listener
比较重要的就是这里了,实现具体的socket初始化流程:
[socket_connection.cc]bool Mysqld_socket_listener::setup_listener()
第一步,如果有配置端口监听,拿到第一个网络套接字
代码语言:javascript复制 if (m_tcp_port)
{
TCP_socket tcp_socket(m_bind_addr_str, m_tcp_port,
m_backlog, m_port_timeout);
MYSQL_SOCKET mysql_socket= tcp_socket.get_listener_socket();
if (mysql_socket.fd == INVALID_SOCKET)
return true;
m_socket_map.insert(std::pair<MYSQL_SOCKET,bool>(mysql_socket, false));
}
第二步,如果有域套接字,拿到第二个域套接字
代码语言:javascript复制 if (m_unix_sockname != "")
{
Unix_socket unix_socket(&m_unix_sockname, m_backlog);
MYSQL_SOCKET mysql_socket= unix_socket.get_listener_socket();
if (mysql_socket.fd == INVALID_SOCKET)
return true;
m_socket_map.insert(std::pair<MYSQL_SOCKET,bool>(mysql_socket, true));
m_unlink_sockname= true;
}
现在socket_map里面有两个socket:
第三步,使用poll监听相关的socket
代码语言:javascript复制 for (socket_map_iterator_t sock_map_iter= m_socket_map.begin();
sock_map_iter != m_socket_map.end(); sock_map_iter)
{
MYSQL_SOCKET listen_socket= sock_map_iter->first;
mysql_socket_set_thread_owner(listen_socket);
m_poll_info.m_fds[count].fd= mysql_socket_getfd(listen_socket);
m_poll_info.m_fds[count].events= POLLIN;
m_poll_info.m_pfs_fds[count]= listen_socket;
count ;
}
装入两个socket后:
4 进入监听循环
进入监听循环的还是Mysqld_socket_listener类的方法:listen_for_connection_event
代码语言:javascript复制mysqld_main
|
[mysql.cc]network_init
...
[mysql.cc]mysqld_socket_acceptor->connection_event_loop();
> Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
> while (!abort_loop)
> [socket_connection.cc]Channel_info* Mysqld_socket_listener::listen_for_connection_event()
> 等待连接:poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);
> 如果有连接进来:mgr->process_new_connection(channel_info);
> check_and_incr_conn_count()
> m_connection_handler->add_connection(channel_info)
> Per_thread_connection_handler::add_connection()
> 这就到另外一篇的内容了《Mysql连接建立与thread cache唤醒原理
> https://blog.csdn.net/jackgo73/article/details/113118822
5 poll支持拓展
主线程在监听socket时使用的poll似乎只传入了第一个socket?看下代码:
代码语言:javascript复制poll(&m_poll_info.m_fds[0], m_socket_map.size(), -1);
m_poll_info.m_fds默认会有两个socket,第一个是3306端口绑定的网络套接字,第二个是给定路径的域套接字。poll全部都会监听的:
int poll ( struct pollfd *fdarray , unsigned long nfds , int timeout );
参数:
- fdarray :指向数组的指针,这个数组是由 struct pollfd 作为元素构成的
- nfds :与 select 相同,对应的是所监控的最大文件描述符的个数,使用的时候传入当前最大的文件描述符号 1 即可
- timeout : 在这里可以用来设定 poll 的工作模式 : 阻塞模式, 非阻塞模式, 介于阻塞非阻塞之间的在限定时间内阻塞模式
- INFTIME(-1) 永远等待
- 0 立即返回,不阻塞进程
- >0 等待指定数目的毫秒数
返回值:
- 0 : 没有任何套接字上没有变化
- n(n>0) : 有 n 个套接字上面有变化(来可读数据,有了可写的数据)
- -1 : poll 执行过程中出错
6 总结
- mysqld初始化,主进程先network_init() (1)创建connection_accepter,传入mysqld_socket_listener (2)执行mysqld_socket_listener的setup操作,对网络和域套接字分别初始化,按poll能识别的格式记录到m_poll_info数组中
- mysqld继续初始化,主进程先connection_event_loop() (3)开始监听,执行mysqld_socket_listener的eventloop动作,poll传入上面构造好的结构体m_poll_info开始监听。 (4)监听到了之后,用mgr->process_new_connection(channel_info);处理 (5)进入add_connection()里面优先使用thread cache处理连接,发信号唤醒线程继续处理。