聊聊PostgreSQL事务ID

2022-08-17 12:57:05 浏览数 (1)

聊聊PostgreSQL事务ID

事务隔离级别

事务状态

  • PostgreSQL事务执行可以理解为是一个有限状态机的执行,每个语句进入不同的执行阶段会有该阶段的状态。预计执行的过程中,事务状态不断的改变,直到事务commitrollback.如下是有限状态机中状态说明
代码语言:javascript复制
// 事务的执行的状态
typedef enum TransState
{
	// 没有事务运行时候的状态
	TRANS_DEFAULT,				/* idle */
	// 事务开始时候的状态
	TRANS_START,				/* transaction starting */
	// 事务进入处理函数时候的状态
	TRANS_INPROGRESS,			/* inside a valid transaction */
	// 事务提交处理函数的状态
	TRANS_COMMIT,				/* commit in progress */
	// 事务进入回滚阶段,调用资源清理处理函数的状态
	TRANS_ABORT,				/* abort in progress */
	// 进入两阶段提交时候的事务 状态
	TRANS_PREPARE				/* prepare in progress */
} TransState;

// 事务的语句块的状态的定义,这些语句块的状态变更依赖于事务的事务执行的状态
typedef enum TBlockState
{
	// 事务块的默认状态,事务开始之前或者结束之后都是出于这个状态
	TBLOCK_DEFAULT,				/* idle */
	// 开始进入事务块的状态
	TBLOCK_STARTED,				/* running single-query transaction */

	// 事务块是以begin命令开始
	TBLOCK_BEGIN,				/* starting transaction block */
	// 事务块命令执行完毕后的状态
	TBLOCK_INPROGRESS,			/* live transaction */
	// 隐式事务命令执行完毕的状态
	TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN */
	// 并行事务命令执行完毕的状态
	TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */
	// 事务提交时候的状态
	TBLOCK_END,					/* COMMIT received */
	// 事务执行发生SQL错误,停止后面事务命令执行,设置状态为abort
	TBLOCK_ABORT,				/* failed xact, awaiting ROLLBACK */
	// 事务出于abort状态后,后续有不断的SQL语句,这些语句不会执行成功,这时候设置abort end状态
	TBLOCK_ABORT_END,			/* failed xact, ROLLBACK received */
	// 事务执行过程中,显式执行rollback,事务是由TBLOCK_INPROGRESS更改为TBLOCK_ABORT_PENDING
	TBLOCK_ABORT_PENDING,		/* live xact, ROLLBACK received */
	// 执行pg的二阶段提交的事务prepare_transaction命令,进入事务块时候的状态
	TBLOCK_PREPARE,				/* live xact, PREPARE received */

	/* 如下是子事务的状态 */
	TBLOCK_SUBBEGIN,			/* starting a subtransaction */
	TBLOCK_SUBINPROGRESS,		/* live subtransaction */
	TBLOCK_SUBRELEASE,			/* RELEASE received */
	TBLOCK_SUBCOMMIT,			/* COMMIT received while TBLOCK_SUBINPROGRESS */
	TBLOCK_SUBABORT,			/* failed subxact, awaiting ROLLBACK */
	TBLOCK_SUBABORT_END,		/* failed subxact, ROLLBACK received */
	TBLOCK_SUBABORT_PENDING,	/* live subxact, ROLLBACK received */
	TBLOCK_SUBRESTART,			/* live subxact, ROLLBACK TO received */
	TBLOCK_SUBABORT_RESTART		/* failed subxact, ROLLBACK TO received */
} TBlockState;
  • PostgreSQL中针对针对只读事务不会去申请事务ID,但是会在涉及更改操作的情况下才会申请事务ID,只读事务通过快照机制判断判断元组的可见性,也不需要为只读事务产生事务日志。事务的ID的分配是在GetNewTransactionId中进行,事务ID的全局计数器保存在struct VariableCacheData中,每次申请成功都会自增。PG的事务ID是一个无符号32位的整数,当整个事务执行过程中,事务ID不断的消耗,当消耗到一定的程度事务ID就会回卷。简单的可以理解为事务ID是一个环,使用PG的vacuum命令进行回收事务ID,被回收的可以被二次使用。
  • PG为了保证事务ID的重组,会在struct VariableCacheData中保存对个限制变量,在事务ID分配时候会去和这些变量比较,比较时候达到一定的条件就会触发vacuum来回收事务ID.
