Postgresql源码(36)Latch的原理分析和应用场景

2022-05-12 09:02:37 浏览数 (1)

提到备机startup进程等在这个堆栈里面:

代码语言:javascript复制
#0  0x00007f66aef20913 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000084d7f2 in WaitEventSetWaitBlock (set=0x1ca0dc0, cur_timeout=5000, occurred_events=0x7ffc088b3130, nevents=1) at latch.c:1048
#2  0x000000000084d6cd in WaitEventSetWait (set=0x1ca0dc0, timeout=5000, occurred_events=0x7ffc088b3130, nevents=1, wait_event_info=83886088) at latch.c:1000
#3  0x000000000084ce76 in WaitLatchOrSocket (latch=0x2aaaaac0d254, wakeEvents=25, sock=-1, timeout=5000, wait_event_info=83886088) at latch.c:385
#4  0x000000000084cd57 in WaitLatch (latch=0x2aaaaac0d254, wakeEvents=25, timeout=5000, wait_event_info=83886088) at latch.c:339
#5  0x0000000000551751 in WaitForWALToBecomeAvailable (RecPtr=206024220672, randAccess=0 '00', fetching_ckpt=0 '00', tliRecPtr=206029051848) at xlog.c:12238
#6  0x0000000000550d79 in XLogPageRead (xlogreader=0x1cc9df0, targetPagePtr=206024212480, reqLen=8192, targetRecPtr=206029051848, readBuf=0x1ccae08 "22732005", readTLI=0x1cca69c) at xlog.c:11707
#7  0x0000000000556c1f in ReadPageInternal (state=0x1cc9df0, pageptr=206029045760, reqLen=6112) at xlogreader.c:557
#8  0x0000000000556492 in XLogReadRecord (state=0x1cc9df0, RecPtr=206029051848, errormsg=0x7ffc088b3428) at xlogreader.c:276
#9  0x0000000000542c23 in ReadRecord (xlogreader=0x1cc9df0, RecPtr=0, emode=15, fetching_ckpt=0 '00') at xlog.c:4232
#10 0x00000000005494eb in StartupXLOG () at xlog.c:7350
#11 0x00000000007e1b8b in StartupProcessMain () at startup.c:230
#12 0x000000000055d935 in AuxiliaryProcessMain (argc=2, argv=0x7ffc088b3fd0) at bootstrap.c:426
#13 0x00000000007e0a7c in StartChildProcess (type=StartupProcess) at postmaster.c:5463
#14 0x00000000007db927 in PostmasterMain (argc=1, argv=0x1c9fd80) at postmaster.c:1377
#15 0x0000000000719962 in main (argc=1, argv=0x1c9fd80) at main.c:228

备机使用latch机制等待新日志到来唤醒、处理。

本篇重点分析Wait这一系列Latch相关的函数。(latch.c)

本篇需要一点背景支持:《Postgresql的latch实现中self-pipe trick解决什么问题》

0 总结速查

整体总结:

1、latch的实现(如果支持epoll的话)就是epoll_wait的封装 利用self-pipe,实现等锁唤醒的机制。

2、备机startup会等待recoveryWakeupLatch、POSTMASTER_FD_WATCH两个事件,两个事件其实都是管道的read端,然后用epoll_wait等待。

LF

初始化过程总结:

中会涉及三把latch锁:MyLatch、MyProc->procLatch、XLogCtl->recoveryWakeupLatch

LF

等锁相关总结:

1、WaitForWALToBecomeAvailable会循环调用WaitLatch等锁,具体等三件事情:recoveryWakeupLatch、postmaster_alive_fdsPOSTMASTER_FD_WATCH、超时(请见2.1WaitLatchOrSocket)

2、唤醒后把锁信息全部清理掉,并把epoll_create创建的fd关掉close(set->epoll_fd)。注意不会close监听的那两个fd。

3、在进入新一轮循环WaitLatch。

4、recoveryWakeupLatch在epoll_wait的时候,等的是Pipe的读端,应用了上面提到的self-pipe trick。

LF

等锁WaitLatchOrSocket流程总结:

0、(补充)WL_LATCH_SET时在AddWaitEventToSet中要监听的fd是selfpipe_readfd,也就是上面创建的管道读端,应用self-pipe trick

1、WaitLatchOrSocket完成了epoll的配置和等待

2、WaitLatchOrSocket中增加对&XLogCtl->recoveryWakeupLatch的等待,记录为一个wakeEvents

3、WaitLatchOrSocket中增加对postmaster_alive_fdsPOSTMASTER_FD_WATCH的等待,记录为一个wakeEvents

4、wakeEvents汇总到WaitEventSet中

5、调用epoll_wait等上面两把锁 或 超时唤醒

6、清理WaitEventSet

