PostgreSQL进程功能源码分析

2022-08-17 13:23:21 浏览数 (1)

Postgres服务端后台进程功能概览

  • 在PG14版本中定义了如上图中的进程基本的描述信息
代码语言:javascript复制
// 这里定义了PG基本的进程类型const char *GetBackendTypeDesc(BackendType backendType){
	const char *backendDesc = "unknown process type";

	switch (backendType)
	{
		case B_INVALID:
			backendDesc = "not initialized";
			break;
		case B_AUTOVAC_LAUNCHER:
			backendDesc = "autovacuum launcher";
			break;
		case B_AUTOVAC_WORKER:
			backendDesc = "autovacuum worker";
			break;
		case B_BACKEND:
			backendDesc = "client backend";
			break;
		case B_BG_WORKER:
			backendDesc = "background worker";
			break;
		case B_BG_WRITER:
			backendDesc = "background writer";
			break;
		case B_CHECKPOINTER:
			backendDesc = "checkpointer";
			break;
		case B_STARTUP:
			backendDesc = "startup";
			break;
		case B_WAL_RECEIVER:
			backendDesc = "walreceiver";
			break;
		case B_WAL_SENDER:
			backendDesc = "walsender";
			break;
		case B_WAL_WRITER:
			backendDesc = "walwriter";
			break;
		case B_ARCHIVER:
			backendDesc = "archiver";
			break;
		case B_STATS_COLLECTOR:
			backendDesc = "stats collector";
			break;
		case B_LOGGER:
			backendDesc = "logger";
			break;
	}

	return backendDesc;}

后台进程启动流程

  • postgres数据库启动后会在ServerLoop中不断地监听来自客户端的第一次IO请求(即连接请求),然后再创建为该客户端服务的进程,接着判断整个postgres中的辅助后台进程是否存在,如果不存在会自动拉起这个进程。这种模式和oracle类似。
代码语言:javascript复制
// PG启动后的非常核心的监听循环static int ServerLoop(void){

	nSockets = initMasks(&readmask);
	// for循环监听是否有来自客户端的请求
	for (;;)
	{
		fd_set		rmask;
		int			selres;
		time_t		now;
		// 这里采用selelct模型,因为有客户端就有创建进程,select的模型是for循环遍历,可以优化为epoll模型
		selres = select(nSockets, &rmask, NULL, NULL, &timeout);
	
		if (selres > 0)
		{
			int			i;

			for (i = 0; i < MAXLISTEN; i  )
			{
				// 如果有新客户端进来则创建新的后台进程,专门为客户端服务
				if (FD_ISSET(ListenSocket[i], &rmask))
				{
					Port	   *port;
					// 创建为客户端服务的进程
					port = ConnCreate(ListenSocket[i]);
					if (port)
					{
						// 启动后台进程
						BackendStartup(port);
						StreamClose(port->sock);
						ConnFree(port);
					}
				}
			}
		}

		// 如果日志进程不存在,且开启了日志进程则进行创建
		if (SysLoggerPID == 0 && Logging_collector)
			SysLoggerPID = SysLogger_Start();

		// 检查checkpoint进程和后台磁盘写进程,如果不存在则创建
		if (pmState == PM_RUN || pmState == PM_RECOVERY ||
			pmState == PM_HOT_STANDBY)
		{
			if (CheckpointerPID == 0)
				CheckpointerPID = StartCheckpointer();
			if (BgWriterPID == 0)
				BgWriterPID = StartBackgroundWriter();
		}

		// 如果wal写进程不存在,则创建wal写进程
		if (WalWriterPID == 0 && pmState == PM_RUN)
			WalWriterPID = StartWalWriter();

		// 判断vacuum的守护进程,如果不存在则启动
		if (!IsBinaryUpgrade && AutoVacPID == 0 &&
			(AutoVacuumingActive() || start_autovac_launcher) &&
			pmState == PM_RUN)
		{
			AutoVacPID = StartAutoVacLauncher();
			if (AutoVacPID != 0)
				start_autovac_launcher = false; /* signal processed */
		}

		// 判断统计信息收集进程不存在,则启动统计信息收集进程
		if (PgStatPID == 0 &&
			(pmState == PM_RUN || pmState == PM_HOT_STANDBY))
			PgStatPID = pgstat_start();

		// 如果配置归档进程,如果不存在则启动归档进程
		if (PgArchPID == 0 && PgArchStartupAllowed())
			PgArchPID = StartArchiver();

		

		// 如果是备库,则启动wal receiver进程
		if (WalReceiverRequested)
			MaybeStartWalReceiver();

		// 启动其他的辅助进程,如果存在crash的情况
		if (StartWorkerNeeded || HaveCrashedWorker)
			maybe_start_bgworkers();

		// 计算时间的间隔
		now = time(NULL);
		if (now - last_touch_time >= 58 * SECS_PER_MINUTE)
		{
			TouchSocketFiles();
			TouchSocketLockFiles();
			last_touch_time = now;
		}
	}}

