Postgresql源码(25)Postgresql复制冲突的发生和处理逻辑分析

2022-05-12 08:49:36 浏览数 (3)

复制冲突发生

备库起事务在读,主库truncate表,备库复制冲突发生:

代码语言:javascript复制
-- 发生前, 5833是startup,7444是持锁的后台进程。后台进程拿表的读锁和虚拟事务ID的ex锁。
postgres=# select * from pg_locks where pid != pg_backend_pid();
  locktype  | database | relation | page | tuple | virtualxid | transactionid | classid | objid | objsubid | virtualtransaction | pid  |      mode       | granted | fastpath 
------------+----------+----------+------+-------+------------+---------------+---------+-------+----------+--------------------+------+-----------------+---------+----------
 relation   |    13212 |   102189 |      |       |            |               |         |       |          | 2/8                | 7444 | AccessShareLock | t       | t
 virtualxid |          |          |      |       | 2/8        |               |         |       |          | 2/8                | 7444 | ExclusiveLock   | t       | t
 virtualxid |          |          |      |       | 1/1        |               |         |       |          | 1/0                | 5833 | ExclusiveLock   | t       | t


-- 发生后,startup进程需要拿8号锁AccessExclusiveLock,拿不到开始等锁。
postgres=# select * from pg_locks where pid != pg_backend_pid();
  locktype  | database | relation | page | tuple | virtualxid | transactionid | classid | objid | objsubid | virtualtransaction | pid  |        mode         | granted | fastpath 
------------+----------+----------+------+-------+------------+---------------+---------+-------+----------+--------------------+------+---------------------+---------+----------
 virtualxid |          |          |      |       | 2/8        |               |         |       |          | 2/8                | 7444 | ExclusiveLock       | t       | t
 virtualxid |          |          |      |       | 1/1        |               |         |       |          | 1/0                | 5833 | ExclusiveLock       | t       | t
 relation   |    13212 |   102189 |      |       |            |               |         |       |          | 2/8                | 7444 | AccessShareLock     | t       | f
 relation   |    13212 |   102189 |      |       |            |               |         |       |          | 1/0                | 5833 | AccessExclusiveLock | f       | f

startup等待堆栈

代码语言:javascript复制
StartupProcessMain
  StartupXLOG
    standby_redo
      StandbyAcquireAccessExclusiveLock
        LockAcquire
          LockAcquireExtended
            WaitOnLock
              ProcSleep
                ResolveRecoveryConflictWithLock
                  ProcWaitForSignal

startup超时唤醒堆栈

等待时间超过max_standby_streaming_delay后,跳出ProcWaitForSignal重新进入ResolveRecoveryConflictWithLock,走第一个分支

代码语言:javascript复制
ProcSleep
  ResolveRecoveryConflictWithLock
    if (GetCurrentTimestamp() >= ltime && ltime != 0)
      VirtualTransactionId *backends;
      backends = GetLockConflicts(&locktag, AccessExclusiveLock);
      ResolveRecoveryConflictWithVirtualXIDs(backends, PROCSIG_RECOVERY_CONFLICT_LOCK, false)

VirtualTransactionId是什么?

代码语言:javascript复制
/*
 * Identifies a virtual transaction: a backend ID paired with a local
 * transaction ID, both taken from the backend's PGPROC entry.
 */
typedef struct
{
    BackendId   backendId;                  /* backendId from PGPROC */
    LocalTransactionId localTransactionId;  /* lxid from PGPROC */
} VirtualTransactionId;

VirtualTransactionId由backendId和当前会话私有的一个本地事务ID(localTransactionId)组成(类似事务ID自增,从0开始,当前会话结束后清空)

GetLockConflicts怎么拿到和传入locktag冲突的vxid?

代码语言:javascript复制
VirtualTransactionId *
GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
{ ... }



(gdb) p *locktag //其中 13212 是库oid  102189 是表oid
$5 = {locktag_field1 = 13212, locktag_field2 = 102189, locktag_field3 = 0, locktag_field4 = 0, locktag_type = 0 '\000', locktag_lockmethodid = 1 '\001'}