1 关键数据结构和初始化

数据结构

代码语言:javascript复制
/* typedef in latch.h */
struct WaitEventSet
{
	int			nevents;		/* number of registered events */
	int			nevents_space;	/* maximum number of events in this set */

	/*
	 * Array, of nevents_space length, storing the definition of events this
	 * set is waiting for.
	 */
	WaitEvent  *events;

	/*
	 * If WL_LATCH_SET is specified in any wait event, latch is a pointer to
	 * said latch, and latch_pos the offset in the ->events array. This is
	 * useful because we check the state of the latch before performing doing
	 * syscalls related to waiting.
	 */
	Latch	   *latch;    // 数组记录该set下所有的latch = event
	int			latch_pos;

	int			epoll_fd;
	/* epoll_wait returns events in a user provided arrays, allocate once */
	struct epoll_event *epoll_ret_events;
};

// 每个epoll事件对应一个,也对应一个latch
typedef struct WaitEvent
{
	int			pos;			/* position in the event data structure */
	uint32		events;			/* triggered events */
	pgsocket	fd;				/* socket fd associated with event */
	void	   *user_data;		/* pointer provided in AddWaitEventToSet */
} WaitEvent;

typedef struct Latch
{
	sig_atomic_t is_set;
	bool		is_shared;
	int			owner_pid;
} Latch;

初始化MyLatch指向LocalLatchData

位置:

代码语言:javascript复制
(gdb) bt
#0  InitializeLatchSupport () at latch.c:152
#1  0x00000000009ee63b in InitPostmasterChild () at miscinit.c:198
#2  0x00000000007e0a38 in StartChildProcess (type=StartupProcess) at postmaster.c:5453
#3  0x00000000007db927 in PostmasterMain (argc=1, argv=0xf80d40) at postmaster.c:1377
#4  0x0000000000719962 in main (argc=1, argv=0xf80d40) at main.c:228

在InitPostmasterChild中依次执行三步初始化:

代码语言:javascript复制
InitializeLatchSupport();
MyLatch = &LocalLatchData;
InitLatch(MyLatch);

源码走读

第一步:初始化PIPE,信号到来时用管道唤醒io wait函数(为什么建非阻塞管道参考《Postgresql的latch实现中self-pipe trick解决什么问题》)

代码语言:javascript复制
InitializeLatchSupport
 	pipe(pipefd)
 	fcntl(pipefd[0], F_SETFL, O_NONBLOCK)
	fcntl(pipefd[1], F_SETFL, O_NONBLOCK)
	fcntl(pipefd[0], F_SETFD, FD_CLOEXEC)
	fcntl(pipefd[1], F_SETFD, FD_CLOEXEC)

	selfpipe_readfd = pipefd[0]
	selfpipe_writefd = pipefd[1]
	selfpipe_owner_pid = MyProcPid

第二步:MyLatch = &LocalLatchData,内存指向私有全局变量

第三步:InitLatch

代码语言:javascript复制
void
InitLatch(volatile Latch *latch)
{
	latch->is_set = false;
	latch->owner_pid = MyProcPid;
	latch->is_shared = false;
}

(第三步的另一种方式)

代码语言:javascript复制
InitSharedLatch
    latch->is_set = false;
	  latch->owner_pid = 0;
	  latch->is_shared = true;
OwnLatch
    latch->owner_pid = MyProcPid;
DisownLatch
    latch->owner_pid = 0;

初始化MyProc->procLatch

代码语言:javascript复制
#0  OwnLatch (latch=0x2aaab4df6ea4) at latch.c:291
#1  0x0000000000863e47 in InitAuxiliaryProcess () at proc.c:574
#2  0x000000000055d8a7 in AuxiliaryProcessMain (argc=2, argv=0x7fffffffe0b0) at bootstrap.c:372
#3  0x00000000007e0a7c in StartChildProcess (type=StartupProcess) at postmaster.c:5463
#4  0x00000000007db927 in PostmasterMain (argc=1, argv=0xf80d50) at postmaster.c:1377
#5  0x0000000000719962 in main (argc=1, argv=0xf80d50) at main.c:228

【MyProc->procLatch】在辅助进程初始化中做两步latch初始化:

代码语言:javascript复制
OwnLatch(&MyProc->procLatch);
    latch->owner_pid = MyProcPid;
SwitchToSharedLatch();
    MyLatch = &MyProc->procLatch;
    /* Sets a latch and wakes up anyone waiting on it */
    /* This is cheap if the latch is already set, otherwise not so much */
    SetLatch(MyLatch);
        pg_memory_barrier();
        if (latch->is_set)      // 已经SET了直接返回
            return;
        latch->is_set = true;   // 没SET给SET进去
        如果是自己进程owner:给Pipe发1字节
        如果是其他进程owner:给其他进程发sigusr1
        如果是0进程owner:返回