autovacuum launcher主要做了什么?

  • autovacuum launcher进程可以理解为vacuum进程的守护进程,根据参数配置和负载动态的创建vacuum进程,这个核心逻辑在AutoVacLauncherMain函数中
代码语言:javascript复制
/*
 * AutoVacLauncherMain
 *		Entry point of the autovacuum launcher (abridged excerpt).
 *
 * Only the "autovacuum is not active" path is shown: unless a shutdown is
 * pending, one worker is started via do_start_worker(), and then the process
 * exits with status 0.  When AutoVacuumingActive() is true this excerpt does
 * nothing.
 *
 * argc/argv are not used by the code shown here.
 */
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
	/* Nothing to do in this excerpt while autovacuum is active. */
	if (AutoVacuumingActive())
		return;

	/* No stop request pending: start a single vacuum worker right away. */
	if (!ShutdownRequestPending)
		do_start_worker();

	/* done */
	proc_exit(0);
}

background writer主要做了什么?

  • 后台刷脏进程(background writer)的核心工作是从shared buffer pool中把脏的page刷新到磁盘,目的是尽可能地利用好shared buffer pool的内存缓冲区。后台刷脏的核心工作定义在BackgroundWriterMain中。
代码语言:javascript复制
void BackgroundWriterMain(void){
	// 核心的loop,不断的同步脏page到磁盘
	for (;;)
	{
		bool		can_hibernate;
		int			rc;
		ResetLatch(MyLatch);
		// 刷脏的动作
		can_hibernate = BgBufferSync(&wb_context);

	
		// 发送统计数据给统计信息的进程
		pgstat_send_bgwriter();

		// 如果没有脏page,就休眠一会
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   BgWriterDelay /* ms */ , WAIT_EVENT_BGWRITER_MAIN);
		prev_hibernate = can_hibernate;
	}}

autovacuum worker主要做了什么?

  • vacuum进程的主要目的是回收当前数据库中已经废弃或者无用的数据,目前有两种模式:一种是回收无用page的空间但并不归还给操作系统,这样设计是为了后续再插入新的page时不用重新分配空间;第二种是扫描表中无用的page,把有效数据重新写到新的page中,然后把无用的page的空间归还给操作系统。
代码语言:javascript复制
// 启动vacuum进程回收无用的page空间static Oid do_start_worker(void){


	// 获取当前的统计信息
	autovac_refresh_stats();

	// 获取数据库列表
	dblist = get_database_list();

	// 查找可以被vacuum的数据库
	foreach(cell, dblist)
	{
		avw_dbase  *tmp = lfirst(cell);
		dlist_iter	iter;
		skipit = false;
		dlist_reverse_foreach(iter, &DatabaseList)
		{
			avl_dbase  *dbp = dlist_container(avl_dbase, adl_node, iter.cur);

			if (dbp->adl_datid == tmp->adw_datid)
			{
			
				if (!TimestampDifferenceExceeds(dbp->adl_next_worker,
												current_time, 0) &&
					!TimestampDifferenceExceeds(current_time,
												dbp->adl_next_worker,
												autovacuum_naptime * 1000))
					skipit = true;

				break;
			}
		}
		if (skipit)
			continue;
		if (avdb == NULL ||
			tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
			avdb = tmp;
	}


	// 查找到数据库然后进行处理
	if (avdb != NULL)
	{
		WorkerInfo	worker;
		dlist_node *wptr;

		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);

	
		wptr = dlist_pop_head_node(&AutoVacuumShmem->av_freeWorkers);

		worker = dlist_container(WorkerInfoData, wi_links, wptr);
		worker->wi_dboid = avdb->adw_datid;
		worker->wi_proc = NULL;
		worker->wi_launchtime = GetCurrentTimestamp();

		AutoVacuumShmem->av_startingWorker = worker;

		LWLockRelease(AutovacuumLock);

		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);

		retval = avdb->adw_datid;
	}

	return retval;}