(gdb) p lockmode
$6 = 8
  
/* Lock mode numbers; the lockmode value 8 printed by gdb above is AccessExclusiveLock. */
#define NoLock					         0
#define AccessShareLock			     1	/* SELECT */
#define RowShareLock			       2	/* SELECT FOR UPDATE/FOR SHARE */
#define RowExclusiveLock		     3	/* INSERT, UPDATE, DELETE */
#define ShareUpdateExclusiveLock 4	/* VACUUM (non-FULL), ANALYZE, CREATE INDEX CONCURRENTLY */
#define ShareLock				         5	/* CREATE INDEX (WITHOUT CONCURRENTLY) */
#define ShareRowExclusiveLock	   6	/* like EXCLUSIVE MODE, but allows ROW SHARE */
#define ExclusiveLock		         7	/* blocks ROW SHARE/SELECT...FOR UPDATE */
#define AccessExclusiveLock		   8	/* ALTER TABLE, DROP TABLE, VACUUM FULL, and unqualified LOCK TABLE */

fastpath

代码语言:javascript复制
if (ConflictsWithRelationFastPath(locktag, lockmode))
  
  for (i = 0; i < ProcGlobal->allProcCount; i++)
    ...
    /* 排除当前PROC、排除非目标库、排除非目标表 */
    ...
    /* 16个FASTPATH槽位 */
    for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
      /* proc->fpRelId中可以记录16个表OID,遍历一遍是否命中 */
      if (relid != proc->fpRelId[f])
        continue;
      
      /* 冲突发生了,从proc中拿到vxid信息 */
      GET_VXID_FROM_PGPROC(vxid, *proc);
      if (VirtualTransactionIdIsValid(vxid))
        vxids[count++] = vxid;
      break;

遍历常规锁

代码语言:javascript复制
	while (proclock)
	{
		if (conflictMask & proclock->holdMask)
		{
			PGPROC	   *proc = proclock->tag.myProc;

			/* A backend never blocks itself */
			if (proc != MyProc)
			{
				VirtualTransactionId vxid;

				GET_VXID_FROM_PGPROC(vxid, *proc);

				if (VirtualTransactionIdIsValid(vxid))
				{
					int			i;

					/* Avoid duplicate entries. */
					for (i = 0; i < fast_count; ++i)
						if (VirtualTransactionIdEquals(vxids[i], vxid))
							break;
					if (i >= fast_count)
						vxids[count++] = vxid;
				}
				/* else, xact already committed or aborted */
			}
		}

		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
											 offsetof(PROCLOCK, lockLink));
	}

记录结果

代码语言:javascript复制
(gdb) p * vxids
$5 = {backendId = 2, localTransactionId = 17}

拿到vxid后如何处理?

超时后会 kill冲突的backend,注意 kill的信号是SIGUSR1,kill的pid是持锁的那个backend

代码语言:javascript复制
static void
ResolveRecoveryConflictWithVirtualXIDs(
  VirtualTransactionId *waitlist,    // 冲突的vxid
  ProcSignalReason reason,           // PROCSIG_RECOVERY_CONFLICT_LOCK
  bool report_waiting)               
{ 
  ...
  /* wait until the virtual xid is gone */
  while (!VirtualXactLock(*waitlist, false))
    ...
    /* Is it time to kill it? */
    if (WaitExceedsMaxStandbyDelay())
      ...
      CancelVirtualTransaction
        SignalVirtualTransaction
          for (index = 0; index < arrayP->numProcs; index++)
            if (procvxid.backendId == vxid.backendId && procvxid.localTransactionId == vxid.localTransactionId)
              SendProcSignal
                ...
                slot = &ProcSignalSlots[backendId - 1];
                slot->pss_signalFlags[reason] = true;
                ...
                kill(pid, SIGUSR1);

1 人点赞