PostgreSQL之进程分析

2022-08-17 12:26:02 浏览数 (1)

基本介绍

什么是PostgreSQL?
  • PosgreSQL是一个 开源、对象关系的数据库系统。目前可以运行在Linux/Unix/Windows平台。支持ACID,内置INTEGER/NUMBERIC/BOOLEAN/CHAR/VARCHAR/DATE/INTERVAL/TIMESTAMP/binary larget objects等数据结构

PostgreSQL有些限制?

PostgreSQL有哪些核心功能?

  • MVCC
  • PITR 时间点恢复
  • 独立表空间和异步复制
  • Nested 事务和online/hot备份
  • 查询计划和优化器
  • 采用WAL机制保证可靠性

架构概览

数据库文件布局

  • base:存储数据库的目录,每个数据一个文件.下面是创建一个sampledb数据库,然后查看base下面的数据库对应的oid文件。sampledb的OID是163984,那么在base目录下就会有一个base/16384的数据库目录
代码语言:javascript复制
$ psql  -h 127.0.0.1   -d postgres
postgres=# CREATE DATABASE sampledb OWNER  perrynzhou;
CREATE DATABASE
postgres=# GRANT ALL PRIVILEGES ON DATABASE sampledb to perrynzhou;
GRANT
postgres=# select oid, datname from pg_database ;
  oid  |  datname  
------- -----------
 12974 | postgres
     1 | template1
 12973 | template0
 16384 | sampledb
(4 rows)
postgres=#

  • global:数据库集群维度的系统表,比如pg_database等
  • pg_commit_ts: 这个目录包含了事务提交的时间戳数据
  • pg_dynshmem: 包含动态共享内存系统使用的文件
  • pg_logical:存储逻辑解码状态码
  • pg_multixact:存储多事务状态码数据
  • pg_notify:包含listen/notify的状态数据
  • pg_replslot:存储复制的slot数据
  • pg_serial:存储序列化事务的提交信息
  • pg_snapshots:存储导出快照的信息
  • pg_stat:包含静态子系统的持久化文件
  • pg_stat_tmp:包含静态系统的临时文件
  • pg_subtrans:包含子事务的状态码信息
  • pg_tblspc:每个表空间的符号链接信息
  • pg_twophase:包含事务准备阶段的状态文件
  • PG_VERSION:记录PostgreSQL的大版本号
  • pg_wal:包含wal的日志文件
  • pg_xact:包含事务提交的状态码信息

PostgreSQL的进程模型

  • PostMaster进程:当PostgreSQL启动后,Postmaster进程会第一个启动,这个进程是PG所有进程的父进程。该进程负责所有控制处理。外部发来的请求第一次都会请求到该进程,由该进程负责给每个请求初始化一个子进程服务。
  • 客户端进程:PostgreSQL启动后主服务的PostMaster主进程启动,负责监听postgresql.conf中的port端口同时初始化整个PostgreSQL的其他的内部进程,每个一个请求到这个端口,主进程会fork一个子进程,根据pg_hba.conf中的配置策略服务或者拒绝这个请求的处理。后续这个客户端子进程会接受SQL语句,然后拿到结果返回给请求的客户端。
  • background writer进程:write进程负责把数据写入到共享内存的cache,在合适的时间定期flush到磁盘。这个进程每次按照 postgresql.conf中的ngwriter_delay来控制每次写的频度;其次是PG在做常规checkpoint时候必须把所有的脏页flush到磁盘。
  • checkpointer 进程:检查点是事务序列号,设置检查点可确保将检查点刷新到磁盘之前的日志信息,也就是说检查点之前的所有wal 日志在PG崩溃之后,是不需要进行恢复的,只需要从检查点之后的所有wal log进行日志恢复。
  • walwriter 进程:wal writer进程负责把wal cache中的日志数据在适合的时间点flush到Wal日志文件。wal log的核心思想是修改数据库文件之前必须先把更改数据记录到log,也就是说wal日志是先于数据文件写入的。
  • archiver 进程:归档进程负责把WAL 日志归档到归档路径中。如果是磁盘损坏,可以从这个归档日志中进行数据恢复。
  • stats collector 进程:该进程负责收集表和磁盘的访问的静态信息,包括表的添加、删除、更改的的数据、data block的数量、索引改变等静态信息,这些信息主要给PG的优化器使用,以便提供更优的执行计划,这些信息也会被autovacuum进程使用。
  • autovacuum launcher进程:在PG中数据表的UPDATE/DELETE操作不是立即删除旧版本数据而是标记为删除,这样做的目的是为了PG的MVCC.当事务提交,旧版本的数据不再需求了,这些数据需要清理腾出空间,这个清理的工作就是有autovacuum进程处理
  • wal sender/wal receiver进程:wal sender和wal receiver进程是PG实现streaming replication的基础。wal sender进程负责把WAL日志文件通过网络传输到send receiver节点;wal receiver进程负责结构wal log日志文件,同时从wal log进行数据恢复的操作。