代码语言:javascript复制
typedef struct VariableCacheData
{
	// 下一个可用的事务ID
	Oid			nextOid;		/* next OID to assign */
	uint32		oidCount;		/* OIDs available before must do XLOG work */

	/*
	 * These fields are protected by XidGenLock.
	 */
	FullTransactionId nextXid;	/* next XID to assign */

	// 集群维度最小的冻结的事务id
	TransactionId oldestXid;	
	// 当事务ID超过这个变量的时候,事务可能执行一次vaccum,这个变量的是一个告警的作用,告诉PG事务ID的回卷已经非常靠近了。
	
	TransactionId xidVacLimit;	
	// 当xidWarnLimit - xidVacLimit =1000000时候会产生告警需要手动执行vacuum,此时无法执行事务ID的申请
	TransactionId xidWarnLimit; 
	// 当xidStopLimit - xidWarnLimit =1000000,产生告警,需要手动执行vacuum,刺水也是分配事务ID的
	TransactionId xidStopLimit; 
	// 事务ID回卷的上限
	TransactionId xidWrapLimit; /* where the world ends */
	Oid			oldestXidDB;	/* database with minimum datfrozenxid */

	/*
	 * These fields are protected by CommitTsLock
	 */
	TransactionId oldestCommitTsXid;
	TransactionId newestCommitTsXid;

	/*
	 * These fields are protected by ProcArrayLock.
	 */
	FullTransactionId latestCompletedXid;	/* newest full XID that has
											 * committed or aborted */

	/*
	 * Number of top-level transactions with xids (i.e. which may have
	 * modified the database) that completed in some form since the start of
	 * the server. This currently is solely used to check whether
	 * GetSnapshotData() needs to recompute the contents of the snapshot, or
	 * not. There are likely other users of this.  Always above 1.
	 */
	uint64		xactCompletionCount;

	/*
	 * These fields are protected by XactTruncationLock
	 */
	TransactionId oldestClogXid;	/* oldest it's safe to look up in clog */

} VariableCacheData;
  • 接下来分析事务ID分配的过程,事务ID分配函数定义在GetNewTransactionId中,参照如下
代码语言:javascript复制
// 函数去除了部分代码,保留了需要说明的代码
FullTransactionId GetNewTransactionId(bool isSubXact)
{
	FullTransactionId full_xid;
	TransactionId xid;

	
	 // 并行操作阶段是不允许分配事务ID
	if (IsInParallelMode())
		elog(ERROR, "cannot assign TransactionIds during a parallel operation");

	// 事务的崩溃恢复中也不能分配事务ID
	if (RecoveryInProgress())
		elog(ERROR, "cannot assign TransactionIds during recovery");

	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);

	full_xid = ShmemVariableCache->nextXid;
	xid = XidFromFullTransactionId(full_xid);
	
	if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
	{
		// 从共享内存中获取数据
		TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
		TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
		TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
		Oid			oldest_datoid = ShmemVariableCache->oldestXidDB;

		LWLockRelease(XidGenLock);

		// 当下一个事务ID被65536取余等于0,则开启vacuum
		if (IsUnderPostmaster && (xid % 65536) == 0)
			SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);

		// 如果事务ID 等于xidStopLimit-xid,告警事务ID不足,需要手动执行vacuum
		if (IsUnderPostmaster &&
			TransactionIdFollowsOrEquals(xid, xidStopLimit))
		{
			char	   *oldest_datname = get_database_name(oldest_datoid);

			/* complain even if that DB has disappeared */
			if (oldest_datname)
				ereport(ERROR,
						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
						 errmsg("database is not accepting commands to avoid wraparound data loss in database "%s"",
								oldest_datname),
						 errhint("Stop the postmaster and vacuum that database in single-user mode.n"
								 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
			else
				ereport(ERROR,
						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
						 errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
								oldest_datoid),
						 errhint("Stop the postmaster and vacuum that database in single-user mode.n"
								 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
		}
		// 如果事务ID 等于xidWarnLimit-xid,告警事务ID不足,需要手动执行vacuum
		else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
		{
			char	   *oldest_datname = get_database_name(oldest_datoid);

			/* complain even if that DB has disappeared */
			if (oldest_datname)
				ereport(WARNING,
						(errmsg("database "%s" must be vacuumed within %u transactions",
								oldest_datname,
								xidWrapLimit - xid),
						 errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.n"
								 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
			else
				ereport(WARNING,
						(errmsg("database with OID %u must be vacuumed within %u transactions",
								oldest_datoid,
								xidWrapLimit - xid),
						 errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.n"
								 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
		}

		/* Re-acquire lock and start over */
		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
		full_xid = ShmemVariableCache->nextXid;
		xid = XidFromFullTransactionId(full_xid);
	}
	//. 省略代码  //
	LWLockRelease(XidGenLock);

	return full_xid;
}

0 人点赞