checkpointer主要做了什么?

  • 假设数据库不断地写wal日志,很多脏page都没有刷新到磁盘,一旦数据库crash,需要从头开始进行数据库恢复,这样数据库恢复的时间非常长;有了checkpointer进程之后,它会定期把缓存池中的脏page刷新到磁盘,并在wal中做好记录,说明脏page已经刷到哪个位置,这样即使数据库崩溃,也可以从上一次的checkpoint点开始恢复,能大大减少数据库恢复的时间。
代码语言:javascript复制
// checkpoint进程的工作入口void CheckpointerMain(void){

	/*
	 * Loop forever
	 */
	for (;;)
	{
		bool		do_checkpoint = false;
		int			flags = 0;
		pg_time_t	now;
		int			elapsed_secs;
		int			cur_timeout;

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		/*
		 * Process any requests or signals received recently.
		 */
		AbsorbSyncRequests();
		HandleCheckpointerInterrupts();

		/*
		 * Detect a pending checkpoint request by checking whether the flags
		 * word in shared memory is nonzero.  We shouldn't need to acquire the
		 * ckpt_lck for this.
		 */
		if (((volatile CheckpointerShmemStruct *) CheckpointerShmem)->ckpt_flags)
		{
			do_checkpoint = true;
			BgWriterStats.m_requested_checkpoints  ;
		}

		/*
		 * Force a checkpoint if too much time has elapsed since the last one.
		 * Note that we count a timed checkpoint in stats only when this
		 * occurs without an external request, but we set the CAUSE_TIME flag
		 * bit even if there is also an external request.
		 */
		now = (pg_time_t) time(NULL);
		elapsed_secs = now - last_checkpoint_time;
		if (elapsed_secs >= CheckPointTimeout)
		{
			if (!do_checkpoint)
				BgWriterStats.m_timed_checkpoints  ;
			do_checkpoint = true;
			flags |= CHECKPOINT_CAUSE_TIME;
		}

		/*
		 * Do a checkpoint if requested.
		 */
		if (do_checkpoint)
		{
			bool		ckpt_performed = false;
			bool		do_restartpoint;

			// 是否需要进行recovery
			do_restartpoint = RecoveryInProgress();

			// 执行检查点行为
			if (!do_restartpoint)
			{
				CreateCheckPoint(flags);
				ckpt_performed = true;
			}
			else
				ckpt_performed = CreateRestartPoint(flags);

			
			if (ckpt_performed)
			{
				last_checkpoint_time = now;
			}
			else
			{
				last_checkpoint_time = now - CheckPointTimeout   15;
			}

			ckpt_active = false;
		}

		// 必要的情况下切换xlog
		CheckArchiveTimeout();

		// 发送统计信息给后台刷脏进程
		pgstat_send_bgwriter();


		// 发送wal的统计信息给统计信息进程
		pgstat_send_wal(true);

		// 完成后短暂休眠
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 cur_timeout * 1000L /* convert to ms */ ,
						 WAIT_EVENT_CHECKPOINTER_MAIN);
	}}

walwriter主要做了什么?

  • postgres在把数据页的修改写到磁盘之前,会先把修改记录写到wal日志,wal即write-ahead log(预写日志)。目的是为了防止内存中已提交或者未提交的page因掉电而引起数据丢失。wal写进程的工作是不断地把wal buffer中的日志数据刷盘到wal日志文件中。
代码语言:javascript复制
// wal日志写进程核心逻辑void WalWriterMain(void){
	// Wal进程的LOOP循环
	for (;;)
	{
		long		cur_timeout;
		// 后台刷新Wal buffer中的数据到Wal日志
		if (XLogBackgroundFlush())
			left_till_hibernate = LOOPS_UNTIL_HIBERNATE;
		else if (left_till_hibernate > 0)
			left_till_hibernate--;

		// 发送Wal的日志统计信息给统计信息进程
		pgstat_send_wal(false);
		// 休眠短暂时间
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 cur_timeout,
						 WAIT_EVENT_WAL_WRITER_MAIN);
	}}

0 人点赞