初始化XLogCtl->recoveryWakeupLatch

将共享latch配置上pid

代码语言:javascript复制
#0  OwnLatch (latch=0x2aaaaac0d254) at latch.c:291
#1  0x000000000054797a in StartupXLOG () at xlog.c:6425
#2  0x00000000007e1b8b in StartupProcessMain () at startup.c:230
#3  0x000000000055d935 in AuxiliaryProcessMain (argc=2, argv=0x7fffffffe0b0) at bootstrap.c:426
#4  0x00000000007e0a7c in StartChildProcess (type=StartupProcess) at postmaster.c:5463
#5  0x00000000007db927 in PostmasterMain (argc=1, argv=0xf80d50) at postmaster.c:1377
#6  0x0000000000719962 in main (argc=1, argv=0xf80d50) at main.c:228

2 等锁相关

总结:

1、WaitForWALToBecomeAvailable会循环调用WaitLatch等锁,具体等三件事情:recoveryWakeupLatch、postmaster_alive_fdsPOSTMASTER_FD_WATCH、超时(请见2.1WaitLatchOrSocket)

2、唤醒后把锁信息全部清理掉,并把epoll_create创建的fd关掉close(set->epoll_fd)。注意不会close监听的那两个fd。

3、在进入新一轮循环WaitLatch。

4、recoveryWakeupLatch在epoll_wait的时候,等的是Pipe的读端,应用了上面提到的self-pipe trick。

分析:

第一次进入等锁堆栈:

代码语言:javascript复制
#0  WaitLatch (latch=0x2aaaaac0d254, wakeEvents=25, timeout=5000, wait_event_info=83886088) at latch.c:339
#1  0x0000000000551751 in WaitForWALToBecomeAvailable (RecPtr=206024220672, randAccess=0 '00', fetching_ckpt=0 '00', 
    tliRecPtr=206029093456) at xlog.c:12238
#2  0x0000000000550d79 in XLogPageRead (xlogreader=0xfaade0, targetPagePtr=206024212480, reqLen=8192, targetRecPtr=206029093456, 
    readBuf=0xfabdf8 "22732005", readTLI=0xfab68c) at xlog.c:11707
#3  0x0000000000556c1f in ReadPageInternal (state=0xfaade0, pageptr=206029086720, reqLen=6760) at xlogreader.c:557
#4  0x0000000000556492 in XLogReadRecord (state=0xfaade0, RecPtr=206029093456, errormsg=0x7fffffffd508) at xlogreader.c:276
#5  0x0000000000542c23 in ReadRecord (xlogreader=0xfaade0, RecPtr=0, emode=15, fetching_ckpt=0 '00') at xlog.c:4232
#6  0x00000000005494eb in StartupXLOG () at xlog.c:7350
#7  0x00000000007e1b8b in StartupProcessMain () at startup.c:230
#8  0x000000000055d935 in AuxiliaryProcessMain (argc=2, argv=0x7fffffffe0b0) at bootstrap.c:426
#9  0x00000000007e0a7c in StartChildProcess (type=StartupProcess) at postmaster.c:5463
#10 0x00000000007db927 in PostmasterMain (argc=1, argv=0xf80d50) at postmaster.c:1377
#11 0x0000000000719962 in main (argc=1, argv=0xf80d50) at main.c:228

WaitForWALToBecomeAvailable函数进入等待事件中:

这里等待的是recoveryWakeupLatch:

代码语言:javascript复制
WaitLatch(
    &XLogCtl->recoveryWakeupLatch,
    WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,  
    5000L, 
    WAIT_EVENT_RECOVERY_WAL_ALL
)

继续调用:
WaitLatchOrSocket(
    latch = &XLogCtl->recoveryWakeupLatch, 
    wakeEvents = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 
    PGINVALID_SOCKET, 
    timeout = 5000L,
    wait_event_info = WAIT_EVENT_RECOVERY_WAL_ALL
)

**注意传入的锁:&XLogCtl->recoveryWakeupLatch

2.1 WaitLatchOrSocket整体等锁流程

先回忆下epoll怎么用? 1、epoll_create1(size)创建epollfd,给的size只是参考值,注意create会占用一个fd 2、epoll_ctl(epollfd上面创建的fd,行为ADD,监听FD,epoll_event监听什么事件) 3、唤醒的nfds = epoll_wait(传入epollfd上面创建的fd,返回唤醒的events,传入监听最大fd数量,传入timeout)

总结:

0、(补充)WL_LATCH_SET时在AddWaitEventToSet中要监听的fd是selfpipe_readfd,也就是上面创建的管道读端,应用self-pipe trick

