Postgresql源码(75)notify与listen执行流程分析

2022-09-23 15:36:47 浏览数 (1)

相关 《Postgresql源码(60)事务系统总结》 《Postgresql源码(75)notify与listen执行流程分析》

顺着看事务提交时发现PG有异步消息队列的功能,这里试着分析总结。

0 总结速查

两句话总结:

  • notify将msg追加到slru消息队列,发信号通知。
  • listen注册监听人backend到监听队列,每个监听者消费,并自己记录消费位置。

Listen监听:

  • CommitTransaction->PreCommit_Notifybackend事务提交时执行listen,把自己注册进入AsyncQueueControl->backend数组中的一个位置。
  • 在数组中表示自己已在监听队列中,且在监听队列的结构会记录自己当前消费到的位置。
  • 一个后端进程占用队列一个位置,多次执行Listen不会占用新的位置,同一个backend db,只能使用一个位置。
  • 监听队列是SLRU结构,所以指向监听队列的指针为{page, offset}

notify通知:

  • DDL记录通知信息(不通知)。
  • CommitTransaction --> PreCommit_Notify事务提交时将记录的notify追加到消息队列。
  • CommitTransaction --> AtCommit_Notify事务提交时kill sigusr1通知其他进程。

消息队列:

  • 使用通用SLRU结构,也会标记为脏、也会正常淘汰落盘。参考之前写过的SLRU页面分析(CLOG、SUBTRANS等)。
  • 总控结构AsyncQueueControl->head端新增,AsyncQueueControl->tail端消费。
  • 注意:消息队列虽然使用SLRU结构,但不持久化,只是在内存页面不够用的时候,用LRU换出到磁盘。

1 背景

Listen:

  1. 监听语句如果在事务内,listen执行后不能拿到通知信息,必须等待事务提交;注意事务提交后,会拿到所有listen语句后的通知。
  2. 监听必须在notify之前,如果notify时没有监听,消息收不到。
  3. 监听如果在psql执行,只在任何语句执行完时收到通知,如没有语句执行不会收到通知。
  4. 监听如果使用API,例如libpq的PQnotifies函数,可以立即收到通知进行处理。

Notify:

  1. 通知语句的事务必须提交才会生效。
  2. 通知是异步的,记录在队列中,每次监听会收到队列中累加的所有消息,PG保证收到的顺序和发送顺序一致。

2 使用案例

2.1 PSQL

代码语言:javascript复制
-- session 1
postgres=# listen ch1;
LISTEN

-- session 2
postgres=# listen ch1;
LISTEN

-- session 3
postgres=# notify ch1;
NOTIFY
postgres=# notify ch1;
NOTIFY

-- session 1
postgres=# select 1;
 ?column? 
----------
        1
(1 row)

Asynchronous notification "ch1" received from server process with PID 1837.
Asynchronous notification "ch1" received from server process with PID 1837.

-- session 2
postgres=# select 1;
 ?column? 
----------
        1
(1 row)

Asynchronous notification "ch1" received from server process with PID 1837.
Asynchronous notification "ch1" received from server process with PID 1837.

2.2 LIBPQ使用案例

https://www.postgresql.org/docs/14/libpq-example.html#LIBPQ-EXAMPLE-2

3 内核代码分析

3.1 listen监听

先放总结
  • backend在事务提交时执行listen,把自己注册进入AsyncQueueControl->backend数组中的一个位置。
  • 在数组中表示自己已在监听队列中,且在监听队列的结构会记录自己当前消费到的位置。
  • 一个后端进程占用队列一个位置,多次执行Listen不会占用新的位置,同一个backend db,只能使用一个位置。
  • 监听队列是SLRU结构,所以指向监听队列的指针为{page, offset}
Async_Listen进入位置
代码语言:javascript复制
exec_simple_query
  PortalRun
    PortalRunMulti
      PortalRunUtility
        ProcessUtility
          standard_ProcessUtility
            Async_Listen
              queue_listen

listen属于DDL,也是跟着事务提交才会生效,所以函数调用嵌在事务系统中。

listen调用Async_Listen登记Listen信息,只把action(三种类型:listen、unlisten、unlisten all)记录在pendingActions中。

在语句结尾的事务状态机流转函数中,如果是事务提交状态,会走入CommitTransaction进行事务提交的具体工作。

在事务提交时,调用PreCommit_Notify函数:

代码语言:javascript复制
void
PreCommit_Notify(void)
{
    ...
	if (pendingActions != NULL)
	{
		foreach(p, pendingActions->actions)
		{
			ListenAction *actrec = (ListenAction *) lfirst(p);

			switch (actrec->action)
			{
				case LISTEN_LISTEN:
					Exec_ListenPreCommit();
					break;
				case LISTEN_UNLISTEN:
					/* there is no Exec_UnlistenPreCommit() */
					break;
				case LISTEN_UNLISTEN_ALL:
					/* there is no Exec_UnlistenAllPreCommit() */
					break;
			}
		}
	}
异步队列的数据结构
代码语言:javascript复制
typedef struct AsyncQueueControl
{
	QueuePosition head;			/* head points to the next free location */
	QueuePosition tail;			/* tail must be <= the queue position of every
								 * listening backend */
	int			stopPage;		/* oldest unrecycled page; must be <=
								 * tail.page */
	BackendId	firstListener;	/* id of first listener, or InvalidBackendId */
	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;

typedef struct QueueBackendStatus
{
	int32		pid;			/* either a PID or InvalidPid */
	Oid			dboid;			/* backend's database OID, or InvalidOid */
	BackendId	nextListener;	/* id of next listener, or InvalidBackendId */
	QueuePosition pos;			/* backend has read queue up to here */
} QueueBackendStatus;

typedef struct QueuePosition
{
	int			page;			/* SLRU page number */
	int			offset;			/* byte offset within page */
} QueuePosition;

static AsyncQueueControl *asyncQueueControl;

#define QUEUE_HEAD					(asyncQueueControl->head)
#define QUEUE_TAIL					(asyncQueueControl->tail)
#define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER		(asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
完成监听过程

注意拿到的消费起点位置是:max(控制结构记录的TAIL,其他所有进程消费到的最新位置)

代码语言:javascript复制
Exec_ListenPreCommit
  if (amRegisteredListener)
    return;

	head = QUEUE_HEAD;
	max = QUEUE_TAIL;
	prevListener = InvalidBackendId;
	for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
	{
		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
		    // 拿到消费位置,从全局信息取QUEUE_TAIL,或从每个backend消费到的最大位置取。
			max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
		/* Also find last listening backend before this one */
		if (i < MyBackendId)
			prevListener = i;
	}
	QUEUE_BACKEND_POS(MyBackendId) = max;
	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
	QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
	// 后插入监听队列
	if (prevListener > 0)
	{
		QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
		QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
	}
	else
	{
		QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
		QUEUE_FIRST_LISTENER = MyBackendId;
	}
	LWLockRelease(NotifyQueueLock);

3.2 notify通知

第一步:DDL记录通知信息(不通知)
代码语言:javascript复制
Async_Notify
  // 拼接 Notification n = {channel_len = 3, payload_len = 0, data = 0x2a0ab84 "ch1"}
  // 挂在 pendingNotifies->events后 = list_make1(n)
第二步:PreCommit_Notify事务提交时append to 消息队列

CommitTransaction --> PreCommit_Notify --> asyncQueueAddEntries

代码语言:javascript复制
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
{
	AsyncQueueEntry qe;
	QueuePosition queue_head;
	int			pageno;
	int			offset;
	int			slotno;

	/* We hold both NotifyQueueLock and NotifySLRULock during this operation */
	LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);

SLRU标准接口拿消息队列页面SimpleLruZeroPage

代码语言:javascript复制
	queue_head = QUEUE_HEAD;
	pageno = QUEUE_POS_PAGE(queue_head);
	if (QUEUE_POS_IS_ZERO(queue_head))
		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
	else
		slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
								   InvalidTransactionId);

使用slru标准结构,会刷脏落盘。

代码语言:javascript复制
	NotifyCtl->shared->page_dirty[slotno] = true;

	while (nextNotify != NULL)
	{
		Notification *n = (Notification *) lfirst(nextNotify);

		/* Construct a valid queue entry in local variable qe */
		asyncQueueNotificationToEntry(n, &qe);

		offset = QUEUE_POS_OFFSET(queue_head);

当前页面能装得下,可以把nextNotify指向下一条了。

代码语言:javascript复制
		if (offset   qe.length <= QUEUE_PAGESIZE)
		{
			/* OK, so advance nextNotify past this item */
			nextNotify = lnext(pendingNotifies->events, nextNotify);
		}

当前页面装不下,length把剩下的装满,dboid=InvalidOid用于标记无效。

代码语言:javascript复制
		else
		{
			qe.length = QUEUE_PAGESIZE - offset;
			qe.dboid = InvalidOid;
			qe.data[0] = '';	/* empty channel */
			qe.data[1] = '';	/* empty payload */
		}

拷贝qe到消息队列中:

代码语言:javascript复制
typedef struct AsyncQueueEntry
{
	int			length;			/* total allocated length of entry */
	Oid			dboid;			/* sender's database OID */
	TransactionId xid;			/* sender's XID */
	int32		srcPid;			/* sender's PID */
	char		data[NAMEDATALEN   NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

开始拷贝

代码语言:javascript复制
		memcpy(NotifyCtl->shared->page_buffer[slotno]   offset,
			   &qe,
			   qe.length);

推进head指针

代码语言:javascript复制
		/* Advance queue_head appropriately, and detect if page is full */
		if (asyncQueueAdvance(&(queue_head), qe.length))
		{
			...
		}
	}

控制结构记录head指针asyncQueueControl->head = queue_head

代码语言:javascript复制
	/* Success, so update the global QUEUE_HEAD */
	QUEUE_HEAD = queue_head;

	LWLockRelease(NotifySLRULock);

	return nextNotify;
}
第三步:AtCommit_Notify事务提交时通知其他进程
代码语言:javascript复制
AtCommit_Notify
  SignalBackends
    // 查询asyncQueueControl->backend监听数组,找到监听者
    // 例如两个监听者: 
    // count = 2
    // p pids[0] = 15446
    // p pids[1] = 23101

    // SendProcSignal(15446, PROCSIG_NOTIFY_INTERRUPT)
    // SendProcSignal(23101, PROCSIG_NOTIFY_INTERRUPT)  
第四步:(监听进程)被信号SIGUSR1中断,进入procsignal_sigusr1_handler
代码语言:javascript复制
procsignal_sigusr1_handler
  if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
    HandleNotifyInterrupt();

HandleNotifyInterrupt函数配置标志位后就退出。notifyInterruptPending = true;

等流程走到信号处理函数在做处理。

0 人点赞