相关 《Postgresql源码(60)事务系统总结》 《Postgresql源码(75)notify与listen执行流程分析》
顺着看事务提交时发现PG有异步消息队列的功能,这里试着分析总结。
0 总结速查
两句话总结:
- notify将msg追加到slru消息队列,发信号通知。
- listen注册监听人backend到监听队列,每个监听者消费,并自己记录消费位置。
Listen监听:
CommitTransaction->PreCommit_Notifybackend
在事务提交时执行listen,把自己注册进入AsyncQueueControl->backend数组中的一个位置。- 在数组中表示自己已在监听队列中,且在监听队列的结构会记录自己当前消费到的位置。
- 一个后端进程占用队列一个位置,多次执行Listen不会占用新的位置,同一个backend db,只能使用一个位置。
- 监听队列是SLRU结构,所以指向监听队列的指针为
{page, offset}
。
notify通知:
- DDL记录通知信息(不通知)。
CommitTransaction --> PreCommit_Notify
事务提交时将记录的notify追加到消息队列。CommitTransaction --> AtCommit_Notify
事务提交时kill sigusr1通知其他进程。
消息队列:
- 使用通用SLRU结构,也会标记为脏、也会正常淘汰落盘。参考之前写过的SLRU页面分析(CLOG、SUBTRANS等)。
- 总控结构AsyncQueueControl->head端新增,AsyncQueueControl->tail端消费。
- 注意:消息队列虽然使用SLRU结构,但不持久化,只是在内存页面不够用的时候,用LRU换出到磁盘。
1 背景
Listen:
- 监听语句如果在事务内,listen执行后不能拿到通知信息,必须等待事务提交;注意事务提交后,会拿到所有listen语句后的通知。
- 监听必须在notify之前,如果notify时没有监听,消息收不到。
- 监听如果在psql执行,只在任何语句执行完时收到通知,如没有语句执行不会收到通知。
- 监听如果使用API,例如libpq的PQnotifies函数,可以立即收到通知进行处理。
Notify:
- 通知语句的事务必须提交才会生效。
- 通知是异步的,记录在队列中,每次监听会收到队列中累加的所有消息,PG保证收到的顺序和发送顺序一致。
2 使用案例
2.1 PSQL
代码语言:javascript复制-- session 1
postgres=# listen ch1;
LISTEN
-- session 2
postgres=# listen ch1;
LISTEN
-- session 3
postgres=# notify ch1;
NOTIFY
postgres=# notify ch1;
NOTIFY
-- session 1
postgres=# select 1;
?column?
----------
1
(1 row)
Asynchronous notification "ch1" received from server process with PID 1837.
Asynchronous notification "ch1" received from server process with PID 1837.
-- session 2
postgres=# select 1;
?column?
----------
1
(1 row)
Asynchronous notification "ch1" received from server process with PID 1837.
Asynchronous notification "ch1" received from server process with PID 1837.
2.2 LIBPQ使用案例
https://www.postgresql.org/docs/14/libpq-example.html#LIBPQ-EXAMPLE-2
3 内核代码分析
3.1 listen监听
先放总结
- backend在事务提交时执行listen,把自己注册进入AsyncQueueControl->backend数组中的一个位置。
- 在数组中表示自己已在监听队列中,且在监听队列的结构会记录自己当前消费到的位置。
- 一个后端进程占用队列一个位置,多次执行Listen不会占用新的位置,同一个backend db,只能使用一个位置。
- 监听队列是SLRU结构,所以指向监听队列的指针为
{page, offset}
。
Async_Listen进入位置
代码语言:javascript复制exec_simple_query
PortalRun
PortalRunMulti
PortalRunUtility
ProcessUtility
standard_ProcessUtility
Async_Listen
queue_listen
listen属于DDL,也是跟着事务提交才会生效,所以函数调用嵌在事务系统中。
listen调用Async_Listen登记Listen信息,只把action(三种类型:listen、unlisten、unlisten all)记录在pendingActions中。
在语句结尾的事务状态机流转函数中,如果是事务提交状态,会走入CommitTransaction进行事务提交的具体工作。
在事务提交时,调用PreCommit_Notify函数:
代码语言:javascript复制void
PreCommit_Notify(void)
{
...
if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
{
ListenAction *actrec = (ListenAction *) lfirst(p);
switch (actrec->action)
{
case LISTEN_LISTEN:
Exec_ListenPreCommit();
break;
case LISTEN_UNLISTEN:
/* there is no Exec_UnlistenPreCommit() */
break;
case LISTEN_UNLISTEN_ALL:
/* there is no Exec_UnlistenAllPreCommit() */
break;
}
}
}
异步队列的数据结构
代码语言:javascript复制typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */
QueuePosition tail; /* tail must be <= the queue position of every
* listening backend */
int stopPage; /* oldest unrecycled page; must be <=
* tail.page */
BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;
typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
Oid dboid; /* backend's database OID, or InvalidOid */
BackendId nextListener; /* id of next listener, or InvalidBackendId */
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
typedef struct QueuePosition
{
int page; /* SLRU page number */
int offset; /* byte offset within page */
} QueuePosition;
static AsyncQueueControl *asyncQueueControl;
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
完成监听过程
注意拿到的消费起点位置是:max(控制结构记录的TAIL,其他所有进程消费到的最新位置)
Exec_ListenPreCommit
if (amRegisteredListener)
return;
head = QUEUE_HEAD;
max = QUEUE_TAIL;
prevListener = InvalidBackendId;
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
// 拿到消费位置,从全局信息取QUEUE_TAIL,或从每个backend消费到的最大位置取。
max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
/* Also find last listening backend before this one */
if (i < MyBackendId)
prevListener = i;
}
QUEUE_BACKEND_POS(MyBackendId) = max;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
// 后插入监听队列
if (prevListener > 0)
{
QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
}
else
{
QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
QUEUE_FIRST_LISTENER = MyBackendId;
}
LWLockRelease(NotifyQueueLock);
3.2 notify通知
第一步:DDL记录通知信息(不通知)
代码语言:javascript复制Async_Notify
// 拼接 Notification n = {channel_len = 3, payload_len = 0, data = 0x2a0ab84 "ch1"}
// 挂在 pendingNotifies->events后 = list_make1(n)
第二步:PreCommit_Notify事务提交时append to 消息队列
CommitTransaction --> PreCommit_Notify --> asyncQueueAddEntries
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
{
AsyncQueueEntry qe;
QueuePosition queue_head;
int pageno;
int offset;
int slotno;
/* We hold both NotifyQueueLock and NotifySLRULock during this operation */
LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
SLRU标准接口拿消息队列页面SimpleLruZeroPage
代码语言:javascript复制 queue_head = QUEUE_HEAD;
pageno = QUEUE_POS_PAGE(queue_head);
if (QUEUE_POS_IS_ZERO(queue_head))
slotno = SimpleLruZeroPage(NotifyCtl, pageno);
else
slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
InvalidTransactionId);
使用slru标准结构,会刷脏落盘。
代码语言:javascript复制 NotifyCtl->shared->page_dirty[slotno] = true;
while (nextNotify != NULL)
{
Notification *n = (Notification *) lfirst(nextNotify);
/* Construct a valid queue entry in local variable qe */
asyncQueueNotificationToEntry(n, &qe);
offset = QUEUE_POS_OFFSET(queue_head);
当前页面能装得下,可以把nextNotify指向下一条了。
代码语言:javascript复制 if (offset qe.length <= QUEUE_PAGESIZE)
{
/* OK, so advance nextNotify past this item */
nextNotify = lnext(pendingNotifies->events, nextNotify);
}
当前页面装不下,length把剩下的装满,dboid=InvalidOid用于标记无效。
代码语言:javascript复制 else
{
qe.length = QUEUE_PAGESIZE - offset;
qe.dboid = InvalidOid;
qe.data[0] = '