1、WaitLatchOrSocket完成了epoll的配置和等待

2、WaitLatchOrSocket中增加对&XLogCtl->recoveryWakeupLatch的等待,记录为一个wakeEvents

3、WaitLatchOrSocket中增加对postmaster_alive_fdsPOSTMASTER_FD_WATCH的等待,记录为一个wakeEvents

4、wakeEvents汇总到WaitEventSet中

5、调用epoll_wait等上面两把锁 或 超时唤醒

6、清理WaitEventSet

wakeEvents = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH

代码语言:javascript复制
int
WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
				  long timeout, uint32 wait_event_info)
{
	int			ret = 0;
	int			rc;
	WaitEvent	event;
	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
/******************************************************************
CreateWaitEventSet展开:构造WaitEventSet

WaitEventSet *set
          // 内存结构:
set->        
          sz  = MAXALIGN(sizeof(WaitEventSet))                 // 整体分配一个WaitEventSet
set->events->  
          sz  = MAXALIGN(sizeof(WaitEvent) * nevents)          // 每个事件有一个WaitEvent
set->epoll_ret_events->             
          sz  = MAXALIGN(sizeof(struct epoll_event) * nevents) // 要监听的3个事件
set->latch = NULL
set->nevents_space = nevents

set->epoll_fd = epoll_create1(EPOLL_CLOEXEC)                   // 200w个
******************************************************************/          
          

	if (wakeEvents & WL_TIMEOUT)
		Assert(timeout >= 0);
	else
		timeout = -1;

	if (wakeEvents & WL_LATCH_SET)
		AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
						  (Latch *) latch, NULL);
/******************************************************************
WL_LATCH_SET会进入这个分支:
AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)

1、现在的latch={is_set = 0, is_shared = 1 '01', owner_pid = 30877}, 30877是startup的pid
2、开始拼WaitEvent  *event;
  event = &set->events[set->nevents]
  ...
  event->fd = selfpipe_readfd ***********注意这里监控的是管道的读端
  ...
  //set: {nevents = 1, nevents_space = 3, events = 0xf81dd8, latch = 0x2aaaaac0d254, latch_pos = 0, epoll_fd = 7, epoll_ret_events = 0xf81e20}
  //event: {pos = 0, events = 1, fd = 13, user_data = 0x0}
      WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD)
          epoll_event epoll_ev:
              EPOLLERR:表示对应的文件描述符发生错误;
              EPOLLHUP:表示对应的文件描述符被挂断;
              EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
              
              epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev)

******************************************************************/
	if (wakeEvents & WL_POSTMASTER_DEATH && IsUnderPostmaster)
		AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
						  NULL, NULL);
 /******************************************************************
 WL_POSTMASTER_DEATH进入这个分支
 
 和上流程相同,不同的是event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH]
 ******************************************************************/
 
	if (wakeEvents & WL_SOCKET_MASK)
	{
		int			ev;

		ev = wakeEvents & WL_SOCKET_MASK;
		AddWaitEventToSet(set, ev, sock, NULL, NULL);
	}

	rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
/******************************************************************
开始等待:
类似epoll的函数构造,传入上面构造好的set,可能记录多个event。 传出event唤醒的事件。

进入
rc = WaitEventSetWaitBlock(set, cur_timeout,occurred_events, nevents);
  epoll_wait(set->epoll_fd, set->epoll_ret_events, nevents, cur_timeout)
    等5秒唤醒 rc == 0 return -1;
    

******************************************************************/
	if (rc == 0)
		ret |= WL_TIMEOUT;
	else
	{
		ret |= event.events & (WL_LATCH_SET |
							   WL_POSTMASTER_DEATH |
							   WL_SOCKET_MASK);
	}

	FreeWaitEventSet(set);
/******************************************************************
释放刚刚epoll_create1创建的epoll_fd

close(set->epoll_fd)

释放整体

pfree(set)
******************************************************************/
	return ret;
}

2.2 唤醒&XLogCtl->recoveryWakeupLatch

Startup唤醒

代码语言:javascript复制
/* SIGUSR2: set flag to finish recovery */
StartupProcTriggerHandler
/* SIGHUP: set flag to re-read config file at next convenient time */
StartupProcSigHupHandler
/* SIGTERM: set flag to abort redo and exit */
StartupProcShutdownHandler
    WakeupRecovery

wal receiver唤醒

代码语言:javascript复制
// Wait for startup process to set receiveStart and receiveStartTLI.
WalRcvWaitForStartPosition
// Mark us as STOPPED in shared memory at exit.
WalRcvDie
// Flush the log to disk.
XLogWalRcvFlush
    WakeupRecovery
        SetLatch(&XLogCtl->recoveryWakeupLatch)

0 人点赞