PostgreSQL SQL执行流程

  • 从前端过来的SQL语句到了对应的服务端fork的客户端进程,经过SQL的词法、语法解析->SQL重写->生成物理执行计划->SQL语句的执行,最终该进程把结果返回给前端,来完成整个SQL语句的请求的生命周期。

代码语言:javascript复制
// port是外部请求信息的封装
Port	   *port;
// 初始化port
port = ConnCreate(ListenSocket[i]);
// 启动一个用来处理外部请求的服务子进程
BackendStartup(port);

static int BackendStartup(Port *port)
{
	Backend    *bn;				/* for backend cleanup */
	pid_t		pid;
	pid = fork_process();
	if (pid == 0)				/* child */
	{
		free(bn);

		/* Detangle from postmaster */
		InitPostmasterChild();

		/* Close the postmaster's sockets */
		ClosePostmasterPorts(false);

		/* Perform additional initialization and collect startup packet */
		BackendInitialize(port);

		// 在子进程中进行初始化和运行外部请求和子进程的交互
		BackendRun(port);
	}

}

static void BackendRun(Port *port)
{
	char	   *av[2];
	const int	ac = 1;

	av[0] = "postgres";
	av[1] = NULL;

	// 子进程的内存初始化
	MemoryContextSwitchTo(TopMemoryContext);
	// 启动一个子进程
	PostgresMain(ac, av, port->database_name, port->user_name);
}


// PG用于处理外部请求的子进程入口函数
void PostgresMain(int argc, char *argv[],
			 const char *dbname,
			 const char *username)
{
	int			firstchar;
	StringInfoData input_message;
	sigjmp_buf	local_sigjmp_buf;
	volatile bool send_ready_for_query = true;
	bool		idle_in_transaction_timeout_enabled = false;
	bool		idle_session_timeout_enabled = false;

	// 子进程的初始化
	InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL, false);

	// 是否配置wal sender
	if (am_walsender)
		InitWalSender();

	for (;;)
	{

		DoingCommandRead = true;
		// 通过TCP读取客户端传过来的数据,该数据封装了PG客户端和该子进程的协议和SQL语句
		firstchar = ReadCommand(&input_message);
 
		DoingCommandRead = false;
 
		// 根据读取到的第一个字符来判断请求类型
		switch (firstchar)
		{
			// 简单SQL语句的处理
			case 'Q':			/* simple query */
				{
					const char *query_string;

					query_string = pq_getmsgstring(&input_message);
					pq_getmsgend(&input_message);

					if (am_walsender)
					{
						if (!exec_replication_command(query_string))
							exec_simple_query(query_string);
					}
					else
						exec_simple_query(query_string);

					send_ready_for_query = true;
				}
				break;
			// SQL语句的协议解析
			case 'P':			/* parse */
				{
					const char *stmt_name;
					const char *query_string;
					int			numParams;
					Oid		   *paramTypes = NULL;

					forbidden_in_wal_sender(firstchar);

					/* Set statement_timestamp() */
					SetCurrentStatementStartTimestamp();

					stmt_name = pq_getmsgstring(&input_message);
					query_string = pq_getmsgstring(&input_message);
					numParams = pq_getmsgint(&input_message, 2);
					if (numParams > 0)
					{
						paramTypes = (Oid *) palloc(numParams * sizeof(Oid));
						for (int i = 0; i < numParams; i  )
							paramTypes[i] = pq_getmsgint(&input_message, 4);
					}
					pq_getmsgend(&input_message);

					exec_parse_message(query_string, stmt_name,
									   paramTypes, numParams);
				}
				break;
			case 'C':			/* close */
				{
					int			close_type;
					const char *close_target;

					forbidden_in_wal_sender(firstchar);

					close_type = pq_getmsgbyte(&input_message);
					close_target = pq_getmsgstring(&input_message);
					pq_getmsgend(&input_message);

					switch (close_type)
					{
						case 'S':
							if (close_target[0] != '')
								DropPreparedStatement(close_target, false);
							else
							{
								/* special-case the unnamed statement */
								drop_unnamed_stmt();
							}
							break;
						case 'P':
							{
								Portal		portal;

								portal = GetPortalByName(close_target);
								if (PortalIsValid(portal))
									PortalDrop(portal, false);
							}
							break;
						default:
							ereport(ERROR,
									(errcode(ERRCODE_PROTOCOL_VIOLATION),
									 errmsg("invalid CLOSE message subtype %d",
											close_type)));
							break;
					}

					if (whereToSendOutput == DestRemote)
						pq_putemptymessage('3');	/* CloseComplete */
				}
				break;

			case 'D':			/* describe */
				{
					int			describe_type;
					const char *describe_target;

					forbidden_in_wal_sender(firstchar);

					/* Set statement_timestamp() (needed for xact) */
					SetCurrentStatementStartTimestamp();

					describe_type = pq_getmsgbyte(&input_message);
					describe_target = pq_getmsgstring(&input_message);
					pq_getmsgend(&input_message);

					switch (describe_type)
					{
						case 'S':
							exec_describe_statement_message(describe_target);
							break;
						case 'P':
							exec_describe_portal_message(describe_target);
							break;
						default:
							ereport(ERROR,
									(errcode(ERRCODE_PROTOCOL_VIOLATION),
									 errmsg("invalid DESCRIBE message subtype %d",
											describe_type)));
							break;
					}
				}
				break;

			case 'H':			/* flush */
				pq_getmsgend(&input_message);
				if (whereToSendOutput == DestRemote)
					pq_flush();
				break;

			case 'S':			/* sync */
				pq_getmsgend(&input_message);
				finish_xact_command();
				send_ready_for_query = true;
				break;

			case EOF:

				pgStatSessionEndCause = DISCONNECT_CLIENT_EOF;

			case 'X':
				if (whereToSendOutput == DestRemote)
					whereToSendOutput = DestNone;

				proc_exit(0);


		}
	}							
}

PostgreSQL进程模型源码分析

  • PostgreSQL进程启动源码,整个逻辑实现是在ServerLoop函数中,PostmasterMain是PG启动的第一个进程,由它来实现其他的内部进程,其他的内部进程是在SeverLoop的实现如下
代码语言:javascript复制
// pg_ctl -D /data/postgres/data -l logfile start执行以后,最终的入口函数就是PostmasterMain函数
void PostmasterMain(int argc, char *argv[])
{

	status = ServerLoop();
}

}
static int ServerLoop(void)
{
	fd_set		readmask;
	int			nSockets;
	time_t		last_lockfile_recheck_time,
				last_touch_time;
	// 初始化数据库的监听socket
	nSockets = initMasks(&readmask);
	// 无限的死循环,不断的监听
	for (;;)
	{
		fd_set		rmask;
		int			selres;
		time_t		now;
		// 这里采用select来监听来自外部的请求,这里有些疑惑为啥不采用epoll模型?
		// select目前仅仅是循环找到哪些fd有IO事件吗,这个在高并发的场景下,效率相对比较低
		selres = select(nSockets, &rmask, NULL, NULL, &timeout);
		if (selres > 0)
		{
			int			i;
 
			for (i = 0; i < MAXLISTEN; i  )
			{
				if (ListenSocket[i] == PGINVALID_SOCKET)
					break;
				if (FD_ISSET(ListenSocket[i], &rmask))
				{
					// Port封装了此次连接的客户端信息
					Port	   *port;
					// 
					port = ConnCreate(ListenSocket[i]);
					if (port)
					{
						// fork一个子进程,用来服务此次来自外部客户端的请求
						BackendStartup(port);
						StreamClose(port->sock);
						ConnFree(port);
					}
				}
			}
		}
		// 如果配置文件中启用了syslog,则启动一个log进程用来写入PG的日志
		if (SysLoggerPID == 0 && Logging_collector)
			SysLoggerPID = SysLogger_Start();

 
		if (pmState == PM_RUN || pmState == PM_RECOVERY ||
			pmState == PM_HOT_STANDBY)
		{
			// 启动checkpoint进程
			if (CheckpointerPID == 0)
				CheckpointerPID = StartCheckpointer();
			// 启动后台的writer进程
			if (BgWriterPID == 0)
				BgWriterPID = StartBackgroundWriter();
		}
		// 启动wal writer进程
		if (WalWriterPID == 0 && pmState == PM_RUN)
			WalWriterPID = StartWalWriter();

		if (!IsBinaryUpgrade && AutoVacPID == 0 &&
			(AutoVacuumingActive() || start_autovac_launcher) &&
			pmState == PM_RUN)
		{
			// 启动auto vacuum进程
			AutoVacPID = StartAutoVacLauncher();
			if (AutoVacPID != 0)
				start_autovac_launcher = false; /* signal processed */
		}
 
		//启动statics collector进程
		if (PgStatPID == 0 &&
			(pmState == PM_RUN || pmState == PM_HOT_STANDBY))
			PgStatPID = pgstat_start();
		// 如果配置了archive进程,则启动该进程
		if (PgArchPID == 0 && PgArchStartupAllowed())
			PgArchPID = StartArchiver();
 
		// 如果配置了复制,同时是slave节点,则启动receive进程
		if (WalReceiverRequested)
			MaybeStartWalReceiver();
 
		//启动后台工作进程
		if (StartWorkerNeeded || HaveCrashedWorker)
			maybe_start_bgworkers();

		now = time(NULL);

}

0 